@norberttech norberttech commented Nov 4, 2023

Change Log

Added

  • Pipeline Optimizer
  • LimitOptimization

Fixed

Changed

  • Renamed LogicalPlan into ExecutionPlan

Removed

Deprecated

Security


Closes: #715

Description

I first tried to extend the ExecutionPlan processors, but achieving what was needed without access to the Pipeline itself was extremely complicated. I ended up adding Locks to Transformers to avoid removing them from the pipeline in certain situations.
After a few attempts, I started again from scratch, this time focusing on optimizing the pipeline not before execution but at the moment an element is added to it.

That's how I ended up creating a new mechanism in the ETL called Optimizer.
The role of the optimizer is to analyze the existing pipeline/extractor and decide on the best way to add a given element to the pipeline.
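
To make the idea concrete, here is a minimal, self-contained sketch of an optimizer that is consulted whenever an element is added. All class and method names below (`SketchOptimizer`, `SketchLimitOptimization`, `isFor`, and so on) are hypothetical stand-ins rather than Flow's actual API, and the safety rule (push the limit down only when the pipeline is still empty) is a deliberate simplification.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for pipeline elements, not Flow's real classes.
interface Element {}

final class Limit implements Element
{
    public function __construct(public int $rows) {}
}

final class Transform implements Element
{
    public function __construct(public string $name) {}
}

final class SketchExtractor
{
    // null means "read everything".
    public ?int $limit = null;
}

final class SketchPipeline
{
    /** @var list<Element> */
    public array $elements = [];

    public function __construct(public SketchExtractor $source) {}

    public function add(Element $element) : self
    {
        $this->elements[] = $element;

        return $this;
    }
}

interface SketchOptimization
{
    public function isFor(Element $element, SketchPipeline $pipeline) : bool;

    public function optimize(Element $element, SketchPipeline $pipeline) : SketchPipeline;
}

final class SketchLimitOptimization implements SketchOptimization
{
    public function isFor(Element $element, SketchPipeline $pipeline) : bool
    {
        // Simplifying assumption: only push the limit down when no other
        // element was added before it, so the row count cannot have changed.
        return $element instanceof Limit && $pipeline->elements === [];
    }

    public function optimize(Element $element, SketchPipeline $pipeline) : SketchPipeline
    {
        if (!$element instanceof Limit) {
            return $pipeline->add($element);
        }

        // Apply the limit at the source instead of adding a pipeline step.
        $pipeline->source->limit = $element->rows;

        return $pipeline;
    }
}

final class SketchOptimizer
{
    /** @var array<SketchOptimization> */
    private array $optimizations;

    public function __construct(SketchOptimization ...$optimizations)
    {
        $this->optimizations = $optimizations;
    }

    public function optimize(Element $element, SketchPipeline $pipeline) : SketchPipeline
    {
        foreach ($this->optimizations as $optimization) {
            if ($optimization->isFor($element, $pipeline)) {
                return $optimization->optimize($element, $pipeline);
            }
        }

        // No optimization applies: add the element unchanged.
        return $pipeline->add($element);
    }
}
```

In this sketch the decision happens at add time, so the pipeline never needs a separate rewrite pass before execution.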

Currently, only one optimization can be applied, LimitOptimization, but we can easily expand this mechanism to:

  • reduce the number of transformers, merging the same transformations together (like keep/drop entries)
  • apply selects/filters directly at extractors (when possible)
  • remove redundant transformations
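
As an illustration of the first idea in the list above, the following sketch merges consecutive drop steps into one. It is not part of this PR: the array-based step representation and the `mergeAdjacentDrops` function are hypothetical and only show the shape such an optimization might take.

```php
<?php

declare(strict_types=1);

/**
 * Merge consecutive "drop" steps into a single step.
 * Steps are plain arrays here purely for illustration.
 *
 * @param list<array{op: string, entries: list<string>}> $steps
 *
 * @return list<array{op: string, entries: list<string>}>
 */
function mergeAdjacentDrops(array $steps) : array
{
    $merged = [];

    foreach ($steps as $step) {
        $last = \array_key_last($merged);

        if ($last !== null && $step['op'] === 'drop' && $merged[$last]['op'] === 'drop') {
            // Fold this drop into the previous one, removing duplicate entry names.
            $merged[$last]['entries'] = \array_values(
                \array_unique(\array_merge($merged[$last]['entries'], $step['entries']))
            );

            continue;
        }

        $merged[] = $step;
    }

    return $merged;
}
```

Only adjacent drops are merged, because a step in between (a rename, for example) could change which entries a later drop refers to.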

It's best shown with an example:

<?php

use Flow\ETL\DSL\Parquet;
use Flow\ETL\DSL\To;
use Flow\ETL\Flow;

include __DIR__ . '/../vendor/autoload.php';
include __DIR__ . '/../tools/blackfire/vendor/autoload.php';

$start = hrtime(true);
(new Flow())
    ->read(Parquet::from(__DIR__ . '/1_mln_rows.parquet'))
    ->limit(10)
    ->write(To::output())
    ->count();
$end = hrtime(true);
echo 'Time: ' . ($end - $start) / 1e+9 . 's' . PHP_EOL;

Without optimizations, reading 10 rows from a Parquet file with 1 million rows took around 11 seconds; with the optimization applied, that time drops below 1 second.
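
The general effect can be illustrated with a small standalone sketch that counts how many rows a source has to produce when the limit is applied downstream versus at the source. `CountingExtractor` and `takeDownstream` are hypothetical names, and this does not reproduce Flow's Parquet reader; it only shows why pushing a limit to the source can save work.

```php
<?php

declare(strict_types=1);

/** Hypothetical extractor: yields rows lazily and counts how many it produced. */
final class CountingExtractor
{
    public int $rowsRead = 0;

    public function __construct(private int $total, private ?int $limit = null) {}

    public function extract() : \Generator
    {
        for ($i = 0; $i < $this->total; $i++) {
            // With a limit set at the source, stop reading as soon as it is reached.
            if ($this->limit !== null && $this->rowsRead >= $this->limit) {
                return;
            }

            $this->rowsRead++;

            yield ['id' => $i];
        }
    }
}

/**
 * Limit applied downstream: every row is still pulled from the source,
 * and rows beyond the limit are simply discarded.
 *
 * @return list<array{id: int}>
 */
function takeDownstream(CountingExtractor $extractor, int $limit) : array
{
    $rows = [];

    foreach ($extractor->extract() as $row) {
        if (\count($rows) < $limit) {
            $rows[] = $row;
        }
    }

    return $rows;
}

// Limit applied downstream: the source still produces all 1000 rows.
$unoptimized = new CountingExtractor(1000);
$downstreamRows = takeDownstream($unoptimized, 10);

// Limit applied at the source: reading stops after 10 rows.
$optimized = new CountingExtractor(1000, 10);
$sourceRows = \iterator_to_array($optimized->extract(), false);
```

Both variants return the same 10 rows; the difference is in `rowsRead`, which is where the time goes for an expensive format.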

@norberttech norberttech force-pushed the feature/early-detect-limit branch from d313559 to 11aa9c0 Compare November 4, 2023 20:41
github-actions bot commented Nov 4, 2023

Flow PHP - Benchmarks

Results of the benchmarks from this PR are compared with the results from 1.x branch.

Extractors
+-----------------------+-------------------+------+-----+------------------+------------------+------------------+
| benchmark             | subject           | revs | its | mem_peak         | mode             | rstdev           |
+-----------------------+-------------------+------+-----+------------------+------------------+------------------+
| AvroExtractorBench    | bench_extract_10k | 1    | 3   | 34.889mb +0.95%  | 440.442ms +1.75% | ±0.81% +245.55%  |
| CSVExtractorBench     | bench_extract_10k | 1    | 3   | 4.773mb +0.35%   | 347.942ms +0.72% | ±0.31% +95.15%   |
| JsonExtractorBench    | bench_extract_10k | 1    | 3   | 4.921mb +0.33%   | 694.870ms -0.76% | ±0.15% -50.43%   |
| ParquetExtractorBench | bench_extract_10k | 1    | 3   | 233.480mb +0.01% | 993.406ms -2.37% | ±0.36% +1121.74% |
| TextExtractorBench    | bench_extract_10k | 1    | 3   | 4.767mb +0.35%   | 23.161ms +1.06%  | ±0.31% -88.59%   |
| XmlExtractorBench     | bench_extract_10k | 1    | 3   | 4.767mb +0.35%   | 555.245ms -0.48% | ±0.15% -48.03%   |
+-----------------------+-------------------+------+-----+------------------+------------------+------------------+
Transformers
+-----------------------------+--------------------------+------+-----+-----------------+-----------------+---------------+
| benchmark                   | subject                  | revs | its | mem_peak        | mode            | rstdev        |
+-----------------------------+--------------------------+------+-----+-----------------+-----------------+---------------+
| RenameEntryTransformerBench | bench_transform_10k_rows | 1    | 3   | 87.034mb +0.02% | 66.939ms -2.76% | ±0.85% +7.18% |
+-----------------------------+--------------------------+------+-----+-----------------+-----------------+---------------+
Loaders
+--------------------+----------------+------+-----+------------------+------------------+-----------------+
| benchmark          | subject        | revs | its | mem_peak         | mode             | rstdev          |
+--------------------+----------------+------+-----+------------------+------------------+-----------------+
| AvroLoaderBench    | bench_load_10k | 1    | 3   | 93.274mb +0.02%  | 695.975ms -1.25% | ±2.51% +133.35% |
| CSVLoaderBench     | bench_load_10k | 1    | 3   | 46.050mb +0.04%  | 69.665ms +0.70%  | ±0.34% -70.21%  |
| JsonLoaderBench    | bench_load_10k | 1    | 3   | 88.552mb +0.02%  | 76.827ms -1.88%  | ±0.30% -78.24%  |
| ParquetLoaderBench | bench_load_10k | 1    | 3   | 283.992mb +0.01% | 1.542s +1.03%    | ±0.61% +140.28% |
| TextLoaderBench    | bench_load_10k | 1    | 3   | 16.544mb +0.10%  | 37.464ms +1.12%  | ±1.69% +238.31% |
+--------------------+----------------+------+-----+------------------+------------------+-----------------+
Building Blocks
+-------------------------+----------------------------+------+-----+-----------------+------------------+------------------+
| benchmark               | subject                    | revs | its | mem_peak        | mode             | rstdev           |
+-------------------------+----------------------------+------+-----+-----------------+------------------+------------------+
| RowsBench               | bench_chunk_10_on_10k      | 2    | 3   | 60.658mb +0.00% | 4.136ms +1.70%   | ±2.42% +194.22%  |
| RowsBench               | bench_diff_left_1k_on_10k  | 2    | 3   | 80.450mb +0.00% | 180.197ms +0.73% | ±1.01% -34.97%   |
| RowsBench               | bench_diff_right_1k_on_10k | 2    | 3   | 58.976mb +0.00% | 17.874ms -0.39%  | ±0.22% -71.13%   |
| RowsBench               | bench_drop_1k_on_10k       | 2    | 3   | 59.797mb +0.00% | 2.871ms -2.14%   | ±1.27% +824.56%  |
| RowsBench               | bench_drop_right_1k_on_10k | 2    | 3   | 59.797mb +0.00% | 3.003ms +2.09%   | ±2.16% +540.12%  |
| RowsBench               | bench_entries_on_10k       | 2    | 3   | 59.010mb +0.00% | 3.975ms -6.86%   | ±2.45% -30.82%   |
| RowsBench               | bench_filter_on_10k        | 2    | 3   | 59.539mb +0.00% | 23.573ms -0.44%  | ±0.93% -48.02%   |
| RowsBench               | bench_find_on_10k          | 2    | 3   | 59.539mb +0.00% | 23.304ms -1.34%  | ±0.50% -37.69%   |
| RowsBench               | bench_find_one_on_10k      | 10   | 3   | 57.610mb +0.00% | 2.300μs -4.17%   | ±3.55% +4.35%    |
| RowsBench               | bench_first_on_10k         | 10   | 3   | 57.610mb +0.00% | 0.500μs 0.00%    | ±0.00% 0.00%     |
| RowsBench               | bench_flat_map_on_1k       | 2    | 3   | 65.843mb +0.00% | 13.634ms +0.79%  | ±0.96% -47.03%   |
| RowsBench               | bench_map_on_10k           | 2    | 3   | 91.363mb +0.00% | 60.981ms -3.16%  | ±0.28% -54.04%   |
| RowsBench               | bench_merge_1k_on_10k      | 2    | 3   | 60.060mb +0.00% | 3.272ms +4.51%   | ±3.24% +834.83%  |
| RowsBench               | bench_partition_by_on_10k  | 2    | 3   | 62.330mb +0.00% | 47.788ms -4.41%  | ±0.19% -82.97%   |
| RowsBench               | bench_remove_on_10k        | 2    | 3   | 62.160mb +0.00% | 7.675ms -1.75%   | ±0.70% -43.97%   |
| RowsBench               | bench_sort_asc_on_1k       | 2    | 3   | 57.610mb +0.00% | 50.515ms +0.43%  | ±0.80% -32.85%   |
| RowsBench               | bench_sort_by_on_1k        | 2    | 3   | 57.610mb +0.00% | 50.659ms +1.01%  | ±0.28% +336.47%  |
| RowsBench               | bench_sort_desc_on_1k      | 2    | 3   | 57.610mb +0.00% | 50.661ms +0.24%  | ±0.08% -83.89%   |
| RowsBench               | bench_sort_entries_on_1k   | 2    | 3   | 59.884mb +0.00% | 9.309ms -0.27%   | ±0.62% -17.15%   |
| RowsBench               | bench_sort_on_1k           | 2    | 3   | 57.609mb +0.00% | 37.583ms +0.48%  | ±1.88% +2554.21% |
| RowsBench               | bench_take_1k_on_10k       | 10   | 3   | 57.610mb +0.00% | 21.000μs -1.30%  | ±0.39% -56.36%   |
| RowsBench               | bench_take_right_1k_on_10k | 10   | 3   | 57.610mb +0.00% | 26.230μs +0.14%  | ±0.89% +396.21%  |
| RowsBench               | bench_unique_on_1k         | 2    | 3   | 80.451mb +0.00% | 178.852ms -1.06% | ±1.11% +33.45%   |
| NativeEntryFactoryBench | bench_entry_factory        | 1    | 3   | 91.718mb +0.01% | 146.632ms -0.97% | ±0.16% -72.76%   |
| NativeEntryFactoryBench | bench_entry_factory        | 1    | 3   | 47.599mb -0.02% | 74.443ms -0.66%  | ±0.78% -19.34%   |
| NativeEntryFactoryBench | bench_entry_factory        | 1    | 3   | 12.391mb -0.04% | 17.717ms +0.73%  | ±0.95% +237.42%  |
+-------------------------+----------------------------+------+-----+-----------------+------------------+------------------+

@norberttech norberttech changed the title RM Pipeline Optimization - LimitOptimizer Nov 4, 2023
- public function source(Extractor $extractor) : self;
+ public function setSource(Extractor $extractor) : self;

+ public function source() : Extractor;

I had to expose the pipeline data source in order to be able to access it at the Optimizer level.
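
A rough sketch of why a getter/setter pair for the source is useful: an optimization can read the current source, wrap or reconfigure it, and put it back. The names `SketchSource`, `ArraySource`, `LimitedSource`, and `SourcePipeline` are hypothetical and only mirror the `source()` / `setSource()` shape shown in the diff above.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for an extractor.
interface SketchSource
{
    public function rows() : iterable;
}

final class ArraySource implements SketchSource
{
    public function __construct(private array $rows) {}

    public function rows() : iterable
    {
        yield from $this->rows;
    }
}

/** Wraps another source and stops after a fixed number of rows. */
final class LimitedSource implements SketchSource
{
    public function __construct(private SketchSource $inner, private int $limit) {}

    public function rows() : iterable
    {
        if ($this->limit <= 0) {
            return;
        }

        $taken = 0;

        foreach ($this->inner->rows() as $row) {
            yield $row;

            // Stop pulling from the inner source once the limit is reached.
            if (++$taken >= $this->limit) {
                return;
            }
        }
    }
}

final class SourcePipeline
{
    public function __construct(private SketchSource $source) {}

    public function source() : SketchSource
    {
        return $this->source;
    }

    public function setSource(SketchSource $source) : self
    {
        $this->source = $source;

        return $this;
    }
}

// An optimization reads the current source, wraps it, and sets it back.
$pipeline = new SourcePipeline(new ArraySource([1, 2, 3, 4, 5]));
$pipeline->setSource(new LimitedSource($pipeline->source(), 2));
$limited = \iterator_to_array($pipeline->source()->rows(), false);
```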

use Flow\ETL\Pipeline\Pipes;

- final class LogicalPlan
+ final class ExecutionPlan

Technically speaking, looking at the current implementation, it's more of a Physical than a Logical plan, but for now, let's call it an Execution Plan, which IMO is the most self-descriptive name for what this class is really responsible for.

@norberttech

Technically speaking, it should be possible to move the current Execution Plan processing logic into the Optimizer; however, for now I would like to keep both mechanisms. There is a very good chance that the Execution Plan processor will evolve into an Executor, which I still think is missing.
For example, asynchronous processing is currently possible through the LocalSocketPipeline (async). Sync/async handling could be moved to the Executor.

@norberttech norberttech merged commit 7e25a3e into flow-php:1.x Nov 5, 2023
Development

Successfully merging this pull request may close these issues.

Apply limit directly to Extractors
