-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Set dependencies in MappedOperator via XComArgs #20931
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
airflow/models/baseoperator.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"badly" as part of the serialization code
airflow/airflow/serialization/serialized_objects.py
Lines 970 to 971 in 14a057f
| if serializable_task.subdag is not None: | |
| setattr(serializable_task.subdag, 'parent_dag', dag) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be fixed/not needed by your #20945?
ephraimbuddy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
airflow/models/xcom_arg.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| def __init__(self, operator: "Union[BaseOperator, MappedOperator]", key: str = XCOM_RETURN_KEY): | |
| def __init__(self, operator: Union[BaseOperator, MappedOperator], key: str = XCOM_RETURN_KEY): |
Any reason for this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid uncessary imports/reduce chance import cycles.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After #20945 we can probably change this to Operator, which would not cause import cycles.
Set upstream dependencies when an XComArg is used in a MappedOperator, and (de)serialize them correctly. We can only re-create XComArg at the DAG level as we need to get hold of the Operator (not just a task_id) so we need a two phase approach here: when deserializing operators we create them as a place-holder class (`_XcomRef`) and then "up a level" when deserializing the DAG we turn these back in to XComArg objects. (And in so doing we needed to fix a bug or two in serializing MappedOperator that have a DAG -- it caused a recursion error.)
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com> Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
d28eb4f to
c692668
Compare
Set upstream dependencies when an XComArg is used in a MappedOperator, and (de)serialize them correctly.
We can only re-create XComArg at the DAG level as we need to get hold of the Operator (not just a task_id) so we need a two phase approach here: when deserializing operators we create them as a place-holder class (
_XcomRef) and then "up a level" when deserializing the DAG we turn these back in to XComArg objects.(And in so doing we needed to fix a bug or two in serializing MappedOperator that have a DAG -- it caused a recursion error.)
This PR could possibly be split in to two (one to set deps, a second to serialize them.)
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.