Skip to content

Decouple deadline reference types from core in task SDK#61461

Merged
amoghrajesh merged 24 commits intoapache:mainfrom
astronomer:complete-deadline-alerts-work
Mar 11, 2026
Merged

Decouple deadline reference types from core in task SDK#61461
amoghrajesh merged 24 commits intoapache:mainfrom
astronomer:complete-deadline-alerts-work

Conversation

@amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Feb 4, 2026


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Cursor IDE with Claude sonnet 4 1/2

closes: #59303

Summary

Restore support for custom deadline references by implementing a SerializedCustomReference wrapper in Core that bridges SDK defined custom refs with core serialization for deadline alerts.

Motivation

After PR #61118 moved deadline alert decoding from SDK to Core and introduced SerializedReferenceModels, custom deadline references stopped working. The decoder only looked up types in SerializedReferenceModels, but custom refs registered via the SDK's DeadlineReference.register_custom_reference() only existed in ReferenceModels. This caused "No reference class found with name: MyCustomRef" errors.

What has changed?

Core (airflow-core/)

SerializedCustomReference wrapper

  • Added SerializedReferenceModels.SerializedCustomReference class in serialization/definitions/deadline.py
  • Wraps custom deadline references imported from user code
  • Implements the evaluate_with() logic since SDK's BaseDeadlineReference is lightweight wrapper without it
  • Delegates _evaluate_with() execution to the inner custom ref

Decoder (serialization/decoders.py)

  • When __class_path is present in serialized data, uses SerializedCustomReference.deserialize_reference() to import and wrap the custom ref
  • Builtin types continue to use SerializedReferenceModels.get_reference_class() lookup

DeadlineAlert model

  • reference_class property returns SerializedCustomReference when __class_path is present in the stored reference dict
  • Returns standard serialized types for built-ins

Create deadline & prune logic (serialization/definitions/dag.py, models/dagrun.py)

  • No changes needed - already use SerializedReferenceModels.TYPES.DAGRUN
  • Custom refs now work because SerializedCustomReference is in that tuple

Architecture with these changes

SDK (task-sdk/) - DAG Authoring
├── deadline.py
│ ├── DeadlineAlert (user-facing)
│ ├── DeadlineReference (public interface)
│ └── BaseDeadlineReference (lightweight, serialization only)
Core (airflow-core/) - Scheduling/Execution
├── serialization/
│ ├── encoders.py (adds class_path for custom types)
│ ├── decoders.py (wraps custom types in SerializedCustomReference)
│ └── definitions/deadline.py
│ ├── SerializedReferenceModels
│ │ ├── Built-in types (with evaluate_with)
│ │ └── SerializedCustomReference (wrapper for custom refs)
│ └── TYPES.DAGRUN (includes wrapper)

How custom refs work now

  1. Encoder adds __class_path for custom refs (no changes)
  2. Decoder sees __class_path, imports custom class, wraps in SerializedCustomReference
  3. isinstance(ref, TYPES.DAGRUN) → True (wrapper is in tuple) → deadline created
  4. Wrapper's evaluate_with() validates kwargs, calls custom ref's _evaluate_with(), adds interval
  5. reference_class in TYPES.DAGRUN → True → deadline pruned on success

Testing

Define the custom reference in a plugin:

from airflow.sdk.definitions.deadline import DeadlineReference, deadline_reference, BaseDeadlineReference
from airflow._shared.timezones import timezone


@deadline_reference()
class MyCustomRef(BaseDeadlineReference):
    """Custom ref: deadline is now + interval."""

    def _evaluate_with(self, *, session, **kwargs):
        return timezone.utcnow()


DeadlineReference.register_custom_reference(MyCustomRef)

async def on_deadline(**kwargs):
    print("Custom ref deadline exceeded", kwargs)

DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference

from custom_deadline_refs import on_deadline


with DAG(
    dag_id="testing-custom-reference-deadlines",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    deadline=DeadlineAlert(
        reference=DeadlineReference.MyCustomRef,
        interval=timedelta(seconds=10),
        callback=AsyncCallback(on_deadline),
    ),
):
    BashOperator(task_id="example_task", bash_command="sleep 120")

Result:
image

image
  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@amoghrajesh amoghrajesh added this to the Airflow 3.2.0 milestone Feb 4, 2026
@amoghrajesh amoghrajesh self-assigned this Feb 4, 2026
@amoghrajesh amoghrajesh marked this pull request as draft February 4, 2026 17:50
@amoghrajesh
Copy link
Contributor Author

Still a draft, sent it out to run an early CI

@amoghrajesh
Copy link
Contributor Author

cc: @ferruzzi this is related to the discussion thread on slack

@amoghrajesh amoghrajesh marked this pull request as ready for review February 8, 2026 16:42
@amoghrajesh
Copy link
Contributor Author

There's some failing checks which I am looking at

@amoghrajesh amoghrajesh force-pushed the complete-deadline-alerts-work branch from 1169b24 to 711ca6c Compare February 9, 2026 08:19
@amoghrajesh amoghrajesh requested a review from uranusjr February 9, 2026 09:10
@amoghrajesh amoghrajesh force-pushed the complete-deadline-alerts-work branch from 33fcd26 to 613850b Compare February 10, 2026 10:43
@uranusjr
Copy link
Member

I think this is good more or less now with a couple of minor suggestions. Also would appreciate @ferruzzi if you could provide some perspective if this would step on your work too much.

@ferruzzi
Copy link
Contributor

I don't understand how this is intended to work. Can you show me what a dag definition looks like now? Also, what is the process for adding a new reference now? Do we still add the logic in models path but now also and the empty class signature in the definitions file as well?

@amoghrajesh amoghrajesh force-pushed the complete-deadline-alerts-work branch from e8f57be to 32455ac Compare March 3, 2026 12:33
@amoghrajesh amoghrajesh requested a review from kaxil March 3, 2026 12:46
Copy link
Contributor

@ferruzzi ferruzzi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for untangling this, and thanks for waiting a bit. I had a look and left a few thoughts on some of the ongoing discussions, but it looks good in general.

@amoghrajesh
Copy link
Contributor Author

Hey, back from my holiday and looking at this one now.

@amoghrajesh
Copy link
Contributor Author

@ferruzzi I handled your comments (i think), feel free to take another look!

@amoghrajesh amoghrajesh requested a review from ferruzzi March 10, 2026 12:16
Copy link
Contributor

@ferruzzi ferruzzi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved with a couple of non-blocking thoughts

@amoghrajesh amoghrajesh merged commit 28b3f6e into apache:main Mar 11, 2026
132 checks passed
@amoghrajesh amoghrajesh deleted the complete-deadline-alerts-work branch March 11, 2026 08:34
@amoghrajesh
Copy link
Contributor Author

Thanks for your reviews folks, @ferruzzi @uranusjr @potiuk @kaxil, this one's merged now. Will follow up with prek hook cleanup right away

dominikhei pushed a commit to dominikhei/airflow that referenced this pull request Mar 11, 2026
Custom deadline references now serialize and deserialize using a wrapper pattern.
Pyasma pushed a commit to Pyasma/airflow that referenced this pull request Mar 13, 2026
Custom deadline references now serialize and deserialize using a wrapper pattern.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:deadline-alerts AIP-86 (former AIP-57) area:serialization area:task-sdk full tests needed We need to run full set of tests for this PR to merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Separate DeadlineAlert implementations in SDK and Core

5 participants