Decouple deadline reference types from core in task SDK#61461
Decouple deadline reference types from core in task SDK#61461amoghrajesh merged 24 commits intoapache:mainfrom
Conversation
|
Still a draft, sent it out to run an early CI |
|
cc: @ferruzzi this is related to the discussion thread on slack |
|
There's some failing checks which I am looking at |
1169b24 to
711ca6c
Compare
33fcd26 to
613850b
Compare
|
I think this is good more or less now with a couple of minor suggestions. Also would appreciate @ferruzzi if you could provide some perspective if this would step on your work too much. |
|
I don't understand how this is intended to work. Can you show me what a dag definition looks like now? Also, what is the process for adding a new reference now? Do we still add the logic in models path but now also and the empty class signature in the definitions file as well? |
e8f57be to
32455ac
Compare
ferruzzi
left a comment
There was a problem hiding this comment.
Thanks for untangling this, and thanks for waiting a bit. I had a look and left a few thoughts on some of the ongoing discussions, but it looks good in general.
|
Hey, back from my holiday and looking at this one now. |
|
@ferruzzi I handled your comments (i think), feel free to take another look! |
ferruzzi
left a comment
There was a problem hiding this comment.
Approved with a couple of non-blocking thoughts
Custom deadline references now serialize and deserialize using a wrapper pattern.
Custom deadline references now serialize and deserialize using a wrapper pattern.
Was generative AI tooling used to co-author this PR?
Cursor IDE with Claude sonnet 4 1/2
closes: #59303
Summary
Restore support for custom deadline references by implementing a
SerializedCustomReferencewrapper in Core that bridges SDK defined custom refs with core serialization for deadline alerts.Motivation
After PR #61118 moved deadline alert decoding from SDK to Core and introduced
SerializedReferenceModels, custom deadline references stopped working. The decoder only looked up types inSerializedReferenceModels, but custom refs registered via the SDK'sDeadlineReference.register_custom_reference()only existed inReferenceModels. This caused "No reference class found with name: MyCustomRef" errors.What has changed?
Core (
airflow-core/)SerializedCustomReference wrapper
SerializedReferenceModels.SerializedCustomReferenceclass inserialization/definitions/deadline.pyevaluate_with()logic since SDK'sBaseDeadlineReferenceis lightweight wrapper without it_evaluate_with()execution to the inner custom refDecoder (
serialization/decoders.py)__class_pathis present in serialized data, usesSerializedCustomReference.deserialize_reference()to import and wrap the custom refSerializedReferenceModels.get_reference_class()lookupDeadlineAlert model
reference_classproperty returnsSerializedCustomReferencewhen__class_pathis present in the stored reference dictCreate deadline & prune logic (
serialization/definitions/dag.py,models/dagrun.py)SerializedReferenceModels.TYPES.DAGRUNSerializedCustomReferenceis in that tupleArchitecture with these changes
How custom refs work now
__class_pathfor custom refs (no changes)__class_path, imports custom class, wraps inSerializedCustomReferenceisinstance(ref, TYPES.DAGRUN)→ True (wrapper is in tuple) → deadline createdevaluate_with()validates kwargs, calls custom ref's_evaluate_with(), adds intervalreference_class in TYPES.DAGRUN→ True → deadline pruned on successTesting
Define the custom reference in a plugin:
DAG:
Result:

{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.