Fix Stopping Logic and Maintain Stopping Latch Counter #877
.../src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
...java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
@shrinandthakkar Can you please drive this to completion?
.../src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
    }
  }

  protected void countDownStoppedLatch() {
This is a weird API to expose to subclasses. How would an implementer know when it is safe to invoke this?
    } catch (DatastreamRuntimeException ex) {
      // setting _stoppedLatch count to 0 since the lock couldn't be acquired,
      // as a non-zero stoppedLatch value won't let the task to be stopped.
      countDownStoppedLatch();
The exception is being re-thrown anyway, so why not handle it in the super class? I don't like how tightly the super and sub classes are being coupled here.
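One way to picture "handle it in the super class" is a template-method arrangement, where the base class owns the latch and releases it in a `finally` block. This is only a minimal sketch under assumptions: `LatchOwningTask`, `doRun()`, and `stoppedLatchCount()` are hypothetical names, not the actual `AbstractKafkaBasedConnectorTask` API, and the real `run()` does far more than this.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the abstract connector task; all names are illustrative.
abstract class LatchOwningTask implements Runnable {
  private final CountDownLatch stoppedLatch = new CountDownLatch(1);

  // final, so a subclass cannot bypass the finally block below
  @Override
  public final void run() {
    try {
      doRun();
    } finally {
      // Runs whether doRun() returns normally or throws
      stoppedLatch.countDown();
    }
  }

  // Subclasses put their work here and never touch the latch
  protected abstract void doRun();

  long stoppedLatchCount() {
    return stoppedLatch.getCount();
  }
}

class LatchOwningTaskDemo {
  // Runs a task whose body fails and returns the remaining latch count
  static long runFailingTask() {
    LatchOwningTask task = new LatchOwningTask() {
      @Override
      protected void doRun() {
        throw new IllegalStateException("simulated failure to acquire the task lock");
      }
    };
    try {
      task.run();
    } catch (IllegalStateException expected) {
      // The exception still propagates to the caller, as in the patch
    }
    return task.stoppedLatchCount();
  }
}
```

With this shape, a subclass that throws does not need to know about the latch at all, which is the decoupling the comment asks for.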
@vmaheshw @surajkn @ryannedolan
Let me take this up in a separate PR, but to reduce the coupling here, could we do something like this in AbstractKafkaConnector (where the task threads are started)?
  private ConnectorTaskEntry createKafkaConnectorTask(DatastreamTask task) {
    _logger.info("Creating connector task for datastream task {}.", task);
    AbstractKafkaBasedConnectorTask connectorTask = createKafkaBasedConnectorTask(task);
    Thread taskThread = createTaskThread(connectorTask);
    taskThread.start();
    // task cleanup thread cleans up the task resources like stopped latch counts post shutdown of the task
    Thread taskCleanUpThread = new Thread(() -> {
      try {
        taskThread.join();
      } catch (InterruptedException exception) {
        _logger.error(String.format("Got interrupted exception while waiting for the completion of task : %s ", connectorTask), exception);
      } finally {
        connectorTask.postShutdownCleanUp();
      }
    });
    taskCleanUpThread.start();
    return new ConnectorTaskEntry(connectorTask, taskThread);
  }
The postShutdownCleanUp method here would take care of counting the latch down, so we would not have to handle it in the run() methods of either the derived or the base class implementations.
Basically, it will create an extra thread for each task thread. I think it should work, but at the cost of doubling the number of threads.
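For comparison, the thread-doubling concern could in principle be avoided by wrapping the task's `Runnable` so the cleanup hook runs on the task's own thread when `run()` exits. The following is a rough sketch, not something proposed in this PR: `CleanupOnExit` and `withCleanup` are hypothetical names, and the cleanup lambda merely stands in for the proposed `postShutdownCleanUp()`.

```java
import java.util.concurrent.atomic.AtomicReference;

class CleanupOnExit {
  // Wraps a task so that cleanup runs on the same thread once the task exits,
  // whether it returns normally or throws
  static Runnable withCleanup(Runnable task, Runnable cleanup) {
    return () -> {
      try {
        task.run();
      } finally {
        cleanup.run();
      }
    };
  }

  // Returns the name of the thread that executed the cleanup hook
  static String demo() {
    AtomicReference<String> cleanupThread = new AtomicReference<>();
    Thread taskThread = new Thread(withCleanup(
        () -> {
          throw new IllegalStateException("simulated task failure");
        },
        () -> cleanupThread.set(Thread.currentThread().getName())),
        "task-thread");
    // Swallow the simulated failure so it is not printed to stderr
    taskThread.setUncaughtExceptionHandler((thread, error) -> { });
    taskThread.start();
    try {
      taskThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
    return cleanupThread.get();
  }
}
```

The trade-off is that cleanup only happens if the thread actually starts and reaches the wrapper, whereas a separate joining thread observes termination from the outside.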
    }
  }

  protected void countDownStoppedLatch() {
@vmaheshw I remember you had expressed concern over this way of doing the count down. IIRC your concern was that any subclass can forget to call countDownStoppedLatch (although in this patch Shrinand added a call to it in KafkaMirrorMakerConnectorTask). So I just wanted to check whether that was considered, and whether it was concluded that this is the most reasonable way of doing it.
cc @shrinandthakkar
Yes, I was not in support of doing it this way, but I also did not have a better idea. So we decided to go with it until we have a better one.
Fixes the stopping logic for tasks by not calling stop on tasks whose thread is already dead.
Also maintains the stopping latch counter regardless of whether the task lock was acquired; otherwise the connector could reach a state in which multiple instances work on the same task concurrently.
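To illustrate the latch half of this description from the waiter's side, here is a generic sketch of what a caller awaiting a `CountDownLatch` observes when a task exits early. It assumes the stop path waits on the latch with a timeout; `StopLatchDemo`, `awaitStop`, and the 100 ms timeout are hypothetical, and this is not Brooklin's actual stop code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class StopLatchDemo {
  // Returns true if the waiter saw the task as stopped within the timeout
  static boolean awaitStop(boolean countDownOnLockFailure) {
    CountDownLatch stoppedLatch = new CountDownLatch(1);
    Thread task = new Thread(() -> {
      // Simulated: lock acquisition fails, so the task exits before its
      // normal shutdown path would have released the latch
      if (countDownOnLockFailure) {
        stoppedLatch.countDown();
      }
    });
    task.start();
    try {
      task.join();
      // The thread is already dead here, yet the latch may still be non-zero
      return stoppedLatch.await(100, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

In this sketch, `awaitStop(false)` times out even though the thread has finished, so a caller relying on the latch cannot tell a dead task from a running one.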