Add GCS timespan transform operator #13996
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst).
Force-pushed from 9043e63 to c3e85d6.
Suggested change:
- import dateutil.tz
+ from airflow.settings import TIMEZONE
I think using this would be better. What do you think?
Good question - when we compare against GCS modified time we want to use UTC (I think GCS reports in UTC). But maybe for the transform function it would make more sense to get the timestamps in local time. What do you think?
I guess airflow.utils.timezone.utc would be better for the GCS modified time then, just for uniformity.
Good point. I took this from an existing hook - but the next commit has it fixed in both spots.
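For reference, a minimal sketch of the approach settled on here (the helper function and variable names are illustrative, not taken from the PR): normalize both sides of the comparison to UTC via Airflow's timezone helper, since GCS reports object update times in UTC.

```python
from datetime import datetime
from typing import List

from airflow.utils import timezone


def filter_blobs_by_timespan(blobs: List, start: datetime, end: datetime) -> List:
    """Keep blobs whose GCS `updated` time falls within [start, end).

    GCS reports `updated` in UTC, so the timespan boundaries are made
    timezone-aware in UTC before comparing. Illustrative helper only,
    not the PR's actual code.
    """
    start = start.replace(tzinfo=timezone.utc)
    end = end.replace(tzinfo=timezone.utc)
    return [blob for blob in blobs if start <= blob.updated < end]
```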
@turbaszek since you implemented the original transform operator I wonder if you would be willing to take a look at this one?
Capturing the outcome of the Slack conversation between @turbaszek and myself - I will add:
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.
The Workflow run is cancelling this PR. Building image for the PR has been cancelled.
Force-pushed from e93fa56 to 850fe4c.
Force-pushed from 850fe4c to 99eed57.
Force-pushed from f43abe1 to d1a1df0.
The Workflow run is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.
Force-pushed from d1a1df0 to c79a55f.
The PR is likely OK to be merged with just a subset of tests for default Python and Database versions, without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest master or amend the last commit of the PR, and push it with --force-with-lease.
Awesome work, congrats on your first merged pull request!
This change adds a new GCS transform operator. It is based on the existing transform operator, with the addition that it transforms multiple files that match a prefix and were updated within a time span. The time span is implicitly defined: it is the time between the current execution timestamp of the DAG run (time-span start) and the next execution timestamp of the DAG (time-span end).
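As a sketch of how this might look in a DAG - the operator and parameter names below reflect my reading of the PR as merged into the Google provider; treat the exact names, buckets, and connection IDs as assumptions:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.gcs import (
    GCSTimeSpanFileTransformOperator,
)

with DAG(
    dag_id="gcs_timespan_transform_example",  # hypothetical DAG
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
) as dag:
    # Transforms every object under `source_prefix` whose updated time
    # falls between this DAG run's execution time and the next one.
    transform = GCSTimeSpanFileTransformOperator(
        task_id="timespan_transform",
        source_bucket="source-bucket",            # placeholder bucket
        source_prefix="incoming/",                # placeholder prefix
        source_gcp_conn_id="google_cloud_default",
        destination_bucket="destination-bucket",  # placeholder bucket
        destination_prefix="processed/",
        destination_gcp_conn_id="google_cloud_default",
        transform_script=["python", "transform.py"],  # placeholder script
    )
```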
The use case: some entity generates an undefined number of files at irregular intervals. The operator picks up all files that were updated since it last ran. Typically the transform script iterates over the files, opens them, extracts some information, collates the results into one or more files, and uploads those to GCS. These result files can then be loaded into BigQuery, processed further, or served via a web server.
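For illustration, a minimal transform script, under the assumption (check the operator's docstring for the actual calling convention) that the operator downloads the matching objects to a temporary source directory, passes the source and destination directories as the first two command-line arguments, and uploads whatever the script writes to the destination directory:

```python
#!/usr/bin/env python
"""Hypothetical transform script: collate all downloaded files into one."""
import pathlib
import sys

source_dir = pathlib.Path(sys.argv[1])       # where the operator put the inputs
destination_dir = pathlib.Path(sys.argv[2])  # what the operator will upload

with open(destination_dir / "collated.txt", "w") as out:
    for path in sorted(p for p in source_dir.rglob("*") if p.is_file()):
        # Extract/transform per file; plain concatenation stands in here.
        out.write(path.read_text())
```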
Here is an illustration that shows the concept: