@dstandish
Contributor

When implementing a custom XCom backend, in order to store XCom objects organized by dag_id, run_id etc, we need to pass those params to serialize_value.
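To make the motivation concrete, here is a minimal, Airflow-free sketch of what a backend could do once those identifiers reach `serialize_value`. The `FakeObjectStore` class, the key layout, and the keyword-only parameter names are assumptions for illustration, not the signature this PR settles on.

```python
import json
from typing import Any, Dict, Optional


class FakeObjectStore:
    """In-memory stand-in for S3/Blob storage (hypothetical)."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    def put(self, path: str, data: bytes) -> None:
        self.objects[path] = data


STORE = FakeObjectStore()


def serialize_value(
    value: Any,
    *,
    key: Optional[str] = None,
    task_id: Optional[str] = None,
    dag_id: Optional[str] = None,
    run_id: Optional[str] = None,
) -> bytes:
    """Write the payload to the store and return a small JSON reference to it."""
    # Assumed layout: one object per (dag, run, task, key).
    path = f"{dag_id}/{run_id}/{task_id}/{key}.json"
    STORE.put(path, json.dumps(value).encode("utf-8"))
    # Only this reference would end up in the metadata database.
    return json.dumps({"path": path}).encode("utf-8")


ref = serialize_value(
    {"rows": 3}, key="return_value", task_id="extract", dag_id="sales", run_id="manual_1"
)
```

Without the extra parameters, the function would only see `value` and could not build a path like this.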

@dstandish
Contributor Author

ok @ashb i updated it to look at signature

to do so, i appropriated a nice convenience function from connexion
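A rough sketch of the signature-inspection idea, assuming the goal is to keep existing backends working when their `serialize_value` only accepts `value`. The helper name `call_with_supported_kwargs` is hypothetical and this is not the function taken from connexion; it only illustrates the technique with the standard library.

```python
import inspect
from typing import Any, Callable


def call_with_supported_kwargs(func: Callable[..., Any], value: Any, **extra: Any) -> Any:
    """Call func(value, ...) passing only the keyword arguments it declares."""
    params = inspect.signature(func).parameters
    accepts_var_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if accepts_var_kwargs:
        # A **kwargs catch-all can take everything.
        return func(value, **extra)
    supported = {name: arg for name, arg in extra.items() if name in params}
    return func(value, **supported)


def old_style(value):  # legacy backend: value only
    return f"old:{value}"


def new_style(value, *, dag_id=None, task_id=None):  # accepts some of the new params
    return f"new:{dag_id}/{task_id}:{value}"


old_result = call_with_supported_kwargs(old_style, 1, dag_id="d", task_id="t", run_id="r")
new_result = call_with_supported_kwargs(new_style, 1, dag_id="d", task_id="t", run_id="r")
```

Here `old_style` is called with the value alone, while `new_style` receives `dag_id` and `task_id` but not `run_id`, which it does not declare.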

@kaxil
Member

kaxil commented Nov 26, 2021

Can you update the PR title as well -- feels incomplete

@dstandish dstandish changed the title from "XCom.serialize_value should have all params set does" to "Add params dag_id, task_id etc to XCom.serialize_value" on Nov 26, 2021
@jmaldon1

jmaldon1 commented Dec 2, 2021

Are there any workarounds available until this gets merged?

@dstandish
Contributor Author

> Are there any workarounds available until this gets merged?

you could do it by overriding set, which receives all the params -- it just doesn't pass them on to serialize_value

@dstandish dstandish force-pushed the xcom-serialize-call branch from 751cdfc to a4aefdd on December 3, 2021 02:46
@ashb
Member

ashb commented Dec 3, 2021

@uranusjr needs to make a similar sort of change (where the signature for BaseOperatorLink changes) and I think he was proposing putting a version class attribute on the class instead of using inspect.signature.

I guess the main difference there is the number of args wasn't changing, but inspect.signature is relatively slow. Thoughts TP? Should we use the same Class.version attribute approach here?
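For comparison, a minimal sketch of the version-attribute alternative raised here. The attribute name `serialize_value_version` and both backend classes are hypothetical; the point is only that a plain attribute lookup replaces signature inspection on every call.

```python
from typing import Any


class LegacyBackend:
    # No version attribute: treated as the original, value-only interface.
    @staticmethod
    def serialize_value(value: Any) -> str:
        return f"legacy:{value}"


class NewBackend:
    serialize_value_version = 2  # hypothetical opt-in marker

    @staticmethod
    def serialize_value(value: Any, *, dag_id=None, task_id=None, run_id=None) -> str:
        return f"v2:{dag_id}/{run_id}/{task_id}:{value}"


def serialize(backend: type, value: Any, **ids: Any) -> str:
    """Dispatch on a class attribute instead of inspecting the signature."""
    if getattr(backend, "serialize_value_version", 1) >= 2:
        return backend.serialize_value(value, **ids)
    return backend.serialize_value(value)


legacy_out = serialize(LegacyBackend, 5, dag_id="d", task_id="t", run_id="r")
new_out = serialize(NewBackend, 5, dag_id="d", task_id="t", run_id="r")
```

The trade-off is that backend authors must remember to declare the version, whereas signature inspection needs no cooperation from them.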

@jmaldon1

jmaldon1 commented Dec 3, 2021

Slightly off-topic but curious to know if anyone can help me out.

When is the clear method called in the DAG process?
How are we avoiding the case where we clear the data from the external database, but the task fails and has to rerun?
If the task reruns, won't it need to query for the data again? If we already deleted it, it won't be there anymore.

Also, what is the difference between delete and clear? When is delete called?

@dstandish
Contributor Author

> @uranusjr needs to make a similar sort of change (where the signature for BaseOperatorLink changes) and I think he was proposing putting a version class attribute on the class instead of using inspect.signature.
>
> I guess the main difference there is the number of args wasn't changing, but inspect.signature is relatively slow. Thoughts TP? Should we use the same Class.version attribute approach here?

Yeah sounds reasonable. Probably not the end of the world to have a little slowness here but it does sound like a clean and direct way to achieve the same goal. LMK

@jmaldon1

> Slightly off-topic but curious to know if anyone can help me out.
> When is the clear method called in the DAG process?
> How are we avoiding the case where we clear the data from the external database, but the task fails and has to rerun?
> If the task reruns, won't it need to query for the data again? If we already deleted it, it won't be there anymore.
> Also, what is the difference between delete and clear? When is delete called?

Clear will be called at the start of the run, no matter whether it's the first attempt, second attempt, a retry, a backfill -- whatever: XCom is cleared at the start of the task.

Delete vs clear: delete is just lower level and will delete a collection of XCom objects. Clear removes all XCom records for a specific task instance.

But now that you bring this up, yeah, depending on how you implement your custom XCom backend, the backing files might not be purged. E.g. in the example backend on the Astronomer blog they wouldn't be; but the database would not know they are there, so in effect they are not there, apart from showing up on your S3 bill.

@jmaldon1

jmaldon1 commented Dec 3, 2021

@dstandish So clear really isn't meant to be a way to clean up the data that was stored in S3 or any other external database; it's more so meant to ensure each task starts with a clean slate and doesn't duplicate its own XComs?

What if I wanted to ensure that my S3 is purged once the DAG completes (maybe there is sensitive data that I don't want sitting in S3)? Does overriding any of these functions help in that case?

@dstandish
Contributor Author

dstandish commented Dec 3, 2021

> @dstandish So clear really isn't meant to be a way to clean up the data that was stored in S3 or any other external database

I wouldn't say quite that... if you are making a custom backend and you don't want orphaned objects, you very well might want to override clear to have it do that. Admittedly BaseXCom might eventually want a refactor to be more friendly for this issue, and for other aspects of implementing custom xcom backends also.
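A minimal sketch of that idea, using plain in-memory stand-ins rather than Airflow's real `clear` classmethod. The `db_rows` list, the `object_store` dict, and the function names are all hypothetical; the sketch only shows clearing the external object together with the row that points at it, so a retry does not leave orphans behind.

```python
from typing import Dict, List, Tuple

# Hypothetical stand-ins: rows in the metadata DB and objects in external storage.
db_rows: List[Tuple[str, str, str, str]] = []  # (dag_id, run_id, task_id, object_path)
object_store: Dict[str, bytes] = {}


def set_xcom(dag_id: str, run_id: str, task_id: str, payload: bytes) -> None:
    """Store the payload externally and record a reference row."""
    path = f"{dag_id}/{run_id}/{task_id}.bin"
    object_store[path] = payload
    db_rows.append((dag_id, run_id, task_id, path))


def clear_xcom(dag_id: str, run_id: str, task_id: str) -> None:
    """For each matching row, delete the external object and then the row."""
    doomed = [row for row in db_rows if row[:3] == (dag_id, run_id, task_id)]
    for row in doomed:
        object_store.pop(row[3], None)  # tolerate objects that are already gone
        db_rows.remove(row)


set_xcom("etl", "run_1", "load", b"first attempt")
clear_xcom("etl", "run_1", "load")  # what would happen at the start of a retry
set_xcom("etl", "run_1", "load", b"second attempt")
```

After the retry, only the second attempt's object remains in the store, and exactly one row references it.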

> it's more so meant to ensure each task starts with a clean slate and doesn't duplicate its own XComs?

Airflow does have the behavior of clearing xcoms at execution start.

> What if I wanted to ensure that my S3 is purged once the DAG completes (maybe there is sensitive data that I don't want sitting in S3)? Does overriding any of these functions help in that case?

Yes of course you can do whatever you want to do in your custom xcom backend so long as it plays nicely enough with airflow 🙂

Maybe ping me on slack if you want to continue the conversation on this topic, which is separate from this PR.

@jmaldon1

jmaldon1 commented Dec 3, 2021

So if my thinking is right, this is the flow of a DAG:

(Clear Task A XCOM from airflow DB) Task A -> (Clear Task B XCOM from airflow DB) Task B -> (Clear Task C XCOM from airflow DB) Task C
^ Task A data does not exist in S3 yet, so we can't clear it.

Just wondering when is the time to be purging S3?

@dstandish
Contributor Author

> So if my thinking is right, this is the flow of a DAG:
>
> (Clear Task A XCOM from airflow DB) Task A -> (Clear Task B XCOM from airflow DB) Task B -> (Clear Task C XCOM from airflow DB) Task C
> ^ Task A data does not exist in S3 yet, so we can't clear it.
> Just wondering when is the time to be purging S3?

ping me on slack

@jmaldon1

jmaldon1 commented Dec 3, 2021

I think I need your email? I could also make a thread on the discussion tab, if you'd like.

Discussion: #20029

@dstandish
Contributor Author

@uranusjr @ashb is this the right way to approach this? Or should we use a version decorator?

@uranusjr
Member

The approach looks good to me if you really don’t want to go with the version number approach. It’s unfortunate we need to do this, but there are not too many choices.

@dstandish dstandish force-pushed the xcom-serialize-call branch 2 times, most recently from c8ddd05 to c97eb55 on January 28, 2022 18:03
Member

@uranusjr uranusjr left a comment

I’m OK with this.

@github-actions github-actions bot added the "full tests needed" label (We need to run full set of tests for this PR to merge) on Jan 29, 2022
@github-actions

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@potiuk
Member

potiuk commented Feb 6, 2022

Seems that this one would indeed be needed. No details, but possibly there will be more comments from the users here: https://apache-airflow.slack.com/archives/CCR6P6JRL/p1644129201867359?thread_ts=1643448538.080509&cid=CCR6P6JRL

@potiuk
Member

potiuk commented Feb 6, 2022

Possibly it would be useful to also pass context here as well?

@ldacey
Contributor

ldacey commented Feb 7, 2022

Is there any other metadata that we can pass, potentially? I can't have a generic container with mixed data from many clients since the @task functions will save the actual data to the backend. People can only access data from certain clients (at least on Azure Blob, security is at the container/bucket level), so I am hoping for a way to provide this information somehow.

Overriding the set() function works, but in my example backend below I assume that dag_id.split("-")[0] is the name of the container/bucket/cloud storage folder. This is not universal though, so I would need to maintain a mapping between DAG IDs and container names. That is feasible, but it would be great if we could pass some metadata.

I also need a clarification about the XCom backend. When I transform raw data, I want to keep the result permanently and reuse it for reporting, validation, etc. A common task might be unnesting JSON data into a tabular format.

  1. Save data to cloud storage with an Operator, pass the list of files/fragments/partitions saved to XCom to be read by downstream tasks. This is the current status, and the benefit is that I can control where the data is saved and I can reuse that data for other things.

  2. Return a value in a specific format (pyarrow Table in my example below) and the file will be saved automatically. This is what happens with the custom XCom backend; it reduces the amount of boilerplate and seems like a cleaner approach. However, there is less control over where this data gets saved. Am I misusing the XCom backend by wanting to keep and reuse the data in a similar way? It seems like a waste otherwise, since the exact same data is undergoing the exact same transformation.

Here is my example custom XCom backend class:

from datetime import datetime
from typing import Any, Optional

import pyarrow as pa
import pyarrow.parquet as pq
from airflow.models.xcom import BaseXCom
from airflow.utils.session import provide_session

# NOTE: AdlfsHook is not imported here; it is assumed to be a custom hook
# that exposes an fsspec-compatible filesystem as `.fs`.


class WasbXComBackend(BaseXCom):
    @classmethod
    @provide_session
    def set(
        cls, key, value, task_id, dag_id, execution_date=None, run_id=None, session=None
    ):
        """Store an XCom value"""
        from airflow.models.xcom import XCom

        if not (execution_date is None) ^ (run_id is None):
            raise ValueError("Exactly one of execution_date or run_id must be passed")
        if run_id:
            from airflow.models.dagrun import DagRun

            dag_run = (
                session.query(DagRun).filter_by(dag_id=dag_id, run_id=run_id).one()
            )
            execution_date = dag_run.execution_date
        # Pass the identifiers through to serialize_value so it can build a path.
        value = cls.serialize_value(dag_id, task_id, run_id, execution_date, value)
        # Remove any existing XCom with the same key before inserting the new one.
        session.query(cls).filter(
            cls.key == key,
            cls.execution_date == execution_date,
            cls.task_id == task_id,
            cls.dag_id == dag_id,
        ).delete()
        session.commit()
        session.add(
            XCom(
                key=key,
                value=value,
                execution_date=execution_date,
                task_id=task_id,
                dag_id=dag_id,
            )
        )
        session.commit()
        session.flush()

    @staticmethod
    def serialize_value(
        dag_id: str,
        task_id: str,
        run_id: Optional[str],
        execution_date: Optional[datetime],
        value: Any,
    ):
        # Tables are written to blob storage; only their path is stored as the XCom.
        if isinstance(value, pa.Table):
            fs = AdlfsHook(wasb_conn_id="azure_blob_conn_str").fs
            # Assumes the DAG ID prefix is the container/folder name.
            folder = dag_id.split("-")[0]
            if run_id is not None:
                run_key = run_id.replace("-", "").replace(":", "")
            else:
                run_key = execution_date.strftime("%Y%m%dT%H%M%S")
            path = f"{folder}/xcom/{task_id}/{run_key}.parquet"
            pq.write_table(
                table=value,
                where=path,
                filesystem=fs,
                use_dictionary=True,
                compression="snappy",
                flavor="spark",
            )
            value = path
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result) -> Any:
        result = BaseXCom.deserialize_value(result)
        # A string containing "xcom" is treated as a path written by serialize_value.
        if isinstance(result, str) and "xcom" in result:
            path = result
            fs = AdlfsHook(wasb_conn_id="azure_blob_conn_str").fs
            if path.endswith(".parquet"):
                return pq.read_table(
                    source=path, use_pandas_metadata=True, filesystem=fs
                )
        return result
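To see which paths this naming scheme produces, the path logic can be pulled out into a standalone function. This is only a sketch: the `xcom_path` name and the optional `containers` mapping (DAG ID to container name) are hypothetical additions that illustrate the mapping idea mentioned above; the rest mirrors the string handling in `serialize_value`.

```python
from datetime import datetime, timezone
from typing import Dict, Optional


def xcom_path(
    dag_id: str,
    task_id: str,
    run_id: Optional[str] = None,
    execution_date: Optional[datetime] = None,
    containers: Optional[Dict[str, str]] = None,
) -> str:
    """Build the parquet path the same way the backend above does."""
    # Optional explicit DAG-to-container mapping (hypothetical); otherwise use the prefix.
    folder = (containers or {}).get(dag_id, dag_id.split("-")[0])
    if run_id is not None:
        run_key = run_id.replace("-", "").replace(":", "")
    elif execution_date is not None:
        run_key = execution_date.strftime("%Y%m%dT%H%M%S")
    else:
        raise ValueError("Either run_id or execution_date is required")
    return f"{folder}/xcom/{task_id}/{run_key}.parquet"


by_run = xcom_path("clienta-daily", "unnest", run_id="scheduled__2022-02-07T00:00:00+00:00")
by_date = xcom_path(
    "clienta-daily", "unnest", execution_date=datetime(2022, 2, 7, tzinfo=timezone.utc)
)
mapped = xcom_path("reporting", "unnest", run_id="manual_1", containers={"reporting": "clientb"})
```

Note that a scheduled run_id keeps its `+` after the replacements, so the resulting object name contains a character some storage tools treat specially.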

@dstandish dstandish force-pushed the xcom-serialize-call branch from c97eb55 to d05e72b on February 8, 2022 23:09
@dstandish
Contributor Author

@potiuk I think adding context is a perfectly reasonable idea, but i don't see any reason it needs to be lumped together into this PR. probably better handled separately IMO. this one is just about making serialize consistent with set. later we can consider extending what is passed to set (and update serialize in the same way)

@potiuk
Member

potiuk commented Feb 8, 2022

> @potiuk I think adding context is a perfectly reasonable idea, but i don't see any reason it needs to be lumped together into this PR. probably better handled separately IMO. this one is just about making serialize consistent with set. later we can consider extending what is passed to set (and update serialize in the same way)

Fine for me

Member

@ashb ashb left a comment

Let's leave the "public" interface as XCom.serialize_value, and not expose the shim please.

@dstandish dstandish force-pushed the xcom-serialize-call branch from 0c4f799 to 41a75cb on February 9, 2022 23:51
@dstandish dstandish requested a review from ashb February 10, 2022 03:47
Member

@ashb ashb left a comment

Looks good now, one small change (and we probably need to wait on #20975 first)

@dstandish dstandish force-pushed the xcom-serialize-call branch 7 times, most recently from 2566282 to 251e6b5 on February 16, 2022 14:13
@dstandish dstandish requested a review from ashb February 16, 2022 23:42
@dstandish dstandish merged commit 56285ee into apache:main Feb 17, 2022
@jedcunningham jedcunningham added the "type:bug-fix" label (Changelog: Bug Fixes) on Feb 28, 2022