-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Description
linked issue: #8115
Consumer could be stucked. after adding new consumer to the key_shared subscriptions. same issue #8115 raised before , be fixed in #10096
But after testing the code, We still got stucked.
To Reproduce:
1、firstly, create a new key_shared consumer
java -cp pulsar-pubsub-examples.jar
io.streamnative.examples.pubsub.SyncStringConsumerExample
--topic t7
--subscription-name example-sub
--subscription-initial-position Earliest
--subscription-type Key_Shared
--num-messages 0
2、produce data with two keys:
java -cp pulsar-pubsub-examples.jar
io.streamnative.examples.pubsub.SyncStringProducerExample
--topic t7
--num-keys 2
--messages-rate 1
--num-messages 100000000
3、adding a new consumer:
java -cp pulsar-pubsub-examples.jar
io.streamnative.examples.pubsub.SyncStringConsumerExample
--topic t7
--subscription-name example-sub
--subscription-initial-position Earliest
--subscription-type Key_Shared
--num-messages 0
after adding new consumer, the old consumer got stucked:

the new consumer received no message:

stats of topic:
{
"msgRateIn" : 0.9999983401194219,
"msgThroughputIn" : 66.99988878800126,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 53295,
"msgInCounter" : 799,
"bytesOutCounter" : 835,
"msgOutCounter" : 13,
"averageMsgSize" : 66.99999999999999,
"msgChunkPublished" : false,
"storageSize" : 53295,
"backlogSize" : 53295,
"offloadedStorageSize" : 0,
"publishers" : [ {
"msgRateIn" : 0.9999983401194219,
"msgThroughputIn" : 66.99988878800126,
"averageMsgSize" : 67.0,
"chunkedMessageRate" : 0.0,
"producerId" : 0,
"metadata" : { },
"producerName" : "ys-pulsar-8-2",
"connectedSince" : "2021-04-08T10:13:56.814+08:00",
"clientVersion" : "2.7.1",
"address" : "/10.89.133.11:24082"
} ],
"subscriptions" : {
"example-sub" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 835,
"msgOutCounter" : 13,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0,
"msgBacklog" : 799,
"backlogSize" : 0,
"msgBacklogNoDelayed" : 799,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 13,
"type" : "Key_Shared",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1617848047235,
"lastConsumedTimestamp" : 1617848048048,
"lastAckedTimestamp" : 0,
"lastMarkDeleteAdvancedTimestamp" : 0,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 835,
"msgOutCounter" : 13,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0.0,
"consumerName" : "5382f",
"availablePermits" : 987,
"unackedMessages" : 13,
"avgMessagesPerEntry" : 255,
"blockedConsumerOnUnackedMsgs" : false,
"readPositionWhenJoining" : "133:0",
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 1617848048048,
"keyHashRanges" : [ "[32769, 65536]" ],
"metadata" : { },
"connectedSince" : "2021-04-08T10:13:47.108+08:00",
"clientVersion" : "2.7.1",
"address" : "/10.89.133.11:24054"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0.0,
"consumerName" : "f1013",
"availablePermits" : 1000,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 1000,
"blockedConsumerOnUnackedMsgs" : false,
"readPositionWhenJoining" : "133:12",
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"keyHashRanges" : [ "[0, 32768]" ],
"metadata" : { },
"connectedSince" : "2021-04-08T10:14:07.231+08:00",
"clientVersion" : "2.7.1",
"address" : "/10.89.133.11:24112"
} ],
"isDurable" : true,
"isReplicated" : false,
"consumersAfterMarkDeletePosition" : {
"f1013" : "133:12"
},
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0
}
stats-internal:
{
"entriesAddedCounter" : 841,
"numberOfEntries" : 841,
"totalSize" : 56109,
"currentLedgerEntries" : 841,
"currentLedgerSize" : 56109,
"lastLedgerCreatedTimestamp" : "2021-04-08T10:13:47.087+08:00",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "133:840",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 133,
"entries" : 0,
"size" : 0,
"offloaded" : false
} ],
"cursors" : {
"example-sub" : {
"markDeletePosition" : "133:-1",
"readPosition" : "133:14",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : 135,
"cursorLedgerLastEntry" : 0,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-04-08T10:13:47.097+08:00",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 15,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : { }
}
},
"schemaLedgers" : [ {
"ledgerId" : 134,
"entries" : 1,
"size" : 98,
"offloaded" : false
} ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false
}
}
debug info: after adding new consumer, always readType='replay', and messagesForC=0. I think it's the reason why no message send. But I haven't found why these happened
