Skip to content

Conversation

@ashb
Copy link
Member

@ashb ashb commented Jan 18, 2022

Set upstream dependencies when an XComArg is used in a MappedOperator, and (de)serialize them correctly.

We can only re-create XComArg at the DAG level as we need to get hold of the Operator (not just a task_id) so we need a two phase approach here: when deserializing operators we create them as a place-holder class (_XcomRef) and then "up a level" when deserializing the DAG we turn these back in to XComArg objects.

(And in so doing we needed to fix a bug or two in serializing MappedOperator that have a DAG -- it caused a recursion error.)

This PR could possibly be split in to two (one to set deps, a second to serialize them.)


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"badly" as part of the serialization code

if serializable_task.subdag is not None:
setattr(serializable_task.subdag, 'parent_dag', dag)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be fixed/not needed by your #20945?

Copy link
Contributor

@ephraimbuddy ephraimbuddy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Jan 19, 2022
@github-actions
Copy link

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def __init__(self, operator: "Union[BaseOperator, MappedOperator]", key: str = XCOM_RETURN_KEY):
def __init__(self, operator: Union[BaseOperator, MappedOperator], key: str = XCOM_RETURN_KEY):

Any reason for this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid uncessary imports/reduce chance import cycles.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After #20945 we can probably change this to Operator, which would not cause import cycles.

ashb and others added 2 commits January 19, 2022 13:18
Set upstream dependencies when an XComArg is used in a MappedOperator,
and (de)serialize them correctly.

We can only re-create XComArg at the DAG level as we need to get hold of
the Operator (not just a task_id) so we need a two phase approach here:
when deserializing operators we create them as a place-holder class
(`_XcomRef`) and then "up a level" when deserializing the DAG we turn
these back in to XComArg objects.

(And in so doing we needed to fix a bug or two in serializing
MappedOperator that have a DAG -- it caused a recursion error.)
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
@ashb ashb force-pushed the store-xcomarg-mapped-tasks branch from d28eb4f to c692668 Compare January 19, 2022 13:18
@uranusjr uranusjr merged commit 10f5db8 into apache:main Jan 20, 2022
@uranusjr uranusjr deleted the store-xcomarg-mapped-tasks branch January 20, 2022 06:32
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Apr 8, 2022
@jedcunningham jedcunningham added this to the Airflow 2.3.0 milestone Apr 26, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:dynamic-task-mapping AIP-42 area:serialization changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) full tests needed We need to run full set of tests for this PR to merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants