-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Add params dag_id, task_id etc to XCom.serialize_value #19505
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
ok @ashb i updated it to look at signature to do so, i appropriated a nice convenience function from connexion |
4fe9ea7 to
b1d5ede
Compare
303f6b9 to
751cdfc
Compare
|
Can you update the PR title as well -- feels incomplete |
|
Is there any work arounds available until this gets merged? |
you could do it by overriding |
751cdfc to
a4aefdd
Compare
|
@uranusjr needs to make a similar sort of change (where the signature for BaseOperatorLink changes) and I think he was proposing putting I guess the main difference there is the number of args wasn't changing, but sig.inspect is relatively slow. Thoughts TP? Should we use the same |
|
Slightly off-topic but curious to know if anyone can help me out. When is the Also what is the difference between |
Yeah sounds reasonable. Probably not the end of the world to have a little slowness here but it does sound like a clean and direct way to achieve the same goal. LMK
Clear will be called at the start of the run, no matter whether it's the first attempt, second attempt, a retry, a backfill -- whatever: XCom is cleared at the start of the task. Delete vs clear, delete is just lower level and will delete a collection of xcom objects. Clear removes all xcom records for a specific task instance But now that you bring this up, yeah depending on how you implement your custom xcom backend the backing files might not be purged. E.g. in the example backend on the astronomer blog they wouldn't be; but the database would not know they are there so in effect they are not there, apart from showing up your s3 bill. |
|
@dstandish So clear really isn't meant to be a way to clean up the data that was stored in S3 or any other external database, its moreso meant to ensure each task starts with a clean slate and doesn't duplicate its own XComs? What if I wanted to ensure that my S3 is purged once the dag completes (maybe there is sensitive data that I dont want sitting in S3). Does overriding any of these functions help in that case? |
I wouldn't say quite that... if you are making a custom backend and you don't want orphaned objects, you very well might want to override clear to have it do that. Admittedly BaseXCom might eventually want a refactor to be more friendly for this issue, and for other aspects of implementing custom xcom backends also.
Airflow does have the behavior of clearing xcoms at execution start.
Yes of course you can do whatever you want to do in your custom xcom backend so long as it plays nicely enough with airflow 🙂 Maybe ping me on slack if you want to continue the conversation on this topic, which is separate from this PR. |
|
So if my thinking is right, this is the flow of a DAG: (Clear Task A XCOM from airflow DB) Task A -> (Clear Task B XCOM from airflow DB) Task B -> (Clear Task C XCOM from airflow DB) Task C ^ Task A data does not exist in S3 yet, so we can't clear it. Just wondering when is the time to be purging S3? |
ping me on slack |
|
I think I need your email? I could also make a thread on the discussion tab, if youd like. Discussion: #20029 |
a4aefdd to
c05a15a
Compare
|
The approach looks good to me if you really don’t want to go with the version number approach. It’s unfortunate we need to do this, but there are not too many choices. |
c8ddd05 to
c97eb55
Compare
uranusjr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’m OK with this.
|
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
|
Seems that one would be indeded needed - no details but possibly there will be more comments from the users here https://apache-airflow.slack.com/archives/CCR6P6JRL/p1644129201867359?thread_ts=1643448538.080509&cid=CCR6P6JRL |
|
Possibly it would be useful to also pass context here as well? |
|
Is there any other metadata that we can pass, potentially? I can't have a generic container with mixed data from many clients since the @task functions will save the actual data to the backend. People can only access data from certain clients (at least on Azure Blob, security is at the container/bucket level), so I am hoping for a way to provide this information somehow. Overwriting the set() function works, but in the example below I assume that the I also need a clarification about the XCom backend. When I transform raw data, I want to keep the result permanently and reuse it for reporting, validation, etc. A common task might be unnesting JSON data into a tabular format.
Here is my example custom XCom backend class: |
c97eb55 to
d05e72b
Compare
|
@potiuk I think adding context is a perfectly reasonable idea, but i don't see any reason it needs to be lumped together into this PR. probably better handled separately IMO. this one is just about making |
Fine for me |
ashb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets leave the "public" interface as XCom.serialize_value, and not expose the shim please.
0c4f799 to
41a75cb
Compare
ashb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good now, one small change (and we probably need to wait on #20975 first)
2566282 to
251e6b5
Compare
When implementing a custom XCom backend, in order to store XCom objects organized by dag_id, run_id etc, we need to pass those params to `serialize_value`.
27da039 to
724e62e
Compare
When implementing a custom XCom backend, in order to store XCom objects organized by dag_id, run_id etc, we need to pass those params to
serialize_value.