Skip to content

Fix Stopping Logic and Maintain Stopping Latch Counter#877

Merged
shrinandthakkar merged 2 commits intolinkedin:masterfrom
shrinandthakkar:preventStoppingAlreadyStoppingTasks
Jun 8, 2022
Merged

Fix Stopping Logic and Maintain Stopping Latch Counter#877
shrinandthakkar merged 2 commits intolinkedin:masterfrom
shrinandthakkar:preventStoppingAlreadyStoppingTasks

Conversation

@shrinandthakkar
Copy link
Copy Markdown
Collaborator

@shrinandthakkar shrinandthakkar commented Dec 13, 2021

Fixing stopping logic for tasks by preventing calling stop on dead-thread tasks.

Also maintaining the stopping latch counter irrespective of success or failure in acquiring task lock otherwise it could lead to a state which causes multiple instances to work on the same task concurrently.


Important: DO NOT REPORT SECURITY ISSUES DIRECTLY ON GITHUB.
For reporting security issues and contributing security fixes,
please, email security@linkedin.com instead, as described in
the contribution guidelines.

Please, take a minute to review the contribution guidelines at:
https://github.com/linkedin/Brooklin/blob/master/CONTRIBUTING.md

@vmaheshw
Copy link
Copy Markdown
Collaborator

vmaheshw commented Feb 1, 2022

@shrinandthakkar Can you please drive this to completion?

Copy link
Copy Markdown
Collaborator

@vmaheshw vmaheshw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}
}

protected void countDownStoppedLatch() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a weird api to expose to subclasses. How would an implr know when it is safe to invoke this?

} catch (DatastreamRuntimeException ex) {
// setting _stoppedLatch count to 0 since the lock couldn't be acquired,
// as a non-zero stoppedLatch value won't let the task to be stopped.
countDownStoppedLatch();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception is being re-thrown anyway, so why not handle it in the super class? I don't like how tightly the super and sub classes are being coupled here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vmaheshw @surajkn @ryannedolan

Let me take this up in a separate PR but to reduce the coupling here, could we do something like this in the AbstractKafkaConnector(from where the task threads are initiated).

  private ConnectorTaskEntry createKafkaConnectorTask(DatastreamTask task) {
    _logger.info("Creating connector task for datastream task {}.", task);
    AbstractKafkaBasedConnectorTask connectorTask = createKafkaBasedConnectorTask(task);
    Thread taskThread = createTaskThread(connectorTask);
    taskThread.start();

    // task cleanup thread cleans up the task resources like stopped latch counts post shutdown of the task
    Thread taskCleanUpThread = new Thread(() -> {
      try {
        taskThread.join();
      } catch (InterruptedException exception) {
        _logger.error(String.format("Got interrupted exception while waiting for the completion of task : %s ", connectorTask), exception);
      } finally {
        connectorTask.postShutdownCleanUp();
      }
    });
    taskCleanUpThread.start();
    return new ConnectorTaskEntry(connectorTask, taskThread);
  } 

the postShutdownCleanUp method here would take care of counting the latch down so we don't have to handle it in run() functions of either derived or base class implementations.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically it will create extra thread for each thread. I think it should work, but with the constraint of double task threads.

}
}

protected void countDownStoppedLatch() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vmaheshw I remember you had expressed concern over this way of doing the count down. IIRC your concern was that any subclass of this can forget to call countDownStopppedLatch (although in this patch Shrinand added a call to it in KafkaMirrorMakerConnectorTask). So just wanted to check if that was considered and concluded that this is the most reasonable way of doing it?
cc @shrinandthakkar

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was not in support of doing it this way, but also did not have a better idea. So, we decided to do it, until we have a better idea.

@shrinandthakkar shrinandthakkar merged commit 5b30f54 into linkedin:master Jun 8, 2022
shrinandthakkar pushed a commit to shrinandthakkar/brooklin that referenced this pull request Jun 9, 2022
shrinandthakkar added a commit that referenced this pull request Jun 9, 2022
…fter merging #877 (#908)

Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn1.linkedin.biz>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants