Add MatchContinuously PTransform#15106
Conversation
R: @pabloem I had some problems running the tests in my environment, so I need to double-check them here.
Codecov Report
@@ Coverage Diff @@
## master #15106 +/- ##
==========================================
+ Coverage 83.78% 83.81% +0.02%
==========================================
Files 439 441 +2
Lines 59245 59530 +285
==========================================
+ Hits 49638 49894 +256
- Misses 9607 9636 +29
Run PythonDocker PreCommit
# Add file name that will be created mid-pipeline
files.append(FileSystems.join(tempdir, 'extra'))

interval = 1
Maybe make the interval shorter so the test runs faster? I suppose you can't run this with a TestStream? Have you tried adding a TestStream that advances time immediately?
Yeah, I can make it shorter, no problem. Regarding TestStream, I am not sure how it would be used here, since MatchContinuously needs a PBegin as "input" (because it uses PeriodicImpulse).
To add: the PeriodicImpulse tests use TestPipeline and not TestStream, so I guess it should be the same here.
"""Checks for new files for a given pattern every interval.

This ``PTransform`` returns a ``PCollection`` of matching files in the form
of ``FileMetadata`` objects.
Java has an EmptyMatchTreatment. Should we add that here too? https://beam.apache.org/releases/javadoc/2.30.0/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.html
I thought about it, but I am not sure it would have an actual use case. Disallowing empty matches would produce an error, and since this is a streaming pipeline, that error would be retried indefinitely.
MatchAll has the option, and adding it here wouldn't be hard, but I think it would make the PTransform less resilient. What do you think?
hmmm I think that's fair. Let's leave it as it is for now.
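The concern about empty matches can be illustrated with a small plain-Python sketch. It is an illustration under stated assumptions, not Beam's API: `match_once`, `count_failed_polls`, `demo_empty` and the `allow_empty` flag are hypothetical names, and `glob` stands in for the file match. With empty matches disallowed, every poll that runs before the first file arrives raises, which a streaming runner would keep retrying.

```python
import glob
import os
import tempfile


def match_once(pattern, allow_empty=True):
    """Return paths matching `pattern`; optionally raise when nothing matches."""
    paths = sorted(glob.glob(pattern))
    if not paths and not allow_empty:
        raise FileNotFoundError('No files matched %s' % pattern)
    return paths


def count_failed_polls(pattern, polls, allow_empty):
    """Count how many of `polls` consecutive polls raise for `pattern`."""
    failures = 0
    for _ in range(polls):
        try:
            match_once(pattern, allow_empty=allow_empty)
        except FileNotFoundError:
            failures += 1
    return failures


def demo_empty():
    """Poll an empty directory three times under both settings."""
    with tempfile.TemporaryDirectory() as tempdir:
        pattern = os.path.join(tempdir, '*.csv')
        allowed = count_failed_polls(pattern, 3, allow_empty=True)
        disallowed = count_failed_polls(pattern, 3, allow_empty=False)
        return allowed, disallowed
```

In this sketch the permissive setting never fails, while the strict one fails on every poll until a file shows up, which is the resilience trade-off discussed above.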
Run Portable_Python PreCommit
Run Python_PVR_Flink PreCommit
Adds MatchContinuously to the Python SDK. It has the same purpose as FileIO.MatchAll.Continuously but a different implementation, since Watch is not available.
The idea is to use PeriodicImpulse and MatchAll to check for new files, and use a stateful DoFn to remove duplicates.
I marked it as experimental, since in the future it may be worth changing to a Python version of Watch.
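The approach in the PR description (poll on a fixed interval, match the pattern, drop paths already seen) can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the code in this PR: `poll_new_files`, `demo_poll` and their parameters are hypothetical names, the loop stands in for PeriodicImpulse, `glob` stands in for MatchAll, and an in-memory set stands in for the state kept by the stateful DoFn.

```python
import glob
import os
import tempfile
import time


def poll_new_files(pattern, interval, max_polls):
    """Yield each path matching `pattern` once, polling `max_polls` times.

    Hypothetical helper: the loop plays the role of PeriodicImpulse, glob
    plays the role of MatchAll, and `seen` plays the role of DoFn state.
    """
    seen = set()
    for poll in range(max_polls):
        for path in sorted(glob.glob(pattern)):
            if path not in seen:  # emit a path only the first time it matches
                seen.add(path)
                yield path
        if poll < max_polls - 1:
            time.sleep(interval)


def demo_poll():
    """Create one file up front and one mid-run, and collect what is emitted."""
    with tempfile.TemporaryDirectory() as tempdir:
        first = os.path.join(tempdir, 'a.txt')
        extra = os.path.join(tempdir, 'extra.txt')
        open(first, 'w').close()
        emitted = []
        for path in poll_new_files(os.path.join(tempdir, '*.txt'), 0.01, 3):
            emitted.append(os.path.basename(path))
            if not os.path.exists(extra):
                # Simulate a file that appears while the poller is running.
                open(extra, 'w').close()
        return emitted
```

Each file is reported once even though later polls match it again. In a real pipeline the deduplication state would presumably be keyed by file path and managed by the runner rather than held in a local set.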