Replacing Coordinator Queue With Deque & Fixing Usage Of toMap Util#950
Conversation
8c79c2c to
c1dfa84
Compare
c1dfa84 to
625fb0b
Compare
| // scheduling LEADER_DO_ASSIGNMENT event instantly to prevent any other event being handled before the reattempt. | ||
| _leaderDoAssignmentScheduledFuture = _scheduledExecutor.schedule(() -> { | ||
| _eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader)); | ||
| _eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader), false); |
There was a problem hiding this comment.
This is the main change in this PR. When a newly elected leader does assignment, it schedules the assignment event with task cleanup in front of the queue, to make sure it gets executed before anything else. Just to confirm my understanding.
There was a problem hiding this comment.
Correct! We just need to make sure that nothing else gets handled apart from a successful handling of "LEADER_DO_ASSIGNMENT" (with newly elected leader flag enabled) for a newly elected leader.
There was a problem hiding this comment.
isNewlyElectedLeader this will always be true, right? or when do we have that false?
There was a problem hiding this comment.
isNewlyElectedLeader is only true when a new leader is elected. In many other cases we have to ask the leader to do assignments, and at those calls, isNewlyElectedLeader will be false.
| /** | ||
| * Callable Coordinator is used for overriding coordinator behaviors for tests | ||
| */ | ||
| public interface CallableCoordinatorForTest { |
There was a problem hiding this comment.
Why do we need this interface? Seems like TestCoordinator.java has all you need.
There was a problem hiding this comment.
I didnt wanted to add another constructor in the TestCoordinator.java to override another method of coordinator. With this interface, we could minimize code duplication and pass the overrides of coordinator as an argument.
For the test "testLeaderDoAssignmentForNewlyElectedLeaderFailurePath", I overrode performPreAssignmentCleanup method to test a failure path, where I am using this.
|
|
||
| /** | ||
| * Add a single event to the queue, de-duping events with the same name and same metadata. | ||
| * @param event CoordinatorEvent event to add to the queue | ||
| * @param insertInTheEnd if true, indicates to add the event to the end of the queue and front, otherwise. | ||
| */ | ||
| public synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) { |
There was a problem hiding this comment.
Can the public APIs be similar to a Deque? I think that put(<event>) and putFirst(<event>) are clearer than using a boolean flag. The boolean option is fine for the private internal implementation.
...c/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java
Show resolved
Hide resolved
datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
Show resolved
Hide resolved
| */ | ||
| public synchronized void put(CoordinatorEvent event) { | ||
| LOG.info("Queuing event {} to event queue", event.getType()); | ||
| put(event, true); |
There was a problem hiding this comment.
When do we need to support for inserting at rear?
There was a problem hiding this comment.
All the events to this coordinator queue are inserted in the rear. The only case in which we have to insert in the front is what the PR proposes.
| * @param event CoordinatorEvent event to add to the queue | ||
| * @param insertInTheEnd if true, indicates to add the event to the end of the queue and front, otherwise. | ||
| */ | ||
| public synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) { |
There was a problem hiding this comment.
for understanding: LinkedBlockingDeque is already a thread safe and we are using only offer or offerFirst, what is the rational on having this method synchronized?
There was a problem hiding this comment.
[Minor]: another thing is, we want to have add from both ends but remove should be only from front. However, by using deque we will allow add/remove from both ends, Is there a way we can restrict remove from rear?
There was a problem hiding this comment.
The synchronized methods were added long before but I am guessing that the shared set variable might create race and would let in duplicate events in some cases if those methods are not synchronized.
That is why the wrapper on top of the LinkedBlockingDeque is implemented which only supports taking out events from the front.
e122778 to
a7f4e03
Compare
Summary
Deque Coordinator Queue
Bug Fixing toMap() Usage
Important: DO NOT REPORT SECURITY ISSUES DIRECTLY ON GITHUB.
For reporting security issues and contributing security fixes,
please, email security@linkedin.com instead, as described in
the contribution guidelines.
Please, take a minute to review the contribution guidelines at:
https://github.com/linkedin/Brooklin/blob/master/CONTRIBUTING.md