Skip to content

Conversation

@ivankelly
Copy link
Contributor

@ivankelly ivankelly commented Jan 9, 2018

RawReader#readNextAsync returns a Future to the user, which includes a
retained ByteBuf. If the user is no longer interested in the result of
the read call, it should be able to cancel to ensure that the ByteBuf
is released.

Cancellation messes up the state of a stream (if you cancel a read, should the subsequent read get the message that read would have received, or the message after it?), so it should only occur when a you are about to close a reader or seek to a new position.

The return value of a call to cancel must be checked, as if it fails the future may have completes successfully. If this is the case, the returned RawMessage needs to be closed to avoid leaking ByteBufs.

RawReader#readNextAsync returns a Future<RawMessage> to the user, which includes a
retained ByteBuf. If the user is no longer interested in the result of
the read call, it should be able to cancel to ensure that the ByteBuf
is released.

If there are multiple read requests outstanding, and one of them is
cancelled, all read requests that were made after the cancelled
request will also be cancelled.
Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it might be better if there is a test for that.

@ivankelly
Copy link
Contributor Author

Will add one

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@merlimat merlimat merged commit 1458e66 into apache:master Jan 10, 2018
codelipenghui pushed a commit that referenced this pull request Apr 14, 2020
### Motivation

Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. 
This happened in v2.4.2 and master branch. 

As the following threaddump at that time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" #52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
        - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
        at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
        at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
        at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

...

"ForkJoinPool.commonPool-worker-36" #1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
        - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
``` 
`PersistentTopic#getDurableSubscription` locked  `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. 
( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)

On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. 
( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)

So, it seems that deadlock happens.

### Modifications
Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
addisonj pushed a commit to instructure/pulsar that referenced this pull request May 7, 2020
### Motivation

Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. 
This happened in v2.4.2 and master branch. 

As the following threaddump at that time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" apache#52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
        - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
        at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
        at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
        at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

...

"ForkJoinPool.commonPool-worker-36" apache#1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
        - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
``` 
`PersistentTopic#getDurableSubscription` locked  `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. 
( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)

On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. 
( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)

So, it seems that deadlock happens.

### Modifications
Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
jiazhai pushed a commit that referenced this pull request May 8, 2020
### Motivation

Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time.
This happened in v2.4.2 and master branch.

As the following threaddump at that time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" #52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
        - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
        at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
        at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
        at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

...

"ForkJoinPool.commonPool-worker-36" #1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
        - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
```
`PersistentTopic#getDurableSubscription` locked  `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`.
( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)

On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`.
( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)

So, it seems that deadlock happens.

### Modifications
Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
(cherry picked from commit 6d30414)
jerrypeng pushed a commit to jerrypeng/incubator-pulsar that referenced this pull request May 15, 2020
### Motivation

Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. 
This happened in v2.4.2 and master branch. 

As the following threaddump at that time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" apache#52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
        - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
        at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
        at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
        at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

...

"ForkJoinPool.commonPool-worker-36" apache#1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
        - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
``` 
`PersistentTopic#getDurableSubscription` locked  `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. 
( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)

On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. 
( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)

So, it seems that deadlock happens.

### Modifications
Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
### Motivation

Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. 
This happened in v2.4.2 and master branch. 

As the following threaddump at that time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" apache#52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
        - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
        at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
        at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
        at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

...

"ForkJoinPool.commonPool-worker-36" apache#1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
        - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
``` 
`PersistentTopic#getDurableSubscription` locked  `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. 
( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)

On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. 
( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)

So, it seems that deadlock happens.

### Modifications
Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants