colexec: fix spilling of unordered distinct #58006
craig[bot] merged 1 commit into cockroachdb:master from
Conversation
asubiotto
left a comment
Instead of using an after-the-fact filtering approach, can we push it one layer down and override the in-memory hash table with the existing one?
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)
pkg/sql/colexec/unordered_distinct.go, line 32 at r1 (raw file):
distinctCols []uint32, typs []*types.T, memoryLimit int64,
I think this needs a more extensive explanation of why this is needed. I wonder if instead of having the operator need to track a memory limit (it's a very unfortunate leak of abstraction that in-memory operators need to worry about memory usage), we should have some way to detect and fix hashtable inconsistency in ExportBuffered (i.e. it would act as a trigger to fix the hash table if broken).
There's also the option to lean more into the "override" hash table idea I mentioned above. We'd make sure to fix the hash table in the external operator before passing it to an operator with an unlimited monitor.
82811cf to 7fdd016
yuzefovich
left a comment
I have considered such an approach and discarded it for a couple of reasons:
- The current infrastructure of the ExportBuffered method provides us only with a way to emit the full input tuples that the disk-backed operator needs to process. However, the hash table stores only the columns needed for the distinctness check of already-emitted tuples, so we cannot use that channel to export it. We would need to create a "side channel" for exporting the in-memory state, and this channel currently would only be used by the external distinct (in the future, it could be used by the external hash aggregator if we are able to export the buckets with intermediate results).
- Assuming we have a way to export the hash table, we would need to override the reset method of the unordered distinct reused by the external distinct to make sure it keeps the tuples that came from the in-memory unordered distinct (which is a separate object). This seems like a broken abstraction to me. One way to go around it would be to inject the hash table state right after every resetting of the unordered distinct used by the external distinct (that would probably be slightly inefficient but wouldn't matter much).

To me, planning a pre-filter seems a lot cleaner given the current spilling infrastructure since it is hidden in the constructor of the external distinct. Additionally, the current filtering approach reduces the number of tuples we have to partition and spill to disk, which is a nice-to-have.
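As a rough illustration of the pre-filter idea, here is a toy sketch — not the actual colexec code: a Go map of already-emitted keys stands in for the operator's hash table, and `filterBatch`/`seenSet` are invented names:

```go
package main

import "fmt"

// seenSet is a toy stand-in for the hash table of already-emitted
// distinct keys (colexechash.HashTable in the real code).
type seenSet map[int64]struct{}

// filterBatch removes from batch every tuple whose key was already
// emitted by the in-memory distinct, as well as duplicates within the
// batch itself, mirroring what a pre-filter on the external distinct's
// input would do.
func filterBatch(seen seenSet, batch []int64) []int64 {
	out := batch[:0] // reuse the batch's storage, as real operators do
	for _, k := range batch {
		if _, ok := seen[k]; ok {
			continue // duplicate of an already-emitted tuple
		}
		seen[k] = struct{}{}
		out = append(out, k)
	}
	return out
}

func main() {
	seen := seenSet{1: {}, 2: {}} // tuples emitted before the spill
	fmt.Println(filterBatch(seen, []int64{2, 3, 3, 4})) // [3 4]
}
```

In the real operator the check is a hash-table probe over the distinct columns rather than a map lookup, but the effect is the same: tuples already emitted before the spill, and in-batch duplicates, never reach the partitioner.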
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)
pkg/sql/colexec/unordered_distinct.go, line 32 at r1 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I think this needs a more extensive explanation of why this is needed. I wonder if instead of having the operator need to track a memory limit (it's a very unfortunate leak of abstraction that in-memory operators need to worry about memory usage), we should have some way to detect and fix hash table inconsistency in ExportBuffered (i.e. it would act as a trigger to fix the hash table if broken). There's also the option to lean more into the "override" hash table idea I mentioned above. We'd make sure to fix the hash table in the external operator before passing it to an operator with an unlimited monitor.
I prototyped the approach with fixing the hash table, and I think I like it overall more. PTAL.
The complication here is that we have to be careful to not filter out tuples of the batch on which the OOM error occurred - they have been appended to the hash table but not yet emitted into the output, so I introduced a marker type for that.
Overall, I don't think we can fix this issue without some type of customization for the external distinct case due to the streaming nature of the unordered distinct - i.e. it is no longer simply a "buffering" operator, so I think breaking the abstraction is necessary here.
I responded to the "overriding hash table" idea above.
434f7e8 to 9745dd5
Sorry I lost track of this. Is this RFAL?

Yes, it is RFAL (I kept the changes in a separate commit temporarily, for the ease of comparing the two approaches). There is something weird going on with a unit test (I remember I couldn't reproduce the problems); maybe I'll just relax it a bit to not require a specific number of spills.
asubiotto
left a comment
Reviewed 6 of 7 files at r1, 1 of 1 files at r2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)
pkg/sql/colexec/unordered_distinct.go, line 81 at r2 (raw file):
// We adjust the allocator to request the amount of memory that should
// exceed the limit of the bound memory account, so this call will
// result in an internal OOM error that will be caught by the disk
Hmm, why not just panic with an actual OOM? The current approach seems hacky
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)
pkg/sql/colexec/unordered_distinct.go, line 81 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Hmm, why not just panic with an actual OOM? The current approach seems hacky
Let me describe the current peculiar situation in a lot of detail.
Suppose the in-memory unordered distinct has already read 3 batches A, B, C, appended the distinct tuples from all of them into the hash table and emitted them. Now it reads batch D which has some new distinct tuples, but during the appending of these new distinct tuples the hash table reaches its memory limit.
Here is what happens in distinctBuild:
1. batch D is updated in-place to remove the duplicate tuples within itself (this doesn't interact with tuples from A, B, and C that are in the hash table)
2. batch D is updated in-place again to remove the "global" duplicates. Now D's selection vector only contains tuples that are "globally" distinct which should be emitted into the output
3. however, before we can emit them into the output, we need to update the hash table to include these tuples, and this is where we hit an OOM, which could occur for two reasons in appendAllDistinct:
   3.1) because of appending the tuples to ht.vals
   3.2) because of reallocating internal slices of the hash table (buildFromBufferedTuples)
   If the former is the case, then the hash table ends up in an inconsistent state, and we'll need to repair it before using it.
4. regardless of when exactly an OOM error happened, a panic has occurred and we exited the execution of the in-memory unordered distinct. What we have is the following: batch D contains distinct tuples that need to be emitted into the output, and those tuples have been appended to the hash table.
Now the question arises on how to handle the fallback to the external distinct, and the fallback is subject to the following requirements:
- all tuples of the updated in-place batch D need to be emitted somehow, without being filtered by the hash table, since the filtering of that batch has already occurred
- all tuples in batch E and all subsequent ones have to be checked against already-emitted tuples from batches A, B, C, and D (assuming D is somehow emitted).
The second requirement can be satisfied in a relatively clean way by unorderedDistinctFilterer planned on top of the input to the disk-backed distinct; however, the first requirement is very tricky to get right. We can emit the updated batch D in two places - either in the in-memory operator before panicking (this was my original approach) or in the disk-backed operator (after the panic and the fallback have occurred). I like the latter option since the former requires catching OOMs and is very, very hacky in order to not catch OOMs that are coming from other operators.
Assuming that we want to emit batch D from the disk-backed operator, we have a conflict with the unorderedDistinctFilterer - the hash table already contains all tuples from D, so we cannot filter that particular batch against the hash table because then we would never emit tuples from D. The solution I'm proposing right now is to have a special marker wrapper around batch D that indicates that filtering against the hash table should not be performed on this batch in particular.
An alternative approach would be somehow connecting the in-memory unordered distinct and unorderedDistinctFilterer directly so that the former in its ExportBuffered method could indicate (by a callback or modifying the filterer's internal state) that the next batch read by the filterer should not be filtered. However, this approach seems even worse.
There are some other complications due to the disk-spilling infrastructure. Here is the diagram of what the setup looks like with the current PR:
// ------------- input -----------
// | || | (2nd src)
// | || (1st src) ↓
// | ----||---> bufferExportingOperator
// ↓ | || ↓
// | | || unorderedDistinctFilterer
// in-memory distinct || ↓
// | || disk-backed distinct
// | || |
// | ↓↓ |
// ---------> disk spiller <--------
// ||
// ||
// ↓↓
// output
The alternative approach would require introducing a connection between in-memory distinct and unorderedDistinctFilterer.
Unfortunately, I don't see a better way around this issue since it is very subtle and requires careful coordination between several components. If we were to change the disk-spilling infrastructure, something better might come up, but I believe this is a one-off issue, and tight coupling of several components seems acceptable to me in this special case (as a reminder, the "specialness" of the case is due to the in-memory unordered distinct having streaming-like behavior).
yuzefovich
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)
pkg/sql/colexec/unordered_distinct.go, line 81 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Let me describe the current peculiar situation in a lot of detail. […]
One idea that could simplify things a bit is that we could assume in unorderedDistinctFilterer that the first batch it sees is the already filtered one and emit it without filtering. It is possible to make this assumption because the in-memory unordered distinct will export at most one batch, and it will export no tuples only when the hash table is empty (which we can detect in the filterer). I added another commit implementing this idea, and I think it is a bit cleaner than the marker batch wrapper.
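A minimal sketch of that assumption (a map again stands in for the hash table; `filterer` and its fields are hypothetical names): the first batch passes through unchanged because the in-memory distinct already deduplicated it and appended it to the hash table, while every later batch is filtered.

```go
package main

import "fmt"

// filterer is a toy model of unorderedDistinctFilterer: seen stands in
// for the hash table of already-emitted keys, and firstBatch records
// that the first exported batch was already filtered in-memory.
type filterer struct {
	seen       map[int64]struct{}
	firstBatch bool
}

func (f *filterer) next(batch []int64) []int64 {
	if f.firstBatch {
		// The in-memory distinct already deduplicated this batch and
		// appended it to the hash table, so pass it through unchanged.
		f.firstBatch = false
		return batch
	}
	out := batch[:0]
	for _, k := range batch {
		if _, ok := f.seen[k]; ok {
			continue // duplicate of an already-emitted tuple
		}
		f.seen[k] = struct{}{}
		out = append(out, k)
	}
	return out
}

func main() {
	// The hash table already holds batch D's keys (4, 5) plus earlier ones.
	f := &filterer{seen: map[int64]struct{}{1: {}, 4: {}, 5: {}}, firstBatch: true}
	fmt.Println(f.next([]int64{4, 5}))    // batch D: emitted as-is → [4 5]
	fmt.Println(f.next([]int64{5, 6, 6})) // batch E: filtered → [6]
}
```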
495e822 to 0e8d26e
asubiotto
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)
pkg/sql/colexec/unordered_distinct.go, line 81 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
One idea that could simplify things a bit is that we could assume in unorderedDistinctFilterer that the first batch it sees is the already filtered one and emit it without filtering. It is possible to make this assumption because the in-memory unordered distinct will export at most one batch, and it will export no tuples only when the hash table is empty (which we can detect in the filterer). I added another commit implementing this idea, and I think it is a bit cleaner than the marker batch wrapper.
Agreed with the latest suggestion. I think it's nice to keep the specialization as contained as possible. The commit looks good. I'll give this another review in its final state.
3fe1c17 to 1c0024e
Alright, I think I finally stabilized the test (the difficulty was that without limiting the number of tuples, it could take extremely long under stress in very rare circumstances; then I added a limit on the max number of tuples, and now we might not spill because of it). Anyway, I think it is RFAL.
asubiotto
left a comment
Reviewed 8 of 8 files at r3.
Reviewable status: complete! 1 of 0 LGTMs obtained
TFTR! bors r+

Canceled.

bors r+

Build failed (retrying...):

Build failed:

Flake. bors r+

Build failed:

Another flake (for which I opened a PR). bors r+

Build failed (retrying...):

Build failed:
Flake again, rebased on top of master. bors r+
Build succeeded: |
The recent addition of the external distinct has a bug: we
completely ignored the in-memory state (the hash table) of the in-memory
unordered distinct when the spill occurs. This became problematic with
the recent change to make the unordered distinct streaming-like: we now
probe the hash table to remove all duplicates, add the distinct tuples
into the hash table and emit them into the output. So currently it is
possible for the external distinct to emit tuples that are duplicates of
already emitted ones by the in-memory operator before the spill
occurred.
To work around this issue, without changing the streaming-like behavior of
the in-memory unordered distinct, this commit introduces a special
filterer on the input to the external distinct which removes all
duplicates of tuples already emitted by the in-memory operator. (A side
bonus is that it also removes the duplicates within the batch itself.)
The complication here is that the last input batch read by the in-memory
unordered distinct has already been updated in-place to include only the
new distinct tuples and the tuples have been appended to the hash table
right before the OOM occurs. This required plumbing an assumption that
the first batch seen by the new filterer has already been filtered and
should be returned unchanged. Another detail is that the hash table
might be in an inconsistent state, so it'll need to be repaired before
being used by the filterer.
Fixes: #57858.
Fixes: #58286.
Release note: None (no release with this bug)