-
-
Notifications
You must be signed in to change notification settings - Fork 48
Replace LimitingPipeline with LimitTransformer #729
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace LimitingPipeline with LimitTransformer #729
Conversation
|
This PR exceeds the recommended size of 1000 lines. Please make sure you are NOT addressing multiple issues with one PR. Note this PR might be rejected due to its size. |
1 similar comment
|
This PR exceeds the recommended size of 1000 lines. Please make sure you are NOT addressing multiple issues with one PR. Note this PR might be rejected due to its size. |
Flow PHP - BenchmarksResults of the benchmarks from this PR are compared with the results from 1.x branch. Extractors+-----------------------+-------------------+------+-----+------------------+-------------------+------------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+-----------------------+-------------------+------+-----+------------------+-------------------+------------------+
| AvroExtractorBench | bench_extract_10k | 1 | 3 | 34.879mb +0.03% | 361.577ms -16.34% | ±0.99% +102.52% |
| CSVExtractorBench | bench_extract_10k | 1 | 3 | 4.767mb +0.13% | 269.400ms -21.76% | ±0.04% -84.53% |
| JsonExtractorBench | bench_extract_10k | 1 | 3 | 4.914mb +0.16% | 572.385ms -17.55% | ±1.46% +1891.05% |
| ParquetExtractorBench | bench_extract_10k | 1 | 3 | 233.470mb +0.00% | 792.023ms -22.33% | ±0.46% -18.85% |
| TextExtractorBench | bench_extract_10k | 1 | 3 | 4.757mb +0.22% | 19.207ms -12.56% | ±2.22% +533.25% |
| XmlExtractorBench | bench_extract_10k | 1 | 3 | 4.761mb +0.13% | 404.971ms -27.17% | ±0.70% +29.63% |
+-----------------------+-------------------+------+-----+------------------+-------------------+------------------+
Transformers+-----------------------------+--------------------------+------+-----+-----------------+------------------+-----------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+-----------------------------+--------------------------+------+-----+-----------------+------------------+-----------------+
| RenameEntryTransformerBench | bench_transform_10k_rows | 1 | 3 | 87.033mb +0.00% | 49.048ms -29.39% | ±0.94% +112.13% |
+-----------------------------+--------------------------+------+-----+-----------------+------------------+-----------------+
Loaders+--------------------+----------------+------+-----+------------------+-------------------+-----------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+--------------------+----------------+------+-----+------------------+-------------------+-----------------+
| AvroLoaderBench | bench_load_10k | 1 | 3 | 93.197mb +0.08% | 578.509ms -15.85% | ±0.92% +6.04% |
| CSVLoaderBench | bench_load_10k | 1 | 3 | 45.976mb +0.16% | 69.304ms +0.35% | ±0.83% +79.69% |
| JsonLoaderBench | bench_load_10k | 1 | 3 | 88.543mb +0.01% | 61.472ms -22.97% | ±0.54% +530.08% |
| ParquetLoaderBench | bench_load_10k | 1 | 3 | 283.983mb +0.00% | 1.176s -23.37% | ±0.78% +123.14% |
| TextLoaderBench | bench_load_10k | 1 | 3 | 16.532mb +0.07% | 41.002ms +9.17% | ±0.24% -88.56% |
+--------------------+----------------+------+-----+------------------+-------------------+-----------------+
Building Blocks+-------------------------+----------------------------+------+-----+-----------------+-------------------+------------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+-------------------------+----------------------------+------+-----+-----------------+-------------------+------------------+
| RowsBench | bench_chunk_10_on_10k | 2 | 3 | 60.657mb +0.00% | 2.268ms -49.85% | ±2.54% -22.85% |
| RowsBench | bench_diff_left_1k_on_10k | 2 | 3 | 80.449mb +0.00% | 152.705ms -14.23% | ±0.57% +89.38% |
| RowsBench | bench_diff_right_1k_on_10k | 2 | 3 | 58.975mb +0.00% | 15.034ms -17.67% | ±0.28% +1.09% |
| RowsBench | bench_drop_1k_on_10k | 2 | 3 | 59.795mb +0.00% | 1.773ms -44.53% | ±3.63% +51.01% |
| RowsBench | bench_drop_right_1k_on_10k | 2 | 3 | 59.795mb +0.00% | 1.838ms -37.87% | ±2.23% -21.19% |
| RowsBench | bench_entries_on_10k | 2 | 3 | 59.009mb +0.00% | 2.620ms -41.97% | ±1.23% -47.99% |
| RowsBench | bench_filter_on_10k | 2 | 3 | 59.538mb +0.00% | 14.323ms -41.22% | ±1.75% +130.81% |
| RowsBench | bench_find_on_10k | 2 | 3 | 59.537mb +0.00% | 14.478ms -40.47% | ±1.31% +86.84% |
| RowsBench | bench_find_one_on_10k | 10 | 3 | 57.609mb +0.00% | 1.800μs -28.00% | ±0.00% 0.00% |
| RowsBench | bench_first_on_10k | 10 | 3 | 57.609mb +0.00% | 0.400μs -20.00% | ±0.00% +0.00% |
| RowsBench | bench_flat_map_on_1k | 2 | 3 | 65.842mb +0.00% | 10.333ms -24.78% | ±1.76% -14.79% |
| RowsBench | bench_map_on_10k | 2 | 3 | 91.362mb +0.00% | 48.872ms -23.10% | ±2.18% +129.55% |
| RowsBench | bench_merge_1k_on_10k | 2 | 3 | 60.058mb +0.00% | 1.930ms -42.18% | ±0.61% -77.23% |
| RowsBench | bench_partition_by_on_10k | 2 | 3 | 62.328mb +0.00% | 32.981ms -35.32% | ±0.92% +59.00% |
| RowsBench | bench_remove_on_10k | 2 | 3 | 62.159mb +0.00% | 4.983ms -38.40% | ±0.93% -68.13% |
| RowsBench | bench_sort_asc_on_1k | 2 | 3 | 57.609mb +0.00% | 36.751ms -27.81% | ±0.88% +18.97% |
| RowsBench | bench_sort_by_on_1k | 2 | 3 | 57.609mb +0.00% | 36.770ms -27.14% | ±1.51% +176.54% |
| RowsBench | bench_sort_desc_on_1k | 2 | 3 | 57.609mb +0.00% | 37.437ms -26.24% | ±0.09% -95.38% |
| RowsBench | bench_sort_entries_on_1k | 2 | 3 | 59.883mb +0.00% | 7.430ms -21.34% | ±1.05% +471.20% |
| RowsBench | bench_sort_on_1k | 2 | 3 | 57.608mb +0.00% | 28.635ms -23.57% | ±0.41% +336.29% |
| RowsBench | bench_take_1k_on_10k | 10 | 3 | 57.609mb +0.00% | 12.683μs -39.89% | ±1.69% +336.14% |
| RowsBench | bench_take_right_1k_on_10k | 10 | 3 | 57.609mb +0.00% | 15.388μs -42.50% | ±0.61% -51.28% |
| RowsBench | bench_unique_on_1k | 2 | 3 | 80.450mb +0.00% | 156.669ms -12.80% | ±0.57% -26.81% |
| NativeEntryFactoryBench | bench_entry_factory | 1 | 3 | 91.729mb -0.00% | 116.619ms -21.65% | ±2.03% +392.57% |
| NativeEntryFactoryBench | bench_entry_factory | 1 | 3 | 47.592mb +0.02% | 60.198ms -21.51% | ±2.54% +1368.01% |
| NativeEntryFactoryBench | bench_entry_factory | 1 | 3 | 12.390mb -0.04% | 13.806ms -21.82% | ±1.81% +57.91% |
+-------------------------+----------------------------+------+-----+-----------------+-------------------+------------------+
|
stloyd
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this approach 😊
| if ($signal === Signal::STOP || $this->reachedLimit()) { | ||
| $context->streams()->close($this->path); | ||
|
|
||
| return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in all file-based ones it could be replaced with:
| return; | |
| break 2; |
along with that closing the stream could be removed as would be done at the end of that method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started from this approach but return is easier to understand and less problematic in case of additional logic after the loop. Return is more readable at least for me
|
|
||
| enum Signal | ||
| { | ||
| case STOP; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about STOP_PROCESSING? Cause stop is quite a generic name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extractors are not really processing, they are extracting, but I didn't want yet to implement something super specific, I want to wait and see how this will evolve, the moment STOP will become too enigmatic, then we might need to think about splitting it
Change Log
Added
Fixed
Changed
Removed
Deprecated
Security
Refs: #715
Description
In order to analyze the Execution Plan we need to iterate over Pipeline elements, since Limit was implemented as Pipeline itself it was pretty much impossible to understand if/when limit was applied to the pipeline.
This approach changes that, instead of using LimitingPipeline, there is a new Transformer that will throw LimitReached exception which should be recognized to send Signal::STOP to generator.
All generators are now expecting Signals, once Singal::STOP is received, generators are closed.
This PR comes also with adding Limit directly to Extractors, it's for the performance auto optimization that will happen in the next PR.
Once limit is applied to the Extractor, it will throw only specific number of rows, some extractors like Parquet will use that limit to read only specific number of values instead of reading the entire page of data.