Add MatchContinuously PTransform #15106

Merged
pabloem merged 4 commits into apache:master from InigoSJ:match-continuously-ptransform on Jul 1, 2021
@InigoSJ InigoSJ commented Jun 30, 2021

Adds MatchContinuously to the Python SDK. It has the same purpose as FileIO.MatchAll.Continuously but a different implementation, since Watch is not available in Python.

The idea is to use PeriodicImpulse and MatchAll to check for new files, and use a StatefulDoFn to remove duplicates.

I marked it as experimental, since in the future it may be worth changing to a Python version of Watch.
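
The poll-and-deduplicate idea described above can be sketched in plain Python. This is only an illustration of the logic, not the code in this PR and not the Beam API: `poll_once` and `demo` are hypothetical names, `glob` stands in for MatchAll, each call to `poll_once` stands in for one PeriodicImpulse tick, and an ordinary `set` stands in for the per-file state that the StatefulDoFn would keep.

```python
import glob
import os
import tempfile


def poll_once(pattern, seen):
    """One polling tick: match the pattern and return only unseen paths.

    `seen` plays the role of the per-file state used for deduplication.
    """
    new_files = []
    for path in sorted(glob.glob(pattern)):
        if path not in seen:  # emit each matched path at most once
            seen.add(path)
            new_files.append(path)
    return new_files


def demo():
    """Creates files between ticks and returns what each tick emitted."""
    with tempfile.TemporaryDirectory() as tmp:
        pattern = os.path.join(tmp, "*.txt")
        seen = set()

        # Two files exist before the first tick.
        for name in ("a.txt", "b.txt"):
            open(os.path.join(tmp, name), "w").close()
        first = poll_once(pattern, seen)

        # A file that appears "mid-pipeline" is picked up by the next tick.
        open(os.path.join(tmp, "extra.txt"), "w").close()
        second = poll_once(pattern, seen)

        # Nothing new: the third tick emits nothing.
        third = poll_once(pattern, seen)

        names = lambda paths: [os.path.basename(p) for p in paths]
        return names(first), names(second), names(third)
```

In this sketch every tick re-lists all matching files, so the deduplication step is what keeps already-emitted files from being emitted again.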




InigoSJ commented Jun 30, 2021

R: @pabloem

I had some problems running the tests in my environment, so I need to double-check them here.

codecov bot commented Jun 30, 2021

Codecov Report

Merging #15106 (d315d57) into master (b86fcf9) will increase coverage by 0.02%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master   #15106      +/-   ##
==========================================
+ Coverage   83.78%   83.81%   +0.02%     
==========================================
  Files         439      441       +2     
  Lines       59245    59530     +285     
==========================================
+ Hits        49638    49894     +256     
- Misses       9607     9636      +29     
Impacted Files Coverage Δ
sdks/python/apache_beam/io/fileio.py 95.89% <100.00%> (+0.42%) ⬆️
sdks/python/apache_beam/internal/metrics/metric.py 86.17% <0.00%> (-1.07%) ⬇️
...hon/apache_beam/runners/worker/bundle_processor.py 93.48% <0.00%> (-0.26%) ⬇️
setup.py 0.00% <0.00%> (ø)
...ks/python/apache_beam/ml/gcp/recommendations_ai.py 87.26% <0.00%> (ø)
...n/apache_beam/ml/gcp/recommendations_ai_test_it.py 69.76% <0.00%> (ø)
sdks/python/apache_beam/runners/common.py 89.32% <0.00%> (+0.14%) ⬆️
...eam/runners/interactive/interactive_environment.py 90.70% <0.00%> (+0.37%) ⬆️
.../apache_beam/runners/direct/transform_evaluator.py 90.44% <0.00%> (+0.65%) ⬆️
...ks/python/apache_beam/runners/worker/data_plane.py 92.42% <0.00%> (+1.81%) ⬆️
... and 1 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data


InigoSJ commented Jun 30, 2021

Run PythonDocker PreCommit


@pabloem pabloem left a comment

Thanks @InigoSJ - this looks great. I added a few comments.

# Add file name that will be created mid-pipeline
files.append(FileSystems.join(tempdir, 'extra'))

interval = 1

pabloem (Member):

Maybe use a shorter interval so the test runs faster? I suppose you can't run this with a TestStream? Have you tried adding a TestStream that advances time immediately?

InigoSJ (Contributor Author):

Yeah, I can make it shorter, no problem. Regarding TestStream, I am not sure how it would be used here, since MatchContinuously needs a PBegin as its "input" (because it uses PeriodicImpulse).

InigoSJ (Contributor Author):

To add: the PeriodicImpulse tests use TestPipeline and not TestStream, so I guess it should be the same here.

"""Checks for new files for a given pattern every interval.

This ``PTransform`` returns a ``PCollection`` of matching files in the form
of ``FileMetadata`` objects.

pabloem (Member):

InigoSJ (Contributor Author):

I thought about it, but I am not sure this would have an actual use case. Not allowing empty matches would produce an error, and since this is a streaming pipeline, that error would be retried indefinitely.

https://beam.apache.org/releases/pydoc/2.30.0/_modules/apache_beam/io/fileio.html#EmptyMatchTreatment

MatchAll has the option, and adding it here wouldn't be hard, but I think it would make the PTransform less resilient. What do you think?
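
To make the trade-off concrete, here is a small plain-Python sketch of how an empty-match policy could behave. It is modeled on the three settings listed on the linked pydoc page (ALLOW, DISALLOW, ALLOW_IF_WILDCARD), but `EmptyMatchPolicy` and `check_matches` are hypothetical names for illustration, not Beam classes or functions.

```python
from enum import Enum


class EmptyMatchPolicy(Enum):
    """Hypothetical stand-in for the settings named in the linked pydoc."""
    ALLOW = "ALLOW"
    DISALLOW = "DISALLOW"
    ALLOW_IF_WILDCARD = "ALLOW_IF_WILDCARD"


def check_matches(pattern, matches, policy):
    """Returns `matches`, or raises if an empty result is not permitted."""
    if matches:
        return matches
    if policy is EmptyMatchPolicy.ALLOW:
        return []
    if policy is EmptyMatchPolicy.ALLOW_IF_WILDCARD and "*" in pattern:
        return []
    # In a streaming pipeline an error like this would be retried on
    # every attempt, which is the resilience concern raised above.
    raise FileNotFoundError("No files matched pattern: %s" % pattern)
```

With a policy that allows empty matches, a polling tick that finds nothing simply emits nothing; with DISALLOW, the same tick fails and, as noted above, would keep failing on retry until a file appears.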

pabloem (Member):

hmmm I think that's fair. Let's leave it as it is for now.


pabloem commented Jun 30, 2021

Run Portable_Python PreCommit


pabloem commented Jun 30, 2021

Run Python_PVR_Flink PreCommit


InigoSJ commented Jun 30, 2021

Run Portable_Python PreCommit


InigoSJ commented Jun 30, 2021

Run Portable_Python PreCommit


pabloem commented Jun 30, 2021

Run Portable_Python PreCommit


pabloem commented Jun 30, 2021

Run Portable_Python PreCommit


InigoSJ commented Jul 1, 2021

Run Portable_Python PreCommit


pabloem commented Jul 1, 2021

Run Portable_Python PreCommit

@pabloem pabloem merged commit 22ec4e6 into apache:master Jul 1, 2021