-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-13532: [C++][Compute] - adding set membership type filtering to hash table interface #10858
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-13532: [C++][Compute] - adding set membership type filtering to hash table interface #10858
Conversation
f03890c to
7d21f80
Compare
7d21f80 to
b47a8b7
Compare
bkietz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good, just a few nits:
bkietz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other thing I'd expect to see here is a new method on Grouper for looking up IDs from an ExecBatch, something like Result<Datum> Grouper::Find(const ExecBatch&) const;. If you'd prefer to do that in a follow up, please open a JIRA for adding that method
I created ARROW-13706. |
|
@michalursa needs a rebase |
|
@michalursa is there an update on this? #10845 depends on this and it will be great if this can be merged soon 🙂 |
Grouper filtering - code review requested changes
e33863c to
392a84d
Compare
bkietz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. CI failure seems to be a timeout. Will merge
… hash table interface This change contains some refactoring of the code of the core hash table implementation used in grouper. The goal of this change is to separate read-only operations on the hash table from operations modifying it. Originally the only provided operation for hash table access was the map operation, that would return ids of matching keys found in the hash table or automatically insert new keys if they are not found assigning them new ids. The change splits the map operation into a pipeline consisting of three stages: - early filter - find - map_new_keys The three of them called one after another provide the functionality of the map. The output of each of them is used as the input to the next one. Each next stage in the pipeline can potentially process a smaller subset of rows than the previous stage, because of filtering of the rows done at each stage. Early filter corresponds to the part that can be seen as an equivalent of Bloom filter. It quickly, based on hash values only and without executing any key comparison, marks the keys that definitely do not have a match in the hash table. False positives are possible, but as with the Bloom filter, their probability should be small. The next stage in the pipeline, find method, correspond to the full processing of all of the input rows with keys that are already present in the hash table. It is a read-only operation on the hash table. It finishes filtering from early filter getting rid of any potential false positives. It also outputs corresponding group ids for all keys found in the hash table. The caller may ignore group ids if only the filtering part is important, but there is no meaningful performance overhead in outputting them, since they are needed anyway for executing key comparisons. The final stage of the pipeline is completing the pre-existing map functionality, processing all new keys from the last batch. The set of new keys is identified by the result of the previous stage - filter operation. The last stage takes care of inserting new keys, assigning them new group ids, resizing the hash table when it gets too full. The number of inserted keys may be smaller than the number of keys passed to this stage, since there may be duplicates among them. The restructuring of the code should not only be useful for the implementation of join exec node, but it should also help in the future in implementation of shared multi-threaded access. Only the last stage of the pipeline can modify the hash table, so it is the only one that requires thread synchronization. At the same time it only processes the keys that were not present in the hash table when the processing of the exec batch started, so it can be expected in many cases to be a small fraction of all the inputs. There is no performance difference in my run of arrow-compute-aggregate-benchmark (on Ubuntu, clang 10.0, Intel I9-10980XE) both for AVX2 build and a build without SIMD. Closes apache#10858 from michalursa/ARROW-13532-filter-interface-for-grouper Lead-authored-by: michalursa <michal@ursacomputing.com> Co-authored-by: michalursa <76971930+michalursa@users.noreply.github.com> Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
This change contains some refactoring of the code of the core hash table implementation used in grouper.
The goal of this change is to separate read-only operations on the hash table from operations modifying it.
Originally the only provided operation for hash table access was the map operation, that would return ids of matching keys found in the hash table or automatically insert new keys if they are not found assigning them new ids.
The change splits the map operation into a pipeline consisting of three stages:
The three of them called one after another provide the functionality of the map. The output of each of them is used as the input to the next one. Each next stage in the pipeline can potentially process a smaller subset of rows than the previous stage, because of filtering of the rows done at each stage.
Early filter corresponds to the part that can be seen as an equivalent of Bloom filter. It quickly, based on hash values only and without executing any key comparison, marks the keys that definitely do not have a match in the hash table. False positives are possible, but as with the Bloom filter, their probability should be small.
The next stage in the pipeline, find method, correspond to the full processing of all of the input rows with keys that are already present in the hash table. It is a read-only operation on the hash table. It finishes filtering from early filter getting rid of any potential false positives. It also outputs corresponding group ids for all keys found in the hash table. The caller may ignore group ids if only the filtering part is important, but there is no meaningful performance overhead in outputting them, since they are needed anyway for executing key comparisons.
The final stage of the pipeline is completing the pre-existing map functionality, processing all new keys from the last batch. The set of new keys is identified by the result of the previous stage - filter operation. The last stage takes care of inserting new keys, assigning them new group ids, resizing the hash table when it gets too full. The number of inserted keys may be smaller than the number of keys passed to this stage, since there may be duplicates among them.
The restructuring of the code should not only be useful for the implementation of join exec node, but it should also help in the future in implementation of shared multi-threaded access. Only the last stage of the pipeline can modify the hash table, so it is the only one that requires thread synchronization. At the same time it only processes the keys that were not present in the hash table when the processing of the exec batch started, so it can be expected in many cases to be a small fraction of all the inputs.
There is no performance difference in my run of arrow-compute-aggregate-benchmark (on Ubuntu, clang 10.0, Intel I9-10980XE) both for AVX2 build and a build without SIMD.