ARROW-9773: [C++] Implement Take kernel for ChunkedArray #13857
I'd welcome some design input on this. How should the output chunking be structured?
My knee-jerk reaction is to prefer the existing chunk layouts, just in case the chunking was specially chosen for hardware or data placement reasons.
So, use the chunking of the take indices? I suppose that makes sense.
My intuition would be not to chunk at all. This is what we usually do for vector kernels when there is no natural mapping from input chunking to output chunking.
I think the original motivation here, though, was for things that won't fit in a single chunk (not necessarily due to the number of rows, but due to things like the size of string data).
Possibly, though there's a more general performance concern (you don't want to concatenate the chunks of a large chunked array just to take 10 elements out of it).
(also this comment is about primitive arrays)
I'm kind of liking the idea of using the take indices; it potentially gives the user control over the output chunking of take.
Of course, for cases like string and binary data, we'll probably take a different approach and chunk based on what fits.
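As a very rough illustration of what "chunk based on what fits" could mean for string/binary output (an entirely hypothetical helper, not part of this PR): start a new output chunk whenever accumulating another value would overflow the 32-bit offsets of a StringArray.

#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical sketch: greedily split the take output into chunks so that each
// output chunk's character data stays under the int32 offset limit of StringArray.
// `element_sizes[i]` is the byte length of the i-th selected string.
std::vector<int64_t> PlanOutputChunkLengths(const std::vector<int64_t>& element_sizes) {
  constexpr int64_t kMaxChunkBytes = std::numeric_limits<int32_t>::max();
  std::vector<int64_t> chunk_lengths;
  int64_t current_bytes = 0;
  int64_t current_length = 0;
  for (int64_t size : element_sizes) {
    if (current_length > 0 && current_bytes + size > kMaxChunkBytes) {
      // Close the current output chunk before it overflows.
      chunk_lengths.push_back(current_length);
      current_bytes = 0;
      current_length = 0;
    }
    current_bytes += size;
    ++current_length;
  }
  if (current_length > 0) chunk_lengths.push_back(current_length);
  return chunk_lengths;
}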
Ah, I missed that for primitive arrays.
This discussion seems like it might be relevant actually: https://issues.apache.org/jira/browse/ARROW-2532
See arrow/cpp/src/arrow/compute/kernels/vector_sort.cc, lines 690 to 711 in 8474ee5:

// Preprocessed sort key.
struct ResolvedSortKey {
  ResolvedSortKey(const std::shared_ptr<Array>& array, SortOrder order)
      : type(GetPhysicalType(array->type())),
        owned_array(GetPhysicalArray(*array, type)),
        array(*owned_array),
        order(order),
        null_count(array->null_count()) {}

  using LocationType = int64_t;

  template <typename ArrayType>
  ResolvedChunk<ArrayType> GetChunk(int64_t index) const {
    return {&checked_cast<const ArrayType&>(array), index};
  }

  const std::shared_ptr<DataType> type;
  std::shared_ptr<Array> owned_array;
  const Array& array;
  SortOrder order;
  int64_t null_count;
};
arrow/cpp/src/arrow/compute/kernels/vector_sort.cc, lines 844 to 889 in 8474ee5:

// Preprocessed sort key.
struct ResolvedSortKey {
  ResolvedSortKey(const std::shared_ptr<DataType>& type, ArrayVector chunks,
                  SortOrder order, int64_t null_count)
      : type(GetPhysicalType(type)),
        owned_chunks(std::move(chunks)),
        chunks(GetArrayPointers(owned_chunks)),
        order(order),
        null_count(null_count) {}

  using LocationType = ::arrow::internal::ChunkLocation;

  template <typename ArrayType>
  ResolvedChunk<ArrayType> GetChunk(::arrow::internal::ChunkLocation loc) const {
    return {checked_cast<const ArrayType*>(chunks[loc.chunk_index]),
            loc.index_in_chunk};
  }

  // Make a vector of ResolvedSortKeys for the sort keys and the given table.
  // `batches` must be a chunking of `table`.
  static Result<std::vector<ResolvedSortKey>> Make(
      const Table& table, const RecordBatchVector& batches,
      const std::vector<SortKey>& sort_keys) {
    auto factory = [&](const SortField& f) {
      const auto& type = table.schema()->field(f.field_index)->type();
      // We must expose a homogenous chunking for all ResolvedSortKey,
      // so we can't simply pass `table.column(f.field_index)`
      ArrayVector chunks(batches.size());
      std::transform(batches.begin(), batches.end(), chunks.begin(),
                     [&](const std::shared_ptr<RecordBatch>& batch) {
                       return batch->column(f.field_index);
                     });
      return ResolvedSortKey(type, std::move(chunks), f.order,
                             table.column(f.field_index)->null_count());
    };
    return ::arrow::compute::internal::ResolveSortKeys<ResolvedSortKey>(
        *table.schema(), sort_keys, factory);
  }

  std::shared_ptr<DataType> type;
  ArrayVector owned_chunks;
  std::vector<const Array*> chunks;
  SortOrder order;
  int64_t null_count;
};
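The piece of this that carries over to Take is the logical-index resolution that the second ResolvedSortKey relies on. A minimal sketch of that resolution using the internal ChunkResolver (the helper name GetLogicalElement is hypothetical, and the header location may differ between Arrow versions):

#include "arrow/api.h"
#include "arrow/chunk_resolver.h"  // arrow::internal::ChunkResolver (internal API)

// Resolve a logical index over a ChunkedArray into (chunk index, index in chunk)
// and materialize the element as a Scalar.
arrow::Result<std::shared_ptr<arrow::Scalar>> GetLogicalElement(
    const arrow::ChunkedArray& values, int64_t logical_index) {
  arrow::internal::ChunkResolver resolver(values.chunks());
  const auto loc = resolver.Resolve(logical_index);
  // loc.chunk_index selects the chunk, loc.index_in_chunk the element within it.
  return values.chunk(static_cast<int>(loc.chunk_index))->GetScalar(loc.index_in_chunk);
}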
@ursabot please benchmark lang=C++
Benchmark runs are scheduled for baseline = e63a13a and contender = a43fa07. Results will be available as each benchmark for each run completes.
I started by working on the Take implementation for primitive values, so I could familiarize myself with how the Take kernels work. But based on the benchmark I added, it seems like I actually made performance much worse (except in the monotonic case)! I suspect this is because of having to use ChunkResolver …

Benchmark results
Baseline: …
Proposed: …
@wjones127 The numbers seem a bit low to be honest, but perhaps that's just me. I haven't looked at the implementation.
You are correct on that. Both are too low, in test and baseline, by about the same factor: I was creating too large a chunked array for the indices.

Benchmark results
Baseline: …
Proposed: …
After some more testing, it seems like concatenating buffers and indexing into that always wins over using ChunkResolver. From my quick test of Take on chunked string arrays:

…
Unless there is a more performant way, we might only have Take kernels specialized for ChunkedArrays for String / Binary / List (and also Struct, since it will then need to handle rechunking of child arrays). @edponce I know you looked at …

String ChunkedArray Take Benchmark Code

void BenchStringTest() {
  // Note: `args`, `rand`, `state`, and `concat_chunks` are assumed to come from
  // the surrounding benchmark fixture.
  // Create chunked string array
  int32_t string_min_length = 0, string_max_length = 32;
  const int64_t n_chunks = 10;
  const int64_t array_size = args.size / n_chunks;
  ArrayVector chunks;
  for (int64_t i = 0; i < n_chunks; ++i) {
    auto chunk = std::static_pointer_cast<StringArray>(
        rand.String(array_size, string_min_length, string_max_length, 0));
    chunks.push_back(chunk);
  }
  auto values = ChunkedArray(chunks);
  // Create indices
  auto indices =
      rand.Int32(values.length(), 0, static_cast<int32_t>(values.length() - 1), 0);
  for (auto _ : state) {
    TypedBufferBuilder<int32_t> offset_builder;
    TypedBufferBuilder<uint8_t> data_builder;
    const int32_t* indices_values = indices->data()->GetValues<int32_t>(1);
    if (concat_chunks) {
      // Concat the chunks
      ASSIGN_OR_ABORT(std::shared_ptr<Array> values_combined,
                      Concatenate(values.chunks()));
      // For StringArray, buffer 1 holds the int32 offsets and buffer 2 the character data.
      const int32_t* values_offsets = values_combined->data()->GetValues<int32_t>(1);
      const uint8_t* values_data = values_combined->data()->GetValues<uint8_t>(2);
      // for each value
      for (int i = 0; i < indices->length(); ++i) {
        int32_t index = indices_values[i];
        // get the offset and size
        int32_t offset = values_offsets[index];
        int64_t length = values_offsets[index + 1] - offset;
        // throw them on the builder
        data_builder.UnsafeAppend(values_data + offset, length);
      }
    } else {
      using arrow::internal::ChunkLocation;
      using arrow::internal::ChunkResolver;
      ChunkResolver resolver(values.chunks());
      std::vector<const uint8_t*> values_data(values.num_chunks());
      std::vector<const int32_t*> values_offsets(values.num_chunks());
      for (int i = 0; i < values.num_chunks(); ++i) {
        values_offsets[i] = values.chunks()[i]->data()->GetValues<int32_t>(1);
        values_data[i] = values.chunks()[i]->data()->GetValues<uint8_t>(2);
      }
      // for each index
      for (int i = 0; i < indices->length(); ++i) {
        // Resolve the location
        ChunkLocation location = resolver.Resolve(indices_values[i]);
        // Get the offset and size
        int32_t offset = values_offsets[location.chunk_index][location.index_in_chunk];
        int32_t length =
            values_offsets[location.chunk_index][location.index_in_chunk + 1] - offset;
        // throw them on the builder
        data_builder.UnsafeAppend(values_data[location.chunk_index] + offset, length);
      }
    }
  }
}
@wjones127 I think the benchmark test is not doing a fair comparison, as there is no need to copy the data and offsets into the temporary std::vectors; the first loop is not necessary. Nevertheless, there are only 10 chunks, so I wouldn't expect a significant penalty from it, but better to measure than assume. Also, IIRC the binary search in ChunkResolver …
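For reference, a sketch of that else branch without the temporaries, reading offsets and data straight from each resolved chunk's ArrayData (it assumes the benchmark's `values`, `indices`, `indices_values`, and `data_builder` from above, and chunk offsets of zero as in the generated data):

// Resolve each index and read the buffers directly from the chunk's ArrayData,
// skipping the up-front copy into std::vectors.
arrow::internal::ChunkResolver resolver(values.chunks());
for (int64_t i = 0; i < indices->length(); ++i) {
  const auto loc = resolver.Resolve(indices_values[i]);
  const auto& chunk_data = *values.chunk(static_cast<int>(loc.chunk_index))->data();
  const int32_t* offsets = chunk_data.GetValues<int32_t>(1);  // offsets buffer
  const uint8_t* data = chunk_data.GetValues<uint8_t>(2);     // character data buffer
  const int32_t offset = offsets[loc.index_in_chunk];
  const int32_t length = offsets[loc.index_in_chunk + 1] - offset;
  data_builder.UnsafeAppend(data + offset, length);
}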
Now, definitely a fixed array will be quicker to access (a simple offset calculation) than a binary search across allocations that are not necessarily contiguous in hardware and may even reside in different OS pages. I'd be curious how the benchmark compares when using a larger number of chunks: 10, 100, 1000, which is where the concatenation penalty becomes noticeable. Obviously, the sizes of the chunks also matter.
Reran with a variety of chunk counts and total array sizes. It seems like ChunkResolver is only better in the extreme case of very small chunks (1,000 chunks with a total size of 4,194 means each chunk has about 4 elements).
@wjones127 Are these numbers for the random or the monotonic use case?
@wjones127 Thanks for sharing these benchmarks. Are these results measured without the extra overhead of the temporary std::vectors?

It is reasonable that the smaller the chunk size, the better the ChunkResolver cases perform, due to chunk caching and the higher probability of hitting the same chunk consecutively.

Performance is a tricky business because it depends on the metrics you are evaluating for. Both approaches have advantages and disadvantages. If you have a large dataset and are somewhat memory constrained, the concatenation approach may not be adequate due to the extra storage. The ChunkResolver is the most general solution, with the least overhead on memory use and still reasonable performance. AFAIK, Arrow does not track memory statistics that would permit selecting which of these approaches should be used. Well, maybe we could add an option for the client code to decide, but that does not seem to follow Arrow's general design.
That's for random. Here it is including monotonic, which makes it more complex:

So it seems like it's better in the monotonic case, but worse in the random case. My benchmark code is available here: https://github.com/wjones127/arrow/blob/370d0870c68627224aedcfb79cfd7ceb7d0dfa99/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc#L206-L277
I hadn't removed it. I removed it in the test that I'm showing results for above.
In some cases it seems like it would be a serious regression, so I'm trying to figure out which cases those are so we can avoid using ChunkResolver in them.

It's hard to say if that extra memory usage is that significant. I feel like some extra memory usage will always happen within a compute function. This is large since it needs to operate on the entire chunked array, rather than just a chunk at a time. But also, with memory pools we can rapidly reuse memory; so I imagine, for example, if we are running …
From the results above, before performing the Take operation, what information do we know that could allow us to select the adequate strategy?

Now let's try to summarize, in a very hand-wavy fashion, some observations based on logical array size.

Random order: …

Monotonic order: …

Based on this, a general decision rule could be: if the indices are random, or the array size and number of chunks are not that large, use concat; otherwise use ChunkResolver. But if we can't identify the access order, then it does look like concat would be a better choice, assuming most use cases of Take make random accesses. An alternative approach could be to add a configuration flag in Arrow that states "optimize for speed" or "optimize for storage", and this could be used to select strategies throughout.
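To make that rule concrete, a sketch of what such a dispatch could look like (entirely hypothetical: the name, signature, and thresholds are made up for illustration):

#include <cstdint>
#include "arrow/api.h"

// Hypothetical heuristic: concatenate when indices are random (or order is unknown)
// and the take is large enough to amortize the concatenation cost; otherwise fall
// back to ChunkResolver. The threshold below is a placeholder.
bool ShouldConcatenateForTake(const arrow::ChunkedArray& values,
                              int64_t num_indices, bool indices_are_monotonic) {
  if (values.num_chunks() <= 1) return false;  // nothing to concatenate
  if (indices_are_monotonic) return false;     // ChunkResolver did well here
  // Random (or unknown) order: concatenate unless we take only a tiny fraction.
  return num_indices * 100 >= values.length();
}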
  if (values.IsValid(indices_data[position])) {
    // value is not null
-   out[position] = values_data[indices_data[position]];
+   out[position] = values.GetValue(indices_data[position]);
I'll note that in the IsValid -> GetValue sequence, the chunk resolution is called twice. The compiler might be able to optimize away the second call, but that's not guaranteed.
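A sketch of resolving the location once and reusing it for both the validity check and the value read (a fragment, not kernel code: `resolver`, `chunks`, `out`, `indices_data`, `position`, and `ValueArrayType` stand in for whatever the kernel actually has in scope):

// Resolve once, then do the null check and the value access on the same chunk.
const auto loc = resolver.Resolve(indices_data[position]);
const auto& chunk = arrow::internal::checked_cast<const ValueArrayType&>(
    *chunks[loc.chunk_index]);
if (chunk.IsValid(loc.index_in_chunk)) {
  // value is not null
  out[position] = chunk.Value(loc.index_in_chunk);
}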
…instead of TakeCA (#40206)

### Rationale for this change

`take` concatenates chunks when it's applied to a chunked `values` array, but when the `indices` array is also chunked it concatenates `values` more than once -- one `Concatenate` call with `values.chunks()` for every chunk in `indices`. This PR doesn't remove the concatenation, but ensures it's done only once instead of `indices.size()` times.

### What changes are included in this PR?

- Adding a return type to the `TakeXX` names (-> `TakeXXY`) to make the code easier to understand
- Adding benchmarks to `TakeCCC` (copied from #13857)
- Remove the concatenation from the loop body (!)

### Are these changes tested?

By existing tests.

### Are there any user-facing changes?

A faster compute kernel.

* GitHub Issue: #40207

Authored-by: Felipe Oliveira Carvalho <felipekde@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
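The fix described there amounts to hoisting the Concatenate call out of the per-indices-chunk loop; a rough sketch under that assumption (hypothetical function name, not the actual TakeCCC kernel code):

#include "arrow/api.h"
#include "arrow/array/concatenate.h"
#include "arrow/compute/api.h"

arrow::Result<std::shared_ptr<arrow::ChunkedArray>> TakeChunkedFromChunked(
    const arrow::ChunkedArray& values, const arrow::ChunkedArray& indices) {
  // Before the fix this Concatenate effectively ran once per indices chunk;
  // hoisting it here makes it run exactly once.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> flat_values,
                        arrow::Concatenate(values.chunks()));
  arrow::ArrayVector out_chunks;
  out_chunks.reserve(indices.num_chunks());
  for (const auto& indices_chunk : indices.chunks()) {
    // One output chunk per indices chunk, taken from the flattened values.
    ARROW_ASSIGN_OR_RAISE(arrow::Datum out,
                          arrow::compute::Take(flat_values, indices_chunk));
    out_chunks.push_back(out.make_array());
  }
  return std::make_shared<arrow::ChunkedArray>(std::move(out_chunks), values.type());
}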


No description provided.