Make TaskBatcher Less Lock-Heavy #82227

original-brownbear merged 7 commits into elastic:master from original-brownbear:less-locking-task-batcher
Conversation
In many-shards benchmarks we see a lot of contention when submitting tasks. This is obvious when working with lots of large task batches and doing long iterations. We don't need to hold the lock in `runIfNotProcessed` beyond removing the task set for a key, and we can be a little more efficient when creating the new task set in `submitTasks` as well. Also, we don't need a fully locking map: operations for different batching keys are often interleaved, so we move to a `ConcurrentHashMap` (CHM) as well. This change is particularly relevant for stability because we often submit tasks directly from network threads, where grinding through e.g. a bunch of shard state updates and having to lock the map over and over, while e.g. a huge batch of index-creation tasks was being iterated in `runIfNotProcessed`, caused very visible latency.
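A rough sketch of the pattern described above, with simplified types (`String` tasks instead of `BatchedTask`; all names here are illustrative, not the actual TaskBatcher code). `compute` merges submissions per batching key without a global lock, and `remove` atomically detaches the whole set so processing happens outside any map lock:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a ConcurrentHashMap keyed by batching key lets submissions
// for different keys proceed without contending on a single shared lock.
class BatcherSketch {
    private final ConcurrentHashMap<String, Set<String>> tasksPerBatchingKey = new ConcurrentHashMap<>();

    // Merge newly submitted tasks into the per-key set; compute runs atomically per key.
    void submitTasks(String batchingKey, List<String> tasks) {
        tasksPerBatchingKey.compute(batchingKey, (key, existing) -> {
            if (existing == null) {
                return new LinkedHashSet<>(tasks); // no existing set: just build a fresh one
            }
            existing.addAll(tasks);
            return existing;
        });
    }

    // Atomically detach the whole set for a key; iteration then needs no map lock.
    Set<String> takeTasks(String batchingKey) {
        return tasksPerBatchingKey.remove(batchingKey);
    }

    public static void main(String[] args) {
        BatcherSketch b = new BatcherSketch();
        b.submitTasks("create-index", List.of("t1", "t2"));
        b.submitTasks("create-index", List.of("t3"));
        System.out.println(b.takeTasks("create-index")); // [t1, t2, t3]
        System.out.println(b.takeTasks("create-index")); // null
    }
}
```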
Pinging @elastic/es-distributed (Team:Distributed)

Also relates #81626. I'm unlikely to get to this today, bear with me.
```java
        return new LinkedHashSet<>(tasks);
    }
    for (BatchedTask existing : existingTasks) {
        // check that there won't be two tasks with the same identity for the same batching key
```
FWIW I don't think I've ever seen this check fail, maybe we could make it an assertion?
I'm all for it. I think we can only get here via a bug and I've also never seen this happen outside of bugs during experimenting with stuff. I'll make it an assertion :)
No rush :)

Made the dup. check an assertion now.
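The change discussed here, sketched with simplified types (`String` in place of `BatchedTask`; names are illustrative): since a duplicate task identity for the same batching key can only arise from a caller bug, the check becomes an `assert` (enabled with `-ea` in tests) instead of a runtime check paid for in production:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch of turning the duplicate-task check into an assertion.
class DupCheckSketch {
    static Set<String> merge(Set<String> existingTasks, List<String> tasks) {
        for (String task : tasks) {
            // only reachable via a bug, so an assertion suffices
            assert existingTasks.contains(task) == false
                : "task [" + task + "] already queued for this batching key";
        }
        existingTasks.addAll(tasks);
        return existingTasks;
    }

    public static void main(String[] args) {
        Set<String> existing = new LinkedHashSet<>(List.of("a"));
        System.out.println(merge(existing, List.of("b", "c"))); // [a, b, c]
    }
}
```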
henningandersen
left a comment
I wonder if we can use an immutable (or alternatively concurrent) structure in the map?
```java
    if (existingTasks == null) {
        return new LinkedHashSet<>(tasks);
    }
    existingTasks.addAll(tasks);
```
I am not too fond of this pattern. I think it works due to how CHM synchronizes on the head of the bucket in both compute and remove. But I think there is no guarantee from CHM to do either. It seems unlikely that compute could run the remapping function on the same key in parallel (since that would break the call only once guarantee), but I am less certain that a future evolution of CHM could not remove the synchronized in remove (do not see a way to do it though).
In short, I think we rely too much on the internals of CHM here in a very central place. Could we perhaps go immutable here and return a list of the two lists - and flatten it when extracted/removed?
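The immutable alternative floated here could look roughly like the following sketch (simplified types, illustrative names; this is the reviewer's suggestion, not what was merged): `compute` never mutates the stored value but returns a fresh immutable list of batches, and the batches are flattened only once, when the key is taken for processing, so correctness does not depend on `remove` synchronizing with a concurrently running remapping function:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Rough sketch of the suggested immutable list-of-lists approach.
class ImmutableAlternative {
    private final ConcurrentHashMap<String, List<List<String>>> tasksPerKey = new ConcurrentHashMap<>();

    void submit(String key, List<String> tasks) {
        // Append without mutating: the stored value is always a fresh immutable list of batches.
        tasksPerKey.compute(key, (k, existing) -> {
            if (existing == null) {
                return List.of(List.copyOf(tasks));
            }
            List<List<String>> appended = new ArrayList<>(existing);
            appended.add(List.copyOf(tasks));
            return List.copyOf(appended);
        });
    }

    // Flatten the batches only once, when the whole key is extracted/removed.
    List<String> take(String key) {
        List<List<String>> pending = tasksPerKey.remove(key);
        if (pending == null) {
            return List.of();
        }
        List<String> flat = new ArrayList<>();
        pending.forEach(flat::addAll);
        return flat;
    }

    public static void main(String[] args) {
        ImmutableAlternative alt = new ImmutableAlternative();
        alt.submit("k", List.of("a", "b"));
        alt.submit("k", List.of("c"));
        System.out.println(alt.take("k")); // [a, b, c]
    }
}
```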
I think immutable either gets quite tricky (if we keep a list of lists, the timeout task removal gets awkward) or slow if we copy over and over. I'd rather go synchronized I think, it shouldn't be too much overhead here given how we will never have contention on the collection.
Though tbh. I'm not sure about the need for this. CHM docs state:

> This class obeys the same functional specification as {@link java.util.Hashtable}

which means that remove will always be synchronized with the compute operation I'd assume?
It says right after that:

> This class is fully interoperable with {@code Hashtable} in programs that rely on its thread safety but not on its synchronization details.

It is also just nice to have this code "obviously correct", since it is so important here.
Made the collection synchronized now 56ba16c, let me know what you think :)
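The approach taken here, as a rough sketch with simplified types (illustrative names, not the actual commit): wrapping the per-key set in `Collections.synchronizedSet` makes its mutation safe in its own right, so correctness no longer hinges on whether CHM's `compute` and `remove` happen to synchronize on the same internal bin:

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: guard the per-key set itself rather than relying on CHM's internal locking.
class SynchronizedSetSketch {
    private final ConcurrentHashMap<String, Set<String>> tasksPerBatchingKey = new ConcurrentHashMap<>();

    void submitTasks(String batchingKey, List<String> tasks) {
        tasksPerBatchingKey.compute(batchingKey, (key, existing) -> {
            if (existing == null) {
                // the stored set is synchronized, so later addAll calls are safe on their own
                return Collections.synchronizedSet(new LinkedHashSet<>(tasks));
            }
            existing.addAll(tasks);
            return existing;
        });
    }

    Set<String> takeTasks(String batchingKey) {
        return tasksPerBatchingKey.remove(batchingKey);
    }

    public static void main(String[] args) {
        SynchronizedSetSketch s = new SynchronizedSetSketch();
        s.submitTasks("k", List.of("a"));
        s.submitTasks("k", List.of("b"));
        System.out.println(s.takeTasks("k")); // [a, b]
    }
}
```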
henningandersen
left a comment
Thanks, I think synchronized is fine, though we should explicitly synchronize instead.
```java
    }
    final Set<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
    if (pending != null) {
        for (BatchedTask task : pending) {
```
I think we would want to synchronize explicitly on the pending set instead of relying on Collections.synchronizedSet, iterating a synchronized set like this is not really safe.
Right fair point :) Added locking to this loop.
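The point being addressed, sketched with simplified types (`String` in place of `BatchedTask`; illustrative names): `Collections.synchronizedSet` only guards individual method calls, so iterating the wrapped set requires explicitly holding its monitor, as the `synchronizedSet` javadoc mandates:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: iterating a synchronizedSet safely requires taking the set's own lock.
class IterateSynchronizedSketch {
    static List<String> drain(ConcurrentHashMap<String, Set<String>> map, String batchingKey) {
        List<String> processed = new ArrayList<>();
        Set<String> pending = map.remove(batchingKey);
        if (pending != null) {
            synchronized (pending) { // explicit lock: for-each iteration is not atomic otherwise
                for (String task : pending) {
                    processed.add(task);
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Set<String>> map = new ConcurrentHashMap<>();
        map.put("k", Collections.synchronizedSet(new LinkedHashSet<>(List.of("t1", "t2"))));
        System.out.println(drain(map, "k")); // [t1, t2]
    }
}
```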
Jenkins run elasticsearch-ci/bwc

Thanks Henning + David!
💚 Backport successful
relates #77466