Conversation

@michalursa (Contributor) commented Aug 3, 2021

This change refactors the core hash table implementation used in the grouper. Its goal is to separate read-only operations on the hash table from operations that modify it.

Originally, the only operation provided for hash table access was map, which returned the ids of matching keys found in the hash table, automatically inserting any keys that were not found and assigning them new ids.

The change splits the map operation into a pipeline of three stages:

  • early filter
  • find
  • map_new_keys

Called one after another, these three stages provide the functionality of map. The output of each stage is the input to the next, and each successive stage can process a smaller subset of rows than the previous one, because rows are filtered out at each stage.
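As a rough illustration of the split (toy code, not Arrow's implementation or API), a grouper's map can be composed of a read-only lookup pass followed by a mutating insert pass that only touches the misses:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Toy grouper: "map" = read-only find over all rows, then a mutating
// insert over only the rows whose keys were not found.
class ToyGrouper {
 public:
  // Returns a group id for every key, inserting previously unseen keys.
  std::vector<uint32_t> Map(const std::vector<std::string>& keys) {
    std::vector<uint32_t> ids(keys.size(), 0);
    std::vector<size_t> misses;
    // Read-only stage: look up keys already present in the table.
    for (size_t i = 0; i < keys.size(); ++i) {
      auto it = table_.find(keys[i]);
      if (it == table_.end()) {
        misses.push_back(i);  // candidate new key for the next stage
      } else {
        ids[i] = it->second;  // group id of an existing key
      }
    }
    // Mutating stage: insert only the misses (usually a small subset).
    for (size_t i : misses) {
      auto [it, inserted] = table_.emplace(keys[i], next_id_);
      if (inserted) ++next_id_;  // duplicates reuse the id just assigned
      ids[i] = it->second;
    }
    return ids;
  }

 private:
  std::unordered_map<std::string, uint32_t> table_;
  uint32_t next_id_ = 0;
};
```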

The early filter stage can be seen as the equivalent of a Bloom filter. Based on hash values alone, without executing any key comparisons, it quickly marks the keys that definitely do not have a match in the hash table. False positives are possible but, as with a Bloom filter, their probability should be small.

The next stage in the pipeline, the find method, corresponds to the full processing of all input rows whose keys are already present in the hash table. It is a read-only operation on the hash table. It completes the filtering begun by the early filter, removing any remaining false positives, and also outputs the corresponding group ids for all keys found in the hash table. A caller interested only in filtering may ignore the group ids, but there is no meaningful performance overhead in outputting them, since they are needed anyway to execute the key comparisons.
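A toy version of this read-only find stage might look like the following; it uses std::unordered_map for clarity rather than Arrow's actual hash table code, and all names are illustrative:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Per-row output of the find stage: a match flag and a group id.
struct FindResult {
  std::vector<bool> found;          // false => key is new (no false positives remain)
  std::vector<uint32_t> group_ids;  // valid only where found[i] is true
};

// Read-only lookup: performs the full key comparisons that resolve any
// false positives left by the early filter, and emits group ids.
FindResult Find(const std::unordered_map<std::string, uint32_t>& table,
                const std::vector<std::string>& keys) {
  FindResult r{std::vector<bool>(keys.size(), false),
               std::vector<uint32_t>(keys.size(), 0)};
  for (size_t i = 0; i < keys.size(); ++i) {
    auto it = table.find(keys[i]);  // full key comparison happens here
    if (it != table.end()) {
      r.found[i] = true;
      r.group_ids[i] = it->second;
    }
  }
  return r;
}
```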

The final stage of the pipeline completes the pre-existing map functionality by processing all new keys from the batch. The set of new keys is identified by the output of the previous stage, find. This last stage takes care of inserting the new keys, assigning them new group ids, and resizing the hash table when it gets too full. The number of inserted keys may be smaller than the number of keys passed to this stage, since there may be duplicates among them.
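The duplicate handling in this last stage can be sketched as follows (toy code with illustrative names; here std::unordered_map resizes itself, whereas a real implementation would track the load factor and resize explicitly):

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Mutating stage: insert the keys that "find" reported as missing and
// assign fresh group ids. Duplicates within the batch resolve to the id
// assigned to their first occurrence, so the number of actual insertions
// can be smaller than new_keys.size().
std::vector<uint32_t> MapNewKeys(
    std::unordered_map<std::string, uint32_t>& table, uint32_t& next_id,
    const std::vector<std::string>& new_keys) {
  std::vector<uint32_t> ids;
  ids.reserve(new_keys.size());
  for (const auto& key : new_keys) {
    // emplace is a no-op for a key already inserted earlier in this batch.
    auto [it, inserted] = table.emplace(key, next_id);
    if (inserted) ++next_id;
    ids.push_back(it->second);
  }
  return ids;
}
```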

This restructuring should be useful not only for the implementation of the join exec node, but also, in the future, for implementing shared multi-threaded access. Only the last stage of the pipeline can modify the hash table, so it is the only one that requires thread synchronization. At the same time, it only processes keys that were not present in the hash table when processing of the exec batch started, which in many cases can be expected to be a small fraction of all the inputs.

In my runs of arrow-compute-aggregate-benchmark (Ubuntu, clang 10.0, Intel i9-10980XE), there is no performance difference, both for an AVX2 build and a build without SIMD.

github-actions bot commented Aug 3, 2021

@michalursa force-pushed the ARROW-13532-filter-interface-for-grouper branch 2 times, most recently from f03890c to 7d21f80 on August 9, 2021 06:44
@michalursa force-pushed the ARROW-13532-filter-interface-for-grouper branch from 7d21f80 to b47a8b7 on August 9, 2021 07:51
@ianmcook marked this pull request as ready for review on August 17, 2021 21:05
@ianmcook requested a review from bkietz on August 17, 2021 21:05
@bkietz (Member) left a comment

This looks good, just a few nits:

@bkietz (Member) left a comment

The other thing I'd expect to see here is a new method on Grouper for looking up IDs from an ExecBatch, something like Result<Datum> Grouper::Find(const ExecBatch&) const;. If you'd prefer to do that in a follow up, please open a JIRA for adding that method

@michalursa (Contributor, Author) commented

> The other thing I'd expect to see here is a new method on Grouper for looking up IDs from an ExecBatch, something like Result<Datum> Grouper::Find(const ExecBatch&) const;. If you'd prefer to do that in a follow up, please open a JIRA for adding that method

I created ARROW-13706.

@bkietz (Member) commented Aug 30, 2021

@michalursa needs a rebase

@nirandaperera (Contributor) commented Sep 1, 2021

@michalursa is there an update on this? #10845 depends on this and it will be great if this can be merged soon 🙂

Grouper filtering - code review requested changes
@bkietz force-pushed the ARROW-13532-filter-interface-for-grouper branch from e33863c to 392a84d on September 21, 2021 20:24
@bkietz (Member) left a comment

LGTM. CI failure seems to be a timeout. Will merge

@bkietz bkietz closed this in 18ca1e9 Sep 22, 2021
ViniciusSouzaRoque pushed a commit to s1mbi0se/arrow that referenced this pull request Oct 20, 2021
… hash table interface


Closes apache#10858 from michalursa/ARROW-13532-filter-interface-for-grouper

Lead-authored-by: michalursa <michal@ursacomputing.com>
Co-authored-by: michalursa <76971930+michalursa@users.noreply.github.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>