Apply limit directly to Extractors #715

@norberttech

Description

The easiest way to explain it is through example:

(new Flow())
    ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv'))
    ->limit(100)
    ->write(\Flow\ETL\DSL\To::output())
    ->run();

In this case, we want to read only 100 of the X rows available in the file.
Since the limit is currently implemented as a pipeline (LimitingPipeline), the extractor will still read at least 1000 rows, because that is the default value of $rows_in_batch.
We can change this value to 100, but that requires the user to understand how Extractors work, which I think is completely unnecessary.
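For illustration, the workaround available today would look roughly like this (assuming rows_in_batch can be passed to CSV::from as a named argument; the exact signature may differ between versions):

(new Flow())
    ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv', rows_in_batch: 100)) // manually align batch size with the limit
    ->limit(100)
    ->write(\Flow\ETL\DSL\To::output())
    ->run();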

Instead, the limit should be passed directly to the Extractor. We could, for example, introduce another interface, LimitedExtractor, which would let "something" set that limit so that the extractor, regardless of how it is configured, will not read more rows than the given limit.
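A minimal sketch of what that interface could look like (the interface and method names below are assumptions, not an existing API):

// Extractor here is \Flow\ETL\Extractor
interface LimitedExtractor extends Extractor
{
    /**
     * Tell the extractor to stop yielding rows once $limit rows were produced,
     * regardless of batch size or file size.
     */
    public function changeLimit(int $limit) : void;
}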

In theory, that's a perfect task for the LogicalPlan and Execution Processor, but there is a small catch here.
In the current implementation, the LogicalPlan only has access to Pipes (Transformers/Loaders). It does not have access to the Pipeline itself, which means it has no access to the LimitingPipeline.

What we need is something between the DataFrame and the Pipeline, and that something is an Executor (a similar concept exists in Apache Spark).

The role of the Executor would be to create a LogicalPlan, process it using the Execution Processor, and execute the pipeline.
Thanks to that, we get direct access to the Pipeline (which might hold other pipelines under itself), which gives us a more flexible way of analyzing the entire execution chain and adjusting it (optimizing/validating/changing) before it gets executed.
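Roughly, the Executor could look like this (class and method names below are illustrative assumptions, not a final design):

final class Executor
{
    public function __construct(private readonly Processor $processor)
    {
    }

    public function execute(Pipeline $pipeline) : void
    {
        // Build the LogicalPlan from the whole pipeline chain, not only from Pipes.
        $plan = LogicalPlan::fromPipeline($pipeline);

        // Let the Execution Processor analyze and rewrite the plan,
        // e.g. push DataFrame::limit() down into the extractor.
        $optimizedPipeline = $this->processor->process($plan);

        // Run the (possibly rewritten) pipeline.
        $optimizedPipeline->run();
    }
}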

Now going back to applying the limit on the Extractor.

So the Processor should check whether there are any transformations in the pipeline before DataFrame::limit() is used. Whenever it doesn't find anything that might change the number of rows (like, for example, expand), it should apply the limit directly on the Extractor, removing the LimitingPipeline. Let's look at some examples:

(new Flow())
    ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv'))
    ->select("id", "name")
    ->limit(100)
    ->write(\Flow\ETL\DSL\To::output())
    ->run();

In this example, there is nothing before the limit that might change the number of results, so the limit should be applied directly on the Extractor.

(new Flow())
    ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv'))
    ->withEntry("item", ref("items")->expand()) 
    ->limit(100)
    ->write(\Flow\ETL\DSL\To::output())
    ->run();

In this case, we are expanding one row into multiple rows, so we can't reliably predict at the Extractor level how many rows we will get after the transformation; here we should stick to the LimitingPipeline.
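A possible shape of that check in the Processor (everything below is a hypothetical sketch; the names LimitPushdownProcessor, canPushDown and RowsCountChangingTransformation are assumptions):

final class LimitPushdownProcessor
{
    /**
     * @param array<object> $transformations transformations registered before DataFrame::limit()
     */
    public function canPushDown(Extractor $extractor, array $transformations) : bool
    {
        // Only extractors that understand limits are eligible.
        if (!$extractor instanceof LimitedExtractor) {
            return false;
        }

        foreach ($transformations as $transformation) {
            // Anything that may change the number of rows (expand, filters, joins...)
            // makes the final row count unpredictable at the Extractor level.
            if ($transformation instanceof RowsCountChangingTransformation) {
                return false;
            }
        }

        return true;
    }
}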

It's a small change for users, but it comes with a huge DX impact, especially when dealing with large files.
Once this is done, we can implement a similar mechanism for DataFrame::select; there are extractors, for example Parquet or Avro, that can read only specific columns.
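Following the same pattern, column projection could be pushed down as well (the interface name and method below are assumptions):

interface ColumnLimitedExtractor extends Extractor
{
    /**
     * Restrict the extractor to the given columns so that, for example,
     * a Parquet or Avro reader can skip the remaining ones entirely.
     *
     * @param array<string> $columns
     */
    public function changeColumns(array $columns) : void;
}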
