@hrsakai
Contributor

@hrsakai hrsakai commented Mar 22, 2017

Motivation

Authentication.md and GettingStarted.md contain no C++ sample code.

Modifications

Added C++ sample code to the documents and made a few small fixes (typos, etc.).

If adding sample code is unnecessary, I will close this pull request.

Contributor

@merlimat merlimat left a comment

👍 Thanks for adding this

@merlimat merlimat added this to the 1.17 milestone Mar 22, 2017
@merlimat merlimat added the doc label Mar 22, 2017
@jai1
Contributor

jai1 commented Mar 22, 2017

@hrsakai - I had to change the CPP Client code (#290) - rename a few APIs and classes in the Auth code in order to maintain backward compatibility with CPP client in production. You may have to use the new API names - let me know if you run into some trouble, I will help out.

@hrsakai
Contributor Author

hrsakai commented Mar 22, 2017

@jai1
I updated the sample code to use the new API names. Please check it.

@jai1
Contributor

jai1 commented Mar 23, 2017

@hrsakai - changes look good - sorry for the trouble
Have restarted the build.

@merlimat merlimat merged commit a86b6df into apache:master Mar 23, 2017
sijie pushed a commit that referenced this pull request Dec 11, 2020
…oid deadlocks (#8877)

### Motivation

Some of our broker servers experienced what appears to be a deadlock. The following is the thread dump at that time.

[threaddump.txt.zip](https://github.com/apache/pulsar/files/5665572/threaddump.txt.zip)

The thread "ForkJoinPool.commonPool-worker-120" held the lock on an instance of `ManagedLedgerImpl` and appeared to be waiting for `subscriptions`, an instance of `ConcurrentOpenHashMap`, to be unlocked. Many other threads were blocked because the lock on the `ManagedLedgerImpl` instance was not released.

```
"ForkJoinPool.commonPool-worker-120" #903 daemon prio=5 os_prio=0 tid=0x00007f9aa0010000 nid=0x12b59 waiting on condition [0x00007f9528cc3000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007fa20b3e5eb0> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:650)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:720)
        - locked <0x00007fa20512f968> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:643)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$476/1880414247.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$475/707554512.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
```

The thread that locked `subscriptions` seems to be "pulsar-msg-expiry-monitor-24-1".
```
"pulsar-msg-expiry-monitor-24-1" #304 prio=5 os_prio=0 tid=0x00007f99602dd000 nid=0x12036 waiting on condition [0x00007f998d47c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007fca4361dfb0> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNthEntry(ManagedCursorImpl.java:537)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:1820)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:901)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$36(PersistentTopic.java:1102)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$1011/2104832020.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1102)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$1009/2005752676.accept(Unknown Source)
        at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$32(BrokerService.java:951)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$779/1852910990.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:948)
        at org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:925)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$108/203149502.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
```

I can't understand why "pulsar-msg-expiry-monitor-24-1" was stuck. However, it seems that this deadlock can be avoided if `subscriptions` is not locked when checking for message expiration, so I created this PR. If anyone can explain why "pulsar-msg-expiry-monitor-24-1" was stuck, please let me know.

### Modifications

When expiring messages for each subscription, copy the values of `subscriptions` into a `List` and call `forEach()` on that `List` instead of on the map, so the map is not locked while the expiry check runs.
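
The idea behind this change can be illustrated with a minimal, self-contained Java sketch. It does not use Pulsar code: `GuardedMap` is a hypothetical stand-in that guards a plain `HashMap` with a `StampedLock`, on the assumption that `forEach()` runs its callback while a read lock is held, as the thread dumps above suggest for `ConcurrentOpenHashMap$Section`. The method names, keys, and the 200 ms timeout are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiConsumer;

class SnapshotIterationSketch {

    /** Hypothetical stand-in for a lock-guarded map; NOT Pulsar's ConcurrentOpenHashMap. */
    static final class GuardedMap<K, V> {
        private final StampedLock lock = new StampedLock();
        private final Map<K, V> map = new HashMap<>();

        void put(K key, V value) {
            long stamp = lock.writeLock();
            try { map.put(key, value); } finally { lock.unlockWrite(stamp); }
        }

        /** Bounded put, so the demo can observe blocking instead of hanging forever. */
        boolean tryPut(K key, V value, long timeoutMillis) throws InterruptedException {
            long stamp = lock.tryWriteLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (stamp == 0L) {
                return false; // write lock not acquired within the timeout
            }
            try { map.put(key, value); return true; } finally { lock.unlockWrite(stamp); }
        }

        /** Runs the callback while the read lock is held. */
        void forEach(BiConsumer<K, V> action) {
            long stamp = lock.readLock();
            try { map.forEach(action); } finally { lock.unlockRead(stamp); }
        }

        /** Copies the values under the read lock and releases it before returning. */
        List<V> values() {
            long stamp = lock.readLock();
            try { return new ArrayList<>(map.values()); } finally { lock.unlockRead(stamp); }
        }
    }

    /** Attempts a write from a second thread; returns true if it got the write lock in time. */
    static boolean writeFromOtherThread(GuardedMap<String, String> subs, String key) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return subs.tryPut(key, "cursor", 200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }).join();
    }

    /** Old shape: the slow callback runs inside forEach(), i.e. under the read lock. */
    static boolean writerProceedsWhileIteratingLocked() {
        GuardedMap<String, String> subs = new GuardedMap<>();
        subs.put("sub-a", "cursor");
        boolean[] wrote = {false};
        subs.forEach((name, cursor) -> wrote[0] = writeFromOtherThread(subs, "sub-new"));
        return wrote[0];
    }

    /** New shape: iterate over a copied List, so no map lock is held during the callback. */
    static boolean writerProceedsWhileIteratingSnapshot() {
        GuardedMap<String, String> subs = new GuardedMap<>();
        subs.put("sub-a", "cursor");
        boolean wrote = false;
        for (String cursor : subs.values()) {
            wrote = writeFromOtherThread(subs, "sub-new");
        }
        return wrote;
    }

    public static void main(String[] args) {
        System.out.println("locked forEach, writer got lock: " + writerProceedsWhileIteratingLocked());
        System.out.println("snapshot loop, writer got lock:  " + writerProceedsWhileIteratingSnapshot());
    }
}
```

In this sketch the writer times out while the callback runs inside `forEach()`, and succeeds when the loop runs over the copied `List`. The trade-off is that the snapshot may be slightly stale: a subscription added or removed after the copy is not reflected in the current pass.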
eolivelli referenced this pull request in datastax/pulsar Dec 1, 2021
…oid deadlocks (apache#8877)

(cherry picked from commit d08ac1d)