Skip to content

Commit 137896f

Browse files
authored
[AIRFLOW-7034] Remove feature: Assigning Dag to task using Bitshift Op (#7685)
* [AIRFLOW-7034] Remove feature: Assigning Dag to task using Bitshift Operator * fixup! [AIRFLOW-7034] Remove feature: Assigning Dag to task using Bitshift Operator * fixup! fixup! [AIRFLOW-7034] Remove feature: Assigning Dag to task using Bitshift Operator * fixup! fixup! fixup! [AIRFLOW-7034] Remove feature: Assigning Dag to task using Bitshift Operator
1 parent 6140356 commit 137896f

File tree

5 files changed

+36
-76
lines changed

5 files changed

+36
-76
lines changed

UPDATING.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,24 @@ https://developers.google.com/style/inclusive-documentation
6161
6262
-->
6363

64+
### Assigning task to a DAG using bitwise shift (bit-shift) operators are no longer supported
65+
66+
Previously, you could assign a task to a DAG as follows:
67+
68+
```python
69+
dag = DAG('my_dag')
70+
dummy = DummyOperator(task_id='dummy')
71+
72+
dag >> dummy
73+
```
74+
75+
This is no longer supported. Instead, we recommend using the DAG as context manager:
76+
77+
```python
78+
with DAG('my_dag):
79+
dummy = DummyOperator(task_id='dummy')
80+
```
81+
6482
### Deprecating ignore_first_depends_on_past on backfill command and default it to True
6583

6684
When doing backfill with `depends_on_past` dags, users will need to pass `ignore_first_depends_on_past`.

airflow/models/baseoperator.py

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -471,46 +471,28 @@ def __hash__(self):
471471
def __rshift__(self, other):
472472
"""
473473
Implements Self >> Other == self.set_downstream(other)
474-
475-
If "Other" is a DAG, the DAG is assigned to the Operator.
476474
"""
477-
from airflow.models.dag import DAG
478-
if isinstance(other, DAG):
479-
# if this dag is already assigned, do nothing
480-
# otherwise, do normal dag assignment
481-
if not (self.has_dag() and self.dag is other):
482-
self.dag = other
483-
else:
484-
self.set_downstream(other)
475+
self.set_downstream(other)
485476
return other
486477

487478
def __lshift__(self, other):
488479
"""
489480
Implements Self << Other == self.set_upstream(other)
490-
491-
If "Other" is a DAG, the DAG is assigned to the Operator.
492481
"""
493-
from airflow.models.dag import DAG
494-
if isinstance(other, DAG):
495-
# if this dag is already assigned, do nothing
496-
# otherwise, do normal dag assignment
497-
if not (self.has_dag() and self.dag is other):
498-
self.dag = other
499-
else:
500-
self.set_upstream(other)
482+
self.set_upstream(other)
501483
return other
502484

503485
def __rrshift__(self, other):
504486
"""
505-
Called for [DAG] >> [Operator] because DAGs don't have
487+
Called for Operator >> [Operator] because list don't have
506488
__rshift__ operators.
507489
"""
508490
self.__lshift__(other)
509491
return self
510492

511493
def __rlshift__(self, other):
512494
"""
513-
Called for [DAG] << [Operator] because DAGs don't have
495+
Called for Operator << [Operator] because list don't have
514496
__lshift__ operators.
515497
"""
516498
self.__rshift__(other)

airflow/providers/google/cloud/operators/pubsub.py

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ class PubSubCreateTopicOperator(BaseOperator):
4141
4242
with DAG('successful DAG') as dag:
4343
(
44-
dag
45-
>> PubSubTopicCreateOperator(project='my-project',
44+
PubSubTopicCreateOperator(project='my-project',
4645
topic='my_new_topic')
4746
>> PubSubTopicCreateOperator(project='my-project',
4847
topic='my_new_topic')
@@ -52,8 +51,7 @@ class PubSubCreateTopicOperator(BaseOperator):
5251
5352
with DAG('failing DAG') as dag:
5453
(
55-
dag
56-
>> PubSubTopicCreateOperator(project='my-project',
54+
PubSubTopicCreateOperator(project='my-project',
5755
topic='my_new_topic')
5856
>> PubSubTopicCreateOperator(project='my-project',
5957
topic='my_new_topic',
@@ -182,8 +180,7 @@ class PubSubCreateSubscriptionOperator(BaseOperator):
182180
183181
with DAG('successful DAG') as dag:
184182
(
185-
dag
186-
>> PubSubSubscriptionCreateOperator(
183+
PubSubSubscriptionCreateOperator(
187184
topic_project='my-project', topic='my-topic',
188185
subscription='my-subscription')
189186
>> PubSubSubscriptionCreateOperator(
@@ -196,8 +193,7 @@ class PubSubCreateSubscriptionOperator(BaseOperator):
196193
197194
with DAG('failing DAG') as dag:
198195
(
199-
dag
200-
>> PubSubSubscriptionCreateOperator(
196+
PubSubSubscriptionCreateOperator(
201197
topic_project='my-project', topic='my-topic',
202198
subscription='my-subscription')
203199
>> PubSubSubscriptionCreateOperator(
@@ -210,7 +206,7 @@ class PubSubCreateSubscriptionOperator(BaseOperator):
210206
211207
with DAG('DAG') as dag:
212208
(
213-
dag >> PubSubSubscriptionCreateOperator(
209+
PubSubSubscriptionCreateOperator(
214210
topic_project='my-project', topic='my-topic')
215211
)
216212
@@ -370,17 +366,15 @@ class PubSubDeleteTopicOperator(BaseOperator):
370366
371367
with DAG('successful DAG') as dag:
372368
(
373-
dag
374-
>> PubSubTopicDeleteOperator(project='my-project',
369+
PubSubTopicDeleteOperator(project='my-project',
375370
topic='non_existing_topic')
376371
)
377372
378373
The operator can be configured to fail if the topic does not exist. ::
379374
380375
with DAG('failing DAG') as dag:
381376
(
382-
dag
383-
>> PubSubTopicCreateOperator(project='my-project',
377+
PubSubTopicCreateOperator(project='my-project',
384378
topic='non_existing_topic',
385379
fail_if_not_exists=True)
386380
)
@@ -482,8 +476,7 @@ class PubSubDeleteSubscriptionOperator(BaseOperator):
482476
483477
with DAG('successful DAG') as dag:
484478
(
485-
dag
486-
>> PubSubSubscriptionDeleteOperator(project='my-project',
479+
PubSubSubscriptionDeleteOperator(project='my-project',
487480
subscription='non-existing')
488481
)
489482
@@ -493,8 +486,7 @@ class PubSubDeleteSubscriptionOperator(BaseOperator):
493486
494487
with DAG('failing DAG') as dag:
495488
(
496-
dag
497-
>> PubSubSubscriptionDeleteOperator(
489+
PubSubSubscriptionDeleteOperator(
498490
project='my-project', subscription='non-existing',
499491
fail_if_not_exists=True)
500492
)

docs/concepts.rst

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -353,19 +353,6 @@ is equivalent to:
353353
op2.set_downstream(op3)
354354
op3.set_upstream(op4)
355355
356-
For convenience, the bitshift operators can also be used with DAGs. For example:
357-
358-
.. code:: python
359-
360-
dag >> op1 >> op2
361-
362-
is equivalent to:
363-
364-
.. code:: python
365-
366-
op1.dag = dag
367-
op1.set_downstream(op2)
368-
369356
We can put this all together to build a simple pipeline:
370357

371358
.. code:: python

tests/models/test_taskinstance.py

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -219,36 +219,17 @@ def test_infer_dag(self):
219219

220220
def test_bitshift_compose_operators(self):
221221
dag = DAG('dag', start_date=DEFAULT_DATE)
222-
op1 = DummyOperator(task_id='test_op_1', owner='test')
223-
op2 = DummyOperator(task_id='test_op_2', owner='test')
224-
op3 = DummyOperator(task_id='test_op_3', owner='test')
225-
op4 = DummyOperator(task_id='test_op_4', owner='test')
226-
op5 = DummyOperator(task_id='test_op_5', owner='test')
227-
228-
# can't compose operators without dags
229-
with self.assertRaises(AirflowException):
230-
op1 >> op2
222+
with dag:
223+
op1 = DummyOperator(task_id='test_op_1', owner='test')
224+
op2 = DummyOperator(task_id='test_op_2', owner='test')
225+
op3 = DummyOperator(task_id='test_op_3', owner='test')
231226

232-
dag >> op1 >> op2 << op3
233-
234-
# make sure dag assignment carries through
235-
# using __rrshift__
236-
self.assertIs(op1.dag, dag)
237-
self.assertIs(op2.dag, dag)
238-
self.assertIs(op3.dag, dag)
227+
op1 >> op2 << op3
239228

240229
# op2 should be downstream of both
241230
self.assertIn(op2, op1.downstream_list)
242231
self.assertIn(op2, op3.downstream_list)
243232

244-
# test dag assignment with __rlshift__
245-
dag << op4
246-
self.assertIs(op4.dag, dag)
247-
248-
# dag assignment with __rrshift__
249-
dag >> op5
250-
self.assertIs(op5.dag, dag)
251-
252233
@patch.object(DAG, 'concurrency_reached')
253234
def test_requeue_over_dag_concurrency(self, mock_concurrency_reached):
254235
mock_concurrency_reached.return_value = True

0 commit comments

Comments
 (0)