MINOR: Fix Deadlock in StreamThread #2791

Closed
original-brownbear wants to merge 2 commits into apache:trunk from original-brownbear:fix-streams-deadlock

@original-brownbear original-brownbear commented Apr 2, 2017

Member

I think this may be the reason (or one of the reasons) we see Jenkins jobs time out at times. At least I can reproduce it causing tests to time out at a certain rate.

With current trunk it is possible to run into this:

"kafka-streams-close-thread" #585 daemon prio=5 os_prio=0 tid=0x00007f66d052d800 nid=0x7e02 waiting for monitor entry [0x00007f66ae2e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345)
	- waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474)
	at java.lang.Thread.run(Thread.java:745)

"appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0 tid=0x00007f66d052f000 nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219)
	- waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117)
	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259)
	- locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener)
	at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70)
	at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)

In a nutshell: KafkaStreams and StreamThread are both waiting for each other, because a concurrent close (e.g. from a test) comes along that also locks on KafkaStreams:

"main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait() [0x00007f66d7a15000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1249)
	- locked <0x000000077d45a590> (a java.lang.Thread)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503)
	- locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447)
	at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:19)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:71)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

=> causing a deadlock.

Fixed this with softer locking on the state change: it guarantees atomic changes to the state but does not lock on the whole object (I, at least, could not find a method other than setState that would require more than atomically locked access).
Also qualified the state listeners with their outer-class to make the whole code-flow around this more readable (having two interfaces with the same naming for interface and method and then using them between their two outer classes is crazy hard to get imo :)).
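
To illustrate the idea of "softer locking" described above, here is a minimal sketch. It is not the code from this PR: the `Service` class, its `State` enum, and the `transitionsWhileMonitorHeld` helper are hypothetical names made up for the example. It assumes the state is guarded by a dedicated `stateLock` object rather than by the instance monitor, so a thread that changes the state does not block while another thread holds the monitor of the whole object (as a `synchronized` close would).

```java
public class StateLockSketch {

    // Hypothetical states, not the real KafkaStreams.State values.
    enum State { CREATED, RUNNING, NOT_RUNNING }

    interface StateListener {
        void onChange(State newState, State oldState);
    }

    /** Hypothetical stand-in for an object with a state and a blocking close(). */
    static final class Service {
        // Dedicated lock: guards only the state field, not the whole object.
        private final Object stateLock = new Object();
        private final StateListener listener;
        private State state = State.CREATED;

        Service(StateListener listener) {
            this.listener = listener;
        }

        void setState(State newState) {
            synchronized (stateLock) {
                State oldState = state;
                state = newState;
                if (listener != null) {
                    listener.onChange(newState, oldState);
                }
            }
        }

        State state() {
            synchronized (stateLock) {
                return state;
            }
        }
    }

    /**
     * Holds the instance monitor (like a synchronized close() would) while a
     * worker thread changes the state. Returns true if the worker finished.
     */
    static boolean transitionsWhileMonitorHeld() {
        Service service = new Service(null);
        Thread worker = new Thread(() -> service.setState(State.RUNNING));
        synchronized (service) {
            worker.start();
            try {
                worker.join(5000); // would time out if setState needed the instance monitor
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return !worker.isAlive() && service.state() == State.RUNNING;
    }

    public static void main(String[] args) {
        System.out.println("transition completed: " + transitionsWhileMonitorHeld());
    }
}
```

If `setState` were declared `synchronized` on the instance instead, the worker in this sketch would stay blocked until the join timed out. Note that the listener is still invoked while `stateLock` is held, so a listener that takes further locks can still create a lock-ordering problem.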

Easy to reproduce yourself by running org.apache.kafka.streams.KafkaStreamsTest in a loop for a bit (save yourself some time by running 2-4 in parallel :)). Eventually it will lock on one of the tests (for me this takes less than 1 min with 4 parallel runs).

@asfbot

asfbot commented Apr 2, 2017


Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2625/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 2, 2017


Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2621/
Test FAILed (JDK 7 and Scala 2.10).

@original-brownbear

Member Author

@hachikuji @ijuma fyi :) We def. have an issue here in my opinion (bottom of description has a straightforward reproducer), let me know what you think about the solution :)

@ijuma

ijuma commented Apr 2, 2017

Member

Thanks for the PR, cc @dguy @guozhangwang

@asfbot

asfbot commented Apr 2, 2017


Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2621/
Test PASSed (JDK 8 and Scala 2.12).

@mjsax mjsax left a comment

Member

Very nice find, @original-brownbear! Just some code style nitpicks. Overall LGTM.

if (stateListener != null) {
stateListener.onChange(state, oldState);
private void setState(final State newState) {
synchronized (this.stateLock) {

Member

Nit: remove this

synchronized (this.stateLock) {
final State oldState = state;
if (!state.isValidTransition(newState)) {
log.warn(

Member

Nit: fix line break

} catch (RuntimeException t) {
log.error("{} Failed while executing {} {} due to {}: ",
StreamThread.this.logPrefix,
this.logPrefix,

Member

Nit: also remove this

@original-brownbear

Member Author

@mjsax thanks for taking a look! Fixed the codestyle issues :)

@asfbot

asfbot commented Apr 2, 2017


Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2628/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 2, 2017


Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2624/
Test PASSed (JDK 7 and Scala 2.10).

@ijuma

ijuma commented Apr 2, 2017

Member

Since @guozhangwang is away on holiday and this is an important fix (since it fixes one cause of hung Jenkins builds), I'll review and merge it after we get a second opinion from someone familiar with the Streams code, cc @enothereska @dguy.

@asfbot

asfbot commented Apr 2, 2017


Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2624/
Test PASSed (JDK 8 and Scala 2.12).

@dguy dguy left a comment

Contributor

LGTM

@enothereska

Contributor

LGTM thanks.

private class StreamStateListener implements StreamThread.StateListener {
private final class StreamStateListener implements StreamThread.StateListener {
@Override
public synchronized void onChange(final StreamThread thread,

Member

@dguy @enothereska This synchronized here seems suspicious. Is it really the aim to synchronize on the listener instance when updating variables like threadState? Seems like a bug.

Contributor

It looks like there should only be a single StreamStateListener and threadState should be a member.
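
A rough sketch of what that suggestion could look like, under stated assumptions: the class name `SharedStateListener`, the `ThreadState` values, and the `allRunning` helper are hypothetical and not taken from the Kafka code base. The point is only that when one listener instance is shared by all threads and the per-thread state map is a member of that listener, `synchronized` on the listener guards the map consistently.

```java
import java.util.HashMap;
import java.util.Map;

public class SingleListenerSketch {

    // Hypothetical thread states for illustration only.
    enum ThreadState { RUNNING, PARTITIONS_REVOKED, NOT_RUNNING }

    /** One instance is shared by all threads; the state map is its own member. */
    static final class SharedStateListener {
        private final Map<Long, ThreadState> threadState = new HashMap<>();

        // synchronized on the single shared listener, so all updates are serialized
        synchronized void onChange(long threadId, ThreadState newState) {
            threadState.put(threadId, newState);
        }

        synchronized boolean allRunning() {
            if (threadState.isEmpty()) {
                return false;
            }
            for (ThreadState s : threadState.values()) {
                if (s != ThreadState.RUNNING) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Walks through a revoke-and-resume sequence for two hypothetical threads. */
    static boolean demo() {
        SharedStateListener listener = new SharedStateListener();
        listener.onChange(1L, ThreadState.RUNNING);
        listener.onChange(2L, ThreadState.PARTITIONS_REVOKED);
        boolean before = listener.allRunning(); // thread 2 is not running yet
        listener.onChange(2L, ThreadState.RUNNING);
        boolean after = listener.allRunning();
        return !before && after;
    }

    public static void main(String[] args) {
        System.out.println("demo ok: " + demo());
    }
}
```

With one listener per thread, by contrast, `synchronized` on each listener would take a different monitor per thread and would not protect state shared between them.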

Member

Thanks @dguy. That makes sense. @original-brownbear, maybe you can do a follow-up that does that.

Member Author

@ijuma put it on my todo list, will get to this in ~12h :)

@ijuma ijuma left a comment

Member

Thanks for the PR, LGTM.

@asfgit asfgit closed this in 3364f12 Apr 3, 2017
@ijuma

ijuma commented Apr 3, 2017

Member

One more thing: please keep in mind that the PR description becomes the commit message. It's good to aim for a clear and concise description of the issue.

@original-brownbear original-brownbear deleted the fix-streams-deadlock branch April 3, 2017 18:56
@original-brownbear

Member Author

@ijuma got it, will be kept in mind for the next PR :)

6 participants