-
Notifications
You must be signed in to change notification settings - Fork 3.7k
RawReader allows futures to be cancelled #1043
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
RawReader#readNextAsync returns a Future<RawMessage> to the user, which includes a retained ByteBuf. If the user is no longer interested in the result of the read call, it should be able to cancel to ensure that the ByteBuf is released. If there are multiple read requests outstanding, and one of them is cancelled, all read requests that were made after the cancelled request will also be cancelled.
sijie
approved these changes
Jan 10, 2018
Member
sijie
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it might be better if there is a test for that.
Contributor
Author
|
Will add one |
sijie
approved these changes
Jan 10, 2018
merlimat
approved these changes
Jan 10, 2018
Contributor
merlimat
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
codelipenghui
pushed a commit
that referenced
this pull request
Apr 14, 2020
### Motivation Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. This happened in v2.4.2 and master branch. As the following threaddump at that time: ``` "bookkeeper-ml-workers-OrderedExecutor-5-0" #52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section) at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119) at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712) - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687) at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source) at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731) at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023) at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249) at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241) at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source) at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) ... "ForkJoinPool.commonPool-worker-36" #1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856) - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ``` `PersistentTopic#getDurableSubscription` locked `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`) On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. ( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`) So, it seems that deadlock happens. ### Modifications Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
addisonj
pushed a commit
to instructure/pulsar
that referenced
this pull request
May 7, 2020
### Motivation Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. This happened in v2.4.2 and master branch. As the following threaddump at that time: ``` "bookkeeper-ml-workers-OrderedExecutor-5-0" apache#52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section) at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119) at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712) - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687) at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source) at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731) at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023) at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249) at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241) at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source) at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) ... "ForkJoinPool.commonPool-worker-36" apache#1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856) - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ``` `PersistentTopic#getDurableSubscription` locked `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`) On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. ( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`) So, it seems that deadlock happens. ### Modifications Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
jiazhai
pushed a commit
that referenced
this pull request
May 8, 2020
### Motivation Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. This happened in v2.4.2 and master branch. As the following threaddump at that time: ``` "bookkeeper-ml-workers-OrderedExecutor-5-0" #52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section) at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119) at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712) - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687) at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source) at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731) at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023) at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249) at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241) at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source) at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) ... "ForkJoinPool.commonPool-worker-36" #1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856) - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ``` `PersistentTopic#getDurableSubscription` locked `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`) On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. ( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`) So, it seems that deadlock happens. ### Modifications Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`) (cherry picked from commit 6d30414)
jerrypeng
pushed a commit
to jerrypeng/incubator-pulsar
that referenced
this pull request
May 15, 2020
### Motivation Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. This happened in v2.4.2 and master branch. As the following threaddump at that time: ``` "bookkeeper-ml-workers-OrderedExecutor-5-0" apache#52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section) at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119) at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712) - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687) at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source) at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731) at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023) at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249) at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241) at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source) at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) ... "ForkJoinPool.commonPool-worker-36" apache#1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856) - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ``` `PersistentTopic#getDurableSubscription` locked `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`) On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. ( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`) So, it seems that deadlock happens. ### Modifications Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
huangdx0726
pushed a commit
to huangdx0726/pulsar
that referenced
this pull request
Aug 24, 2020
### Motivation Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. This happened in v2.4.2 and master branch. As the following threaddump at that time: ``` "bookkeeper-ml-workers-OrderedExecutor-5-0" apache#52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section) at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119) at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712) - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687) at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source) at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731) at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023) at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249) at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241) at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source) at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) ... "ForkJoinPool.commonPool-worker-36" apache#1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856) - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ``` `PersistentTopic#getDurableSubscription` locked `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`) On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. ( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`) So, it seems that deadlock happens. ### Modifications Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
RawReader#readNextAsync returns a Future to the user, which includes a
retained ByteBuf. If the user is no longer interested in the result of
the read call, it should be able to cancel to ensure that the ByteBuf
is released.
Cancellation messes up the state of a stream (if you cancel a read, should the subsequent read get the message that read would have received, or the message after it?), so it should only occur when a you are about to close a reader or seek to a new position.
The return value of a call to cancel must be checked, as if it fails the future may have completes successfully. If this is the case, the returned RawMessage needs to be closed to avoid leaking ByteBufs.