BMM Restart Improvements: Bug fixes and logging improvements #926
jzakaryan merged 3 commits into linkedin:master
        new ThreadFactoryBuilder().setNameFormat("CoordinatorScheduledExecutor-%d").build());
    // TODO Assess whether having a single threaded executor for token claim tasks is sufficient or it will be exhausted
    _tokenClaimExecutor = Executors.newSingleThreadExecutor(
    _tokenClaimExecutor = Executors.newFixedThreadPool(TOKEN_CLAIM_THREAD_POOL_SIZE,
A single-threaded token claim executor will get backed up in the following cases:
- Upon a stream stop, the leader schedules two tasks on this executor: (1) a task that polls ZooKeeper to check whether the stop has propagated and completed across the cluster, and (2) a task that claims the token assigned to the leader itself.
- Concurrent requests to stop different streams.
A single-threaded executor won't work here. For now I've increased the thread pool size to 8 threads, which lets the cluster process up to 8 concurrent requests without backing up (note that token claim tasks are short-lived, unlike the poll tasks, so they won't be the bottleneck here). I'll monitor how this performs in EI and adjust accordingly. A config property can also be added in the future if we end up tinkering with this.
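The back-up described above can be reproduced in isolation. The following is a minimal sketch, not the Coordinator code: the class name, the helper method, and the latch standing in for the ZooKeeper poll are all hypothetical, and `TOKEN_CLAIM_THREAD_POOL_SIZE` is set to 8 only because that is the value mentioned in the comment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TokenClaimPoolSketch {
  // Assumed value, taken from the "8 threads" mentioned in the review comment.
  static final int TOKEN_CLAIM_THREAD_POOL_SIZE = 8;

  /**
   * Submits a long-lived "poll" task followed by a short "claim" task and
   * reports whether the claim task finished while the poll task was still running.
   * The executor is shut down before the method returns.
   */
  static boolean claimFinishesWhilePollRuns(ExecutorService executor) throws Exception {
    CountDownLatch stopPropagated = new CountDownLatch(1);
    try {
      // Hypothetical stand-in for the task that polls until the stop has propagated.
      executor.submit(() -> {
        stopPropagated.await();
        return null;
      });
      // Hypothetical stand-in for the short-lived token claim task.
      Future<String> claim = executor.submit(() -> "claimed");
      try {
        claim.get(500, TimeUnit.MILLISECONDS);
        return true;
      } catch (TimeoutException e) {
        // The claim task is still queued behind the poll task.
        return false;
      }
    } finally {
      stopPropagated.countDown(); // let the poll task finish
      executor.shutdown();
      executor.awaitTermination(5, TimeUnit.SECONDS);
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("single thread: "
        + claimFinishesWhilePollRuns(Executors.newSingleThreadExecutor()));
    System.out.println("fixed pool: "
        + claimFinishesWhilePollRuns(Executors.newFixedThreadPool(TOKEN_CLAIM_THREAD_POOL_SIZE)));
  }
}
```

With a single thread the claim task waits in the queue until the poll task returns. With a fixed pool it can run on another thread, as long as fewer poll tasks are in flight than there are threads.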
    static List<Datastream> inferStoppingDatastreamsFromAssignment(List<DatastreamTask> newAssignment,
        List<DatastreamTask> removedTasks) {
      Map<String, List<Datastream>> taskPrefixToDatastream = removedTasks.stream().
          collect(Collectors.toMap(DatastreamTask::getTaskPrefix, DatastreamTask::getDatastreams));
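For streams with more than one task assigned to a host, this was causing the token claim thread to fail silently with a duplicate-key exception (the two-argument `Collectors.toMap` throws an `IllegalStateException` when two tasks map to the same task prefix).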
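To illustrate the failure mode, here is a minimal sketch using a hypothetical, stripped-down `Task` class in place of `DatastreamTask`. The merge function in `groupTolerant` is one possible way to tolerate duplicate prefixes and is not necessarily the fix applied in this pull request. It assumes that tasks sharing a prefix carry the same datastream list, so keeping the first value loses nothing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class TaskPrefixGroupingSketch {
  /** Hypothetical stand-in for DatastreamTask; datastreams are plain strings here. */
  static final class Task {
    private final String taskPrefix;
    private final List<String> datastreams;

    Task(String taskPrefix, String... datastreams) {
      this.taskPrefix = taskPrefix;
      this.datastreams = Arrays.asList(datastreams);
    }

    String getTaskPrefix() {
      return taskPrefix;
    }

    List<String> getDatastreams() {
      return datastreams;
    }
  }

  // Two-argument toMap: throws IllegalStateException if two tasks share a prefix.
  static Map<String, List<String>> groupStrict(List<Task> tasks) {
    return tasks.stream()
        .collect(Collectors.toMap(Task::getTaskPrefix, Task::getDatastreams));
  }

  // Three-argument toMap: the merge function resolves collisions by keeping the first value.
  // Assumption: tasks with the same prefix reference the same datastreams.
  static Map<String, List<String>> groupTolerant(List<Task> tasks) {
    return tasks.stream()
        .collect(Collectors.toMap(Task::getTaskPrefix, Task::getDatastreams,
            (first, second) -> first));
  }

  public static void main(String[] args) {
    // Two tasks of the same stream assigned to one host.
    List<Task> removed = Arrays.asList(new Task("streamA", "dsA"), new Task("streamA", "dsA"));
    try {
      groupStrict(removed);
    } catch (IllegalStateException e) {
      System.out.println("strict grouping failed: " + e.getMessage());
    }
    System.out.println("tolerant grouping: " + groupTolerant(removed));
  }
}
```

If the grouping runs inside a task handed to `ExecutorService.submit`, the exception is captured in the returned `Future` and only surfaces when `get()` is called. That would be consistent with the thread dying without any visible error.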
        collect(Collectors.toList());
    List<String> newAssignmentTaskNames = newAssignment.stream().map(DatastreamTask::getDatastreamTaskName).
        collect(Collectors.toList());
    _log.debug("Claiming assignment tokens. Old assignment: {}", oldAssignmentTaskNames);
nit: wouldn't you rather have these logs after (or inside) the point where a thread is scheduled to claim the assignment tokens?
I just wanted these printed before scheduling the task that claims the tokens. These logs helped me track down the bug with the dying token claim thread. I'm also planning to clean them up once the feature has been tested more and rolled out in production.
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
This pull request fixes bugs and improves logging in the Assignment Tokens feature (#919, #921, #922, #924).