Add Lineage metrics to Python PubsubIO, BigtableIO, FileIO#32430
Add Lineage metrics to Python PubsubIO, BigtableIO, FileIO#32430Abacn merged 1 commit intoapache:masterfrom
Conversation
6bc0b47 to
777d644
Compare
|
For PubsubIO and BigtableIO, unit test added. For FileIO, because it needs real gcs, and Dataflow runner currently does not return metrics back, tested locally with the following pipeline with beam.Pipeline("DirectRunner", options) as p:
p_read = p | ReadFromText(file_pattern='gs://dataflow-samples/shakespeare/kinghenry*') \
| WriteToText(file_path_prefix='gs://clouddfe-yihu-test/tmp/fileiolineage')
print(Lineage.query(p.result.metrics(), Lineage.SOURCE))
print(Lineage.query(p.result.metrics(), Lineage.SINK))see output: {'gcs:dataflow-samples.`shakespeare/kinghenryviii.txt`', 'gcs:dataflow-samples.`shakespeare/kinghenryv.txt`'}
{'gcs:clouddfe-yihu-test.tmp/fileiolineage-00000-of-00001'} |
|
Assigning reviewers. If you would like to opt out of this review, comment R: @shunping for label python. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
| self.report_lineage_once() | ||
| yield self.fn(element) | ||
|
|
||
| def report_lineage_once(self): |
There was a problem hiding this comment.
Do we need to ensure read/write on report_lineage is atomic? In other words, is it possible that multiple threads read and write reported_lineage at the same time leading to a race condition?
There was a problem hiding this comment.
It does not matter because Lineage is backed by a String Set, report once or multiple times has the same result (idempotent). Here I use a local variable to reduce the overhead a little bit (do not do set add operation on every element)
|
python ml precommit and python test were breakages on base branch and fixed on latest master now, not related to this change, and merging for now |
Please add a meaningful description for your change here
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.