
Make TaskBatcher Less Lock-Heavy #82227

Merged
original-brownbear merged 7 commits into elastic:master from original-brownbear:less-locking-task-batcher
Jan 14, 2022

Conversation

@original-brownbear
Contributor

In many shards benchmarks we see a lot of contention when submitting tasks.
This is obvious when working with lots of large task batches and doing long iterations.

We don't need to lock in runIfNotProcessed past removing the task set for a key
and can be a little more efficient when it comes to creating the new tasks set in
submitTasks as well.
Also, we don't need a fully locking map, since operations for different batching keys are often interleaved, so we move to a ConcurrentHashMap (CHM) as well.

This change is particularly relevant for stability because we often submit tasks directly from network threads, where grinding through e.g. a bunch of shard state updates and having to lock on the map over and over while e.g. a huge batch of index creations was iterated over in runIfNotProcessed caused very visible latency.

relates #77466
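The locking changes above can be sketched roughly as follows (a minimal sketch with hypothetical names, not the actual TaskBatcher code): tasks for a batching key are accumulated through ConcurrentHashMap#compute, so submissions for different batching keys never contend on one shared lock, and the processing path takes the whole pending set out of the map with a single atomic remove and then iterates it without holding any map lock.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the CHM pattern described in the PR text.
public class BatcherSketch {
    private final ConcurrentHashMap<String, Set<String>> tasksPerBatchingKey = new ConcurrentHashMap<>();

    void submitTasks(String batchingKey, List<String> tasks) {
        // compute runs the remapping function atomically per key, so
        // concurrent submissions for the same batching key are serialized
        // without a lock shared across all batching keys.
        tasksPerBatchingKey.compute(batchingKey, (key, existing) -> {
            if (existing == null) {
                return new LinkedHashSet<>(tasks); // first submission for this key
            }
            existing.addAll(tasks);
            return existing;
        });
    }

    // Remove the whole pending set in one atomic step; iterating the
    // returned set then needs no map-level locking at all.
    Set<String> drain(String batchingKey) {
        return tasksPerBatchingKey.remove(batchingKey);
    }

    public static void main(String[] args) {
        BatcherSketch batcher = new BatcherSketch();
        batcher.submitTasks("create-index", List.of("t1", "t2"));
        batcher.submitTasks("create-index", List.of("t3"));
        System.out.println(batcher.drain("create-index")); // [t1, t2, t3]
        System.out.println(batcher.drain("create-index")); // null, nothing pending
    }
}
```

The LinkedHashSet keeps insertion order, so batches are processed in submission order even though the map itself is concurrent.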

@original-brownbear original-brownbear added >enhancement :Distributed/Cluster Coordination Cluster formation and cluster state publication, including cluster membership and fault detection. v8.0.0 v8.1.0 labels Jan 4, 2022
@elasticmachine elasticmachine added the Team:Distributed Meta label for distributed team. label Jan 4, 2022
@elasticmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@DaveCTurner
Member

Also relates #81626. I'm unlikely to get to this today, bear with me.

    return new LinkedHashSet<>(tasks);
}
for (BatchedTask existing : existingTasks) {
    // check that there won't be two tasks with the same identity for the same batching key
Member

FWIW I don't think I've ever seen this check fail, maybe we could make it an assertion?

Contributor Author

I'm all for it. I think we can only get here via a bug and I've also never seen this happen outside of bugs during experimenting with stuff. I'll make it an assertion :)

@original-brownbear
Contributor Author

I'm unlikely to get to this today, bear with me.

No rush :) + Made the dup. check an assertion now.
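The change agreed on above could look roughly like this (a hypothetical helper, not the actual patch): the runtime duplicate-identity check becomes an assert, since a duplicate identity for the same batching key can only come from a caller bug and the check is only wanted in test runs with assertions enabled.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the duplicate-identity check discussed above.
public class DupCheck {
    static boolean addWithAssert(Set<Object> existing, Object identity) {
        // assert instead of throwing: a duplicate identity indicates a bug,
        // so the check only runs when the JVM is started with -ea
        assert existing.contains(identity) == false
            : "task with identity " + identity + " already queued for this batching key";
        return existing.add(identity);
    }

    public static void main(String[] args) {
        Set<Object> identities = new HashSet<>();
        System.out.println(addWithAssert(identities, "id-1")); // true
    }
}
```

In production (assertions disabled) the check costs nothing, which fits the observation that it has never been seen to fail outside of bugs.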

Contributor

@henningandersen henningandersen left a comment

I wonder if we can use an immutable (or alternatively concurrent) structure in the map?

if (existingTasks == null) {
    return new LinkedHashSet<>(tasks);
}
existingTasks.addAll(tasks);
Contributor

I am not too fond of this pattern. I think it works due to how CHM synchronizes on the head of the bucket in both compute and remove. But I think there is no guarantee from CHM to do either. It seems unlikely that compute could run the remapping function on the same key in parallel (since that would break the call only once guarantee), but I am less certain that a future evolution of CHM could not remove the synchronized in remove (do not see a way to do it though).

In short, I think we rely too much on the internals of CHM here in a very central place. Could we perhaps go immutable here and return a list of the two lists - and flatten it when extracted/removed?

Contributor Author

I think immutable either gets quite tricky (if we keep a list of lists, the timeout task removal gets awkward) or slow if we copy over and over. I'd rather go synchronized I think, it shouldn't be too much overhead here given how we will never have contention on the collection.

Contributor Author

Though tbh. I'm not sure about the need for this. CHM docs state:

This class obeys the same functional specification as {@link java.util.Hashtable}

which means that remove will always be synchronized with the compute operation I'd assume?

Contributor

It says right after that:

This class is fully interoperable with {@code Hashtable} in programs that rely on its thread safety but not on its synchronization details.

It is also just nice to have this code "obviously correct", since it is so important here.

@original-brownbear
Contributor Author

Made the collection synchronized now (56ba16c), let me know what you think :)

Contributor

@henningandersen henningandersen left a comment

Thanks, I think synchronized is fine, though we should explicitly synchronize instead.

}
final Set<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
if (pending != null) {
    for (BatchedTask task : pending) {
Contributor

I think we would want to synchronize explicitly on the pending set instead of relying on Collections.synchronizedSet; iterating a synchronized set like this is not really safe.

Contributor Author

Right fair point :) Added locking to this loop.
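The iteration concern can be illustrated with a small sketch: Collections.synchronizedSet only locks each individual method call, but a for-each loop spans many calls, so the Javadoc requires the caller to hold the set's own monitor for the whole loop.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Sketch of safe iteration over a Collections.synchronizedSet wrapper.
public class SyncIteration {
    public static void main(String[] args) {
        Set<String> pending = Collections.synchronizedSet(new LinkedHashSet<>());
        pending.add("task-a");
        pending.add("task-b");

        StringBuilder seen = new StringBuilder();
        // The wrapper synchronizes add/remove/contains individually, but
        // iteration must be guarded manually, as its Javadoc mandates:
        synchronized (pending) {
            for (String task : pending) {
                seen.append(task).append(' ');
            }
        }
        System.out.println(seen.toString().trim()); // task-a task-b
    }
}
```

Without the explicit synchronized block, a concurrent add or remove during the loop could throw a ConcurrentModificationException or produce an inconsistent view.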


@original-brownbear
Contributor Author

Jenkins run elasticsearch-ci/bwc

Contributor

@henningandersen henningandersen left a comment

LGTM.

@original-brownbear
Contributor Author

Thanks Henning + David!

@original-brownbear original-brownbear merged commit dc27d1d into elastic:master Jan 14, 2022
@original-brownbear original-brownbear deleted the less-locking-task-batcher branch January 14, 2022 15:36
@elasticsearchmachine
Collaborator

💚 Backport successful

Branch: 8.0

elasticsearchmachine pushed a commit that referenced this pull request Jan 14, 2022
@original-brownbear original-brownbear restored the less-locking-task-batcher branch April 18, 2023 20:50