-
-
Notifications
You must be signed in to change notification settings - Fork 48
Pipeline Optimization - LimitOptimizer #730
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pipeline Optimization - LimitOptimizer #730
Conversation
d313559 to
11aa9c0
Compare
Flow PHP - BenchmarksResults of the benchmarks from this PR are compared with the results from 1.x branch. Extractors+-----------------------+-------------------+------+-----+------------------+------------------+------------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+-----------------------+-------------------+------+-----+------------------+------------------+------------------+
| AvroExtractorBench | bench_extract_10k | 1 | 3 | 34.889mb +0.95% | 440.442ms +1.75% | ±0.81% +245.55% |
| CSVExtractorBench | bench_extract_10k | 1 | 3 | 4.773mb +0.35% | 347.942ms +0.72% | ±0.31% +95.15% |
| JsonExtractorBench | bench_extract_10k | 1 | 3 | 4.921mb +0.33% | 694.870ms -0.76% | ±0.15% -50.43% |
| ParquetExtractorBench | bench_extract_10k | 1 | 3 | 233.480mb +0.01% | 993.406ms -2.37% | ±0.36% +1121.74% |
| TextExtractorBench | bench_extract_10k | 1 | 3 | 4.767mb +0.35% | 23.161ms +1.06% | ±0.31% -88.59% |
| XmlExtractorBench | bench_extract_10k | 1 | 3 | 4.767mb +0.35% | 555.245ms -0.48% | ±0.15% -48.03% |
+-----------------------+-------------------+------+-----+------------------+------------------+------------------+
Transformers+-----------------------------+--------------------------+------+-----+-----------------+-----------------+---------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+-----------------------------+--------------------------+------+-----+-----------------+-----------------+---------------+
| RenameEntryTransformerBench | bench_transform_10k_rows | 1 | 3 | 87.034mb +0.02% | 66.939ms -2.76% | ±0.85% +7.18% |
+-----------------------------+--------------------------+------+-----+-----------------+-----------------+---------------+
Loaders+--------------------+----------------+------+-----+------------------+------------------+-----------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+--------------------+----------------+------+-----+------------------+------------------+-----------------+
| AvroLoaderBench | bench_load_10k | 1 | 3 | 93.274mb +0.02% | 695.975ms -1.25% | ±2.51% +133.35% |
| CSVLoaderBench | bench_load_10k | 1 | 3 | 46.050mb +0.04% | 69.665ms +0.70% | ±0.34% -70.21% |
| JsonLoaderBench | bench_load_10k | 1 | 3 | 88.552mb +0.02% | 76.827ms -1.88% | ±0.30% -78.24% |
| ParquetLoaderBench | bench_load_10k | 1 | 3 | 283.992mb +0.01% | 1.542s +1.03% | ±0.61% +140.28% |
| TextLoaderBench | bench_load_10k | 1 | 3 | 16.544mb +0.10% | 37.464ms +1.12% | ±1.69% +238.31% |
+--------------------+----------------+------+-----+------------------+------------------+-----------------+
Building Blocks+-------------------------+----------------------------+------+-----+-----------------+------------------+------------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+-------------------------+----------------------------+------+-----+-----------------+------------------+------------------+
| RowsBench | bench_chunk_10_on_10k | 2 | 3 | 60.658mb +0.00% | 4.136ms +1.70% | ±2.42% +194.22% |
| RowsBench | bench_diff_left_1k_on_10k | 2 | 3 | 80.450mb +0.00% | 180.197ms +0.73% | ±1.01% -34.97% |
| RowsBench | bench_diff_right_1k_on_10k | 2 | 3 | 58.976mb +0.00% | 17.874ms -0.39% | ±0.22% -71.13% |
| RowsBench | bench_drop_1k_on_10k | 2 | 3 | 59.797mb +0.00% | 2.871ms -2.14% | ±1.27% +824.56% |
| RowsBench | bench_drop_right_1k_on_10k | 2 | 3 | 59.797mb +0.00% | 3.003ms +2.09% | ±2.16% +540.12% |
| RowsBench | bench_entries_on_10k | 2 | 3 | 59.010mb +0.00% | 3.975ms -6.86% | ±2.45% -30.82% |
| RowsBench | bench_filter_on_10k | 2 | 3 | 59.539mb +0.00% | 23.573ms -0.44% | ±0.93% -48.02% |
| RowsBench | bench_find_on_10k | 2 | 3 | 59.539mb +0.00% | 23.304ms -1.34% | ±0.50% -37.69% |
| RowsBench | bench_find_one_on_10k | 10 | 3 | 57.610mb +0.00% | 2.300μs -4.17% | ±3.55% +4.35% |
| RowsBench | bench_first_on_10k | 10 | 3 | 57.610mb +0.00% | 0.500μs 0.00% | ±0.00% 0.00% |
| RowsBench | bench_flat_map_on_1k | 2 | 3 | 65.843mb +0.00% | 13.634ms +0.79% | ±0.96% -47.03% |
| RowsBench | bench_map_on_10k | 2 | 3 | 91.363mb +0.00% | 60.981ms -3.16% | ±0.28% -54.04% |
| RowsBench | bench_merge_1k_on_10k | 2 | 3 | 60.060mb +0.00% | 3.272ms +4.51% | ±3.24% +834.83% |
| RowsBench | bench_partition_by_on_10k | 2 | 3 | 62.330mb +0.00% | 47.788ms -4.41% | ±0.19% -82.97% |
| RowsBench | bench_remove_on_10k | 2 | 3 | 62.160mb +0.00% | 7.675ms -1.75% | ±0.70% -43.97% |
| RowsBench | bench_sort_asc_on_1k | 2 | 3 | 57.610mb +0.00% | 50.515ms +0.43% | ±0.80% -32.85% |
| RowsBench | bench_sort_by_on_1k | 2 | 3 | 57.610mb +0.00% | 50.659ms +1.01% | ±0.28% +336.47% |
| RowsBench | bench_sort_desc_on_1k | 2 | 3 | 57.610mb +0.00% | 50.661ms +0.24% | ±0.08% -83.89% |
| RowsBench | bench_sort_entries_on_1k | 2 | 3 | 59.884mb +0.00% | 9.309ms -0.27% | ±0.62% -17.15% |
| RowsBench | bench_sort_on_1k | 2 | 3 | 57.609mb +0.00% | 37.583ms +0.48% | ±1.88% +2554.21% |
| RowsBench | bench_take_1k_on_10k | 10 | 3 | 57.610mb +0.00% | 21.000μs -1.30% | ±0.39% -56.36% |
| RowsBench | bench_take_right_1k_on_10k | 10 | 3 | 57.610mb +0.00% | 26.230μs +0.14% | ±0.89% +396.21% |
| RowsBench | bench_unique_on_1k | 2 | 3 | 80.451mb +0.00% | 178.852ms -1.06% | ±1.11% +33.45% |
| NativeEntryFactoryBench | bench_entry_factory | 1 | 3 | 91.718mb +0.01% | 146.632ms -0.97% | ±0.16% -72.76% |
| NativeEntryFactoryBench | bench_entry_factory | 1 | 3 | 47.599mb -0.02% | 74.443ms -0.66% | ±0.78% -19.34% |
| NativeEntryFactoryBench | bench_entry_factory | 1 | 3 | 12.391mb -0.04% | 17.717ms +0.73% | ±0.95% +237.42% |
+-------------------------+----------------------------+------+-----+-----------------+------------------+------------------+
|
| public function source(Extractor $extractor) : self; | ||
| public function setSource(Extractor $extractor) : self; | ||
|
|
||
| public function source() : Extractor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to expose the pipeline data source in order to be able to access it at the Optimizer level.
| use Flow\ETL\Pipeline\Pipes; | ||
|
|
||
| final class LogicalPlan | ||
| final class ExecutionPlan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically speaking, looking at the current implementation, it's more a Physical than a Logical plan, but for now, lets call it an Execution Plan which IMO it's a most self-descriptive name for what this class is really responsible for.
|
Technically speaking, it should be possible to move the current Execution Plan processing logic into Optimizer, however, for now I would like to keep both mechanisms. There is a very good chance that Execution Plan processor will evolve into |
Change Log
Added
Fixed
Changed
Removed
Deprecated
Security
Closes: #715
Description
I was first trying to extend ExecutionPlan processors, however achieving what was needed without access to the Pipeline itself was extremely complicated, I ended up adding Locks to Transformers in order to avoid removing them from the pipeline in certain situations.
After a few attempts, I started once again from scratch, this time focusing on optimizing the pipeline not before execution but when the element is supposed to be added to the pipeline.
That's how I ended up creating a new mechanism in the ETL called
Optimizer.The role of the optimizer is to analyze the existing pipeline/extractor and decide what's the most optimal way of adding a given element to the pipeline.
Currently, there is only one Optimization that can be applied,
LimitOptimizationbut we can easily expand this mechanism in order to:It's best to show it on example:
Without Optimizations total time of reading 10 rows from 1mln rows parquet file was something around 11 sec, after optimization is applied, that time drops below 1sec.