Enable parsing columns from file path for Broker Load (#1582) #1635
Conversation
gensrc/thrift/PlanNodes.thrift
Outdated
// total size of the file
8: optional i64 file_size
// columns parsed from file path
9: optional list<string> columns_from_path
It's better to record the slot offset for columns_from_path, or add a field like num_of_columns_from_file; because we may add other columns_from_xxx later, we should make the layout definite.
You should also add a comment that the columns_from_path slots come after the columns read from the file.
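For illustration, a minimal sketch of the ordering this comment asks to document, including the suggested counter field. The struct below is a simplified stand-in, not the actual Thrift definition:

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Simplified stand-in for TBrokerRangeDesc, illustrating the suggested
// convention: the first num_of_columns_from_file slots are filled from the
// file content, and the values parsed from the path fill the slots that
// immediately follow them.
struct BrokerRangeDescSketch {
    int64_t file_size = 0;                       // total size of the file
    int32_t num_of_columns_from_file = 0;        // suggested counter field
    std::vector<std::string> columns_from_path;  // occupies slots starting at num_of_columns_from_file
};
```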
be/src/exec/broker_scanner.cpp
Outdated
str_slot->len = value.size;
}

inline void BrokerScanner::fill_slots_of_columns_from_path(int start, const std::vector<SlotDescriptor*>& src_slot_descs, Tuple* tuple) {
Why not put this function in BaseScanner to avoid writing it twice?
And if you have num_of_columns_from_file, this function doesn't need the start param.
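A minimal, self-contained sketch of what the shared helper could look like; the types below are simplified stand-ins for the real Tuple/SlotDescriptor machinery, and the member names are assumptions:

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Simplified slot: the real code writes into StringValue slots of a Tuple.
struct SlotSketch { std::string value; };

class BaseScannerSketch {
public:
    BaseScannerSketch(size_t num_of_columns_from_file, size_t num_slots)
        : _num_of_columns_from_file(num_of_columns_from_file), _slots(num_slots) {}

    // Shared in the base class so BrokerScanner and the parquet scanner do not
    // duplicate it; the stored column count replaces the `start` parameter.
    void fill_slots_of_columns_from_path(const std::vector<std::string>& columns_from_path) {
        for (size_t i = 0; i < columns_from_path.size(); ++i) {
            // path-derived values land right after the columns read from the file
            _slots[_num_of_columns_from_file + i].value = columns_from_path[i];
        }
    }

private:
    size_t _num_of_columns_from_file;
    std::vector<SlotSketch> _slots;
};
```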
be/src/exec/parquet_reader.cpp
Outdated
} else {
    time_t timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_group) * 24 * 60 * 60);
    tm* local;
    local = localtime(&timestamp);
Use localtime_r, which is thread-safe.
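For reference, a sketch of the thread-safe variant applied to the snippet above (the function name and parameter are made up for illustration):

```cpp
#include <cstdint>
#include <ctime>

// localtime_r writes into a caller-provided struct tm instead of the shared
// static buffer used by localtime(), so concurrent scanner threads do not
// race on the result.
void days_to_local_tm(int64_t days_since_epoch, struct tm* out) {
    time_t timestamp = (time_t)(days_since_epoch * 24 * 60 * 60);
    localtime_r(&timestamp, out);  // thread-safe replacement for localtime()
}
```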
  _parquet_column_ids.clear();
- for (auto slot_desc : tuple_slot_descs) {
+ for (int i = 0; i < _num_of_columns_from_file; i++) {
+     auto slot_desc = tuple_slot_descs.at(i);
Suggested change:
- auto slot_desc = tuple_slot_descs.at(i);
+ auto slot_desc = tuple_slot_descs[i];
}
RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof));
// range of current file
const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
Suggested change:
- const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
+ const TBrokerRangeDesc& range = _ranges[_next_range - 1];
}
}
// columnsFromPath
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_59) {
I think there is no need to persist columnsFromPath, because Load already persists the whole load statement and will regenerate the DataDescriptor on restart.
if (dataDescription.getColumnNames() != null) {
    assignColumnNames.addAll(dataDescription.getColumnNames());
}
if (dataDescription.getColumnsFromPath() != null) {
If the user specifies columnsFromPath without a column list, I think we should return an error message to the user in the LoadStatement analyze function.
Here, we should check dataDescription.getColumnsFromPath() inside the if (dataDescription.getColumnNames() != null) block.
fail();
}

path = "/path/to/dir/k2==v2=//k1=v1//xxx.csv";
I think we should add a directory test case like '/path/to/dir/k2==v2=//k1=v1/', which should return false.
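A tiny sketch of the check such a test would exercise; the helper name is made up for illustration and is not the actual FE method:

```cpp
#include <string>

// A load path must end in a file name; a trailing '/' means the path is a
// directory, so column extraction should return false for it.
bool path_has_file_name(const std::string& path) {
    return !path.empty() && path.back() != '/';
}
// path_has_file_name("/path/to/dir/k2==v2=//k1=v1//xxx.csv") -> true
// path_has_file_name("/path/to/dir/k2==v2=//k1=v1/")         -> false
```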
be/src/exec/base_scanner.cpp
Outdated
}

void BaseScanner::fill_slots_of_columns_from_path(int start, const std::vector<std::string>& columns_from_path) {
    if (start <= 0) {
I think this check is useless
But we should skip the case of StreamLoadTask
imay left a comment
LGTM
Currently, we do not support parsing columns that are encoded in the file path, e.g. extracting column k1 from the file path /path/to/dir/k1=1/xxx.csv.
This patch makes it possible to parse columns from the file path, similar to Spark's Partition Discovery.
The patch parses the partition columns in BrokerScanNode.java and saves the parsing result for each file path as a property of TBrokerRangeDesc, so the broker reader on the BE side can read the value of the specified partition column.
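For readers unfamiliar with the scheme, a minimal sketch of Spark-style partition discovery over a single path; this is a simplified illustration, not the code in BrokerScanNode.java:

```cpp
#include <map>
#include <string>

// Walk the directory components of a path such as
// "/path/to/dir/k1=1/city=beijing/xxx.csv" and collect every "key=value"
// component as a partition column; the final file name is skipped.
std::map<std::string, std::string> parse_columns_from_path(const std::string& path) {
    std::map<std::string, std::string> columns;
    size_t start = 0;
    while (start < path.size()) {
        size_t end = path.find('/', start);
        if (end == std::string::npos) break;  // last component is the file name
        std::string part = path.substr(start, end - start);
        size_t eq = part.find('=');
        if (eq != std::string::npos && eq > 0) {
            columns[part.substr(0, eq)] = part.substr(eq + 1);
        }
        start = end + 1;
    }
    return columns;
}
// parse_columns_from_path("/path/to/dir/k1=1/xxx.csv") yields { {"k1", "1"} }
```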
(Sorry for creating a new PR for this issue; I'm not familiar with git rebase.)