
Parallel processing right after reading FROM file() #48525

Merged
devcrafter merged 15 commits into master from parallel-reading-from-file
Apr 10, 2023

Conversation

@devcrafter
Member

@devcrafter devcrafter commented Apr 6, 2023

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Query processing is parallelized right after reading FROM file(...). Related to #38755

@devcrafter
Member Author

devcrafter commented Apr 6, 2023

Before:

EXPLAIN PIPELINE
SELECT sum(length(base58Encode(URL)))
FROM file('hits_*.parquet')

Query id: daaea006-eee5-4561-b293-2e07a224dd9d

┌─explain──────────────────────────┐
│ (Expression)                     │
│ ExpressionTransform × 18         │
│   (Aggregating)                  │
│   Resize 1 → 18                  │
│     AggregatingTransform         │
│       (Expression)               │
│       ExpressionTransform        │
│         (ReadFromPreparedSource) │
│         NullSource 0 → 1         │
└──────────────────────────────────┘

SELECT sum(length(base58Encode(URL)))
FROM file('hits.parquet')

Query id: 960c63d0-7e11-4c5b-8d47-4a5800cf6429

┌─sum(length(base58Encode(URL)))─┐
│                      942426048 │
└────────────────────────────────┘

1 row in set. Elapsed: 124.852 sec. Processed 8.87 million rows, 776.83 MB (71.08 thousand rows/s., 6.22 MB/s.)

After:

EXPLAIN PIPELINE
SELECT sum(length(base58Encode(URL)))
FROM file('hits.parquet')

┌─explain──────────────────────────┐
│ (Expression)                     │
│ ExpressionTransform × 18         │
│   (Aggregating)                  │
│   Resize 18 → 18                 │
│     AggregatingTransform × 18    │
│       StrictResize 18 → 18       │
│         (Expression)             │
│         ExpressionTransform × 18 │
│           (ReadFromStorage)      │
│           Resize 1 → 18          │
│             File 0 → 1           │
└──────────────────────────────────┘

SELECT sum(length(base58Encode(URL)))
FROM file('hits.parquet')

Query id: 63fd4df3-a180-4961-be32-244e231db9c9

┌─sum(length(base58Encode(URL)))─┐
│                      942426048 │
└────────────────────────────────┘

1 row in set. Elapsed: 7.917 sec. Processed 8.87 million rows, 776.83 MB (1.12 million rows/s., 98.12 MB/s.)
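For reference, the elapsed times above amount to roughly a 15.8× speedup, consistent with the reported throughput going from 6.22 MB/s to 98.12 MB/s. A quick sanity check on those numbers:

```sql
-- Timings reported above: 124.852 s before, 7.917 s after
SELECT round(124.852 / 7.917, 1) AS speedup;  -- ≈ 15.8
```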

@devcrafter devcrafter added the pr-performance label (Pull request with some performance improvements) Apr 6, 2023
@alexey-milovidov
Member

Let's also add a performance test (tests/performance).

@alexey-milovidov alexey-milovidov self-assigned this Apr 7, 2023
@devcrafter
Member Author

devcrafter commented Apr 8, 2023

Around 50 tests could potentially become flaky due to this change.

Tests
rg --text -e 'from.*file\(' . | rg select | rg -v insert | rg -v Exception | rg -v serverError | rg -v max_threads=1 | cut -d ':' -f1 | cut -d'/' -f2- | uniq | sort
  • 0_stateless/01545_url_file_format_settings.sql
  • 0_stateless/01825_type_json_multiple_files.sh
  • 0_stateless/01946_test_zstd_decompression_with_escape_sequence_at_the_end_of_buffer.sh
  • 0_stateless/02051_symlinks_to_user_files.sh
  • 0_stateless/02105_table_function_file_partiotion_by.sh
  • 0_stateless/02130_parse_quoted_null.sh
  • 0_stateless/02149_schema_inference_formats_with_schema.sh
  • 0_stateless/02149_schema_inference.sh
  • 0_stateless/02167_format_from_file_extension.sh
  • 0_stateless/02185_orc_corrupted_file.sh
  • 0_stateless/02187_msg_pack_uuid.sh
  • 0_stateless/02211_jsonl_format_extension.sql
  • 0_stateless/02240_tskv_schema_inference_bug.sh
  • 0_stateless/02242_arrow_orc_parquet_nullable_schema_inference.sh
  • 0_stateless/02245_parquet_skip_unknown_type.sh
  • 0_stateless/02246_tsv_csv_best_effort_schema_inference.sh
  • 0_stateless/02247_names_order_in_json_and_tskv.sh
  • 0_stateless/02247_read_bools_as_numbers_json.sh
  • 0_stateless/02267_file_globs_schema_inference.sh
  • 0_stateless/02270_errors_in_files.sh
  • 0_stateless/02286_mysql_dump_input_format.sh
  • 0_stateless/02293_arrow_dictionary_indexes.sql
  • 0_stateless/02293_formats_json_columns.sh
  • 0_stateless/02302_defaults_in_columnar_formats.sql
  • 0_stateless/02313_avro_records_and_maps.sql
  • 0_stateless/02314_avro_null_as_default.sh
  • 0_stateless/02314_csv_tsv_skip_first_lines.sql
  • 0_stateless/02323_null_modifier_in_table_function.sql
  • 0_stateless/02373_heap_buffer_overflow_in_avro.sh
  • 0_stateless/02376_arrow_dict_with_string.sql
  • 0_stateless/02383_arrow_dict_special_cases.sh
  • 0_stateless/02384_nullable_low_cardinality_as_dict_in_arrow.sql
  • 0_stateless/02405_avro_read_nested.sql
  • 0_stateless/02416_input_json_formats.sql
  • 0_stateless/02417_json_object_each_row_format.sql
  • 0_stateless/02421_json_decimals_as_strings.sql
  • 0_stateless/02421_record_errors_row_by_input_format.sh
  • 0_stateless/02454_json_object_each_row_column_for_object_name.sql
  • 0_stateless/02455_one_row_from_csv_memory_usage.sh
  • 0_stateless/02475_bson_each_row_format.sh
  • 0_stateless/02482_capnp_list_of_structs.sh
  • 0_stateless/02482_json_nested_arrays_with_same_keys.sh
  • 0_stateless/02483_capnp_decimals.sh
  • 0_stateless/02500_bson_read_object_id.sh
  • 0_stateless/02511_parquet_orc_missing_columns.sh
  • 0_stateless/02521_avro_union_null_nested.sh
  • 0_stateless/02522_avro_complicate_schema.sh
  • 0_stateless/02541_arrow_duration_type.sh
  • 0_stateless/02566_ipv4_ipv6_binary_formats.sh
  • 0_stateless/02588_parquet_bug.sh
  • 0_stateless/02705_protobuf_debug_abort.sh

I will check them. A probable rule of thumb for updating a test:

  • if the data most likely fits in one block, do nothing
  • if no schema is provided in file(), apply max_threads=1; otherwise add ORDER BY over all columns
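As an illustration of that rule of thumb, a stabilized test might look like this (the file name and schema here are hypothetical):

```sql
-- Case 1: no schema is given, so pin the pipeline to a single thread
-- to keep the output row order stable
SELECT * FROM file('data.csv')
SETTINGS max_threads = 1;

-- Case 2: a schema is given, so keep parallel reading and make the
-- output order deterministic instead
SELECT c1, c2
FROM file('data.csv', 'CSV', 'c1 UInt32, c2 String')
ORDER BY c1, c2;
```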

@devcrafter devcrafter merged commit e3b5072 into master Apr 10, 2023
@devcrafter devcrafter deleted the parallel-reading-from-file branch April 10, 2023 16:30
@Avogar
Member

Avogar commented Apr 12, 2023

Should we do the same for the s3/url/hdfs table functions?
Also, for the s3 table function I see narrowPipe being used:

narrowPipe(pipe, num_streams);

Is it something similar? I don't completely understand what this function does.

@devcrafter
Member Author

devcrafter commented Apr 12, 2023

Should we do the same for the s3/url/hdfs table functions? Also, for the s3 table function I see narrowPipe being used:

narrowPipe(pipe, num_streams);

Is it something similar? I don't completely understand what this function does.

Hope we'll see it in #48727

UPD: it's redundant, since we already create as many sources as there are streams. See d5eb65b

@alexey-milovidov
Member

It should be marked as a backward incompatible change, just in case.
For example, it broke ClickHouse/NoiSQL#4 slightly.

@devcrafter
Member Author

It should be marked as a backward incompatible change, just in case. For example, it broke ClickHouse/NoiSQL#4 slightly.

That sounds somewhat controversial: a query on top of file() can be parallelized at other places in the query pipeline as well. Would such changes also be backward incompatible?

@alexey-milovidov
Member

It's just for upgrade notes. We rarely have backward incompatible changes, but something that could break a strange use case will be highlighted in the changelog.


Labels

backward compatibility, pr-performance (Pull request with some performance improvements)
