Skip to content

consumer stucked after adding new consumers to the KEY_SHARED subscriptions #10167

@gaozhangmin

Description

@gaozhangmin

linked issue: #8115
Consumer could be stucked. after adding new consumer to the key_shared subscriptions. same issue #8115 raised before , be fixed in #10096
But after testing the code, We still got stucked.

To Reproduce:
1、firstly, create a new key_shared consumer

java -cp pulsar-pubsub-examples.jar
io.streamnative.examples.pubsub.SyncStringConsumerExample
--topic t7
--subscription-name example-sub
--subscription-initial-position Earliest
--subscription-type Key_Shared
--num-messages 0

2、produce data with two keys:

java -cp pulsar-pubsub-examples.jar
io.streamnative.examples.pubsub.SyncStringProducerExample
--topic t7
--num-keys 2
--messages-rate 1
--num-messages 100000000

3、adding a new consumer:

java -cp pulsar-pubsub-examples.jar
io.streamnative.examples.pubsub.SyncStringConsumerExample
--topic t7
--subscription-name example-sub
--subscription-initial-position Earliest
--subscription-type Key_Shared
--num-messages 0

after adding new consumer, the old consumer got stucked:
image

the new consumer received no message:
image

stats of topic:

{
  "msgRateIn" : 0.9999983401194219,
  "msgThroughputIn" : 66.99988878800126,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 53295,
  "msgInCounter" : 799,
  "bytesOutCounter" : 835,
  "msgOutCounter" : 13,
  "averageMsgSize" : 66.99999999999999,
  "msgChunkPublished" : false,
  "storageSize" : 53295,
  "backlogSize" : 53295,
  "offloadedStorageSize" : 0,
  "publishers" : [ {
    "msgRateIn" : 0.9999983401194219,
    "msgThroughputIn" : 66.99988878800126,
    "averageMsgSize" : 67.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "metadata" : { },
    "producerName" : "ys-pulsar-8-2",
    "connectedSince" : "2021-04-08T10:13:56.814+08:00",
    "clientVersion" : "2.7.1",
    "address" : "/10.89.133.11:24082"
  } ],
  "subscriptions" : {
    "example-sub" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 835,
      "msgOutCounter" : 13,
      "msgRateRedeliver" : 0.0,
      "chuckedMessageRate" : 0,
      "msgBacklog" : 799,
      "backlogSize" : 0,
      "msgBacklogNoDelayed" : 799,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 13,
      "type" : "Key_Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1617848047235,
      "lastConsumedTimestamp" : 1617848048048,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 835,
        "msgOutCounter" : 13,
        "msgRateRedeliver" : 0.0,
        "chuckedMessageRate" : 0.0,
        "consumerName" : "5382f",
        "availablePermits" : 987,
        "unackedMessages" : 13,
        "avgMessagesPerEntry" : 255,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "133:0",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 1617848048048,
        "keyHashRanges" : [ "[32769, 65536]" ],
        "metadata" : { },
        "connectedSince" : "2021-04-08T10:13:47.108+08:00",
        "clientVersion" : "2.7.1",
        "address" : "/10.89.133.11:24054"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "chuckedMessageRate" : 0.0,
        "consumerName" : "f1013",
        "availablePermits" : 1000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 1000,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "133:12",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "keyHashRanges" : [ "[0, 32768]" ],
        "metadata" : { },
        "connectedSince" : "2021-04-08T10:14:07.231+08:00",
        "clientVersion" : "2.7.1",
        "address" : "/10.89.133.11:24112"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : {
        "f1013" : "133:12"
      },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled",
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 0
}

stats-internal:

{
  "entriesAddedCounter" : 841,
  "numberOfEntries" : 841,
  "totalSize" : 56109,
  "currentLedgerEntries" : 841,
  "currentLedgerSize" : 56109,
  "lastLedgerCreatedTimestamp" : "2021-04-08T10:13:47.087+08:00",
  "waitingCursorsCount" : 0,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "133:840",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 133,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : {
    "example-sub" : {
      "markDeletePosition" : "133:-1",
      "readPosition" : "133:14",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 0,
      "cursorLedger" : 135,
      "cursorLedgerLastEntry" : 0,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-04-08T10:13:47.097+08:00",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 15,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : { }
    }
  },
  "schemaLedgers" : [ {
    "ledgerId" : 134,
    "entries" : 1,
    "size" : 98,
    "offloaded" : false
  } ],
  "compactedLedger" : {
    "ledgerId" : -1,
    "entries" : -1,
    "size" : -1,
    "offloaded" : false
  }
}

debug info: after adding new consumer, always readType='replay', and messagesForC=0. I think it's the reason why no message send. But I haven't found why these happened
image

Metadata

Metadata

Assignees

No one assigned

    Labels

    type/bugThe PR fixed a bug or issue reported a bug

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions