
PIP-248: Add metrics for backlog quota event #19601

@dao-jun


Motivation

In Pulsar, a subscription maintains the acknowledgment state of the topic's messages. A subscription backlog is the set of messages that the subscription has not yet acknowledged.

A subscription backlog size is the total size, in bytes, of its unacknowledged messages.

A topic can have many subscriptions.

A topic backlog is defined as the backlog size of the subscription that has the oldest unacknowledged message. Since acknowledged messages can be interleaved with unacknowledged ones, calculating the exact backlog size of that subscription can be expensive, because it requires I/O operations to read the messages from the ledgers.

For that reason, the topic backlog is actually defined as the estimated backlog size of that subscription. The estimate is the sum of the sizes of all ledgers, starting from the current active one, back to the ledger that contains the oldest unacknowledged message. (There is actually a faster way to calculate it, but this is the definition of the estimate.)
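A minimal sketch of the estimate described above. It assumes the topic's ledgers are available in memory as a list ordered from oldest to newest, with the last entry being the active ledger. The `LedgerInfo` record and `BacklogSizeEstimator` class are hypothetical and are not Pulsar's ManagedLedger API.

```java
import java.util.List;

/** Hypothetical ledger metadata: ledger id and size in bytes. */
record LedgerInfo(long ledgerId, long sizeBytes) {}

/** Hypothetical helper illustrating the estimated topic backlog size. */
final class BacklogSizeEstimator {
    private BacklogSizeEstimator() {}

    /**
     * Sums the sizes of all ledgers from the one holding the oldest unacknowledged
     * message up to the active (last) ledger. Acknowledged messages inside those
     * ledgers are still counted, which is why the result is only an estimate.
     *
     * @param ledgers ledgers ordered from oldest to newest; the last one is active
     * @param oldestUnackedLedgerId id of the ledger containing the oldest unacked message
     */
    static long estimateBacklogSize(List<LedgerInfo> ledgers, long oldestUnackedLedgerId) {
        long total = 0;
        boolean counting = false;
        for (LedgerInfo ledger : ledgers) {
            if (ledger.ledgerId() == oldestUnackedLedgerId) {
                counting = true; // start summing at the oldest unacknowledged ledger
            }
            if (counting) {
                total += ledger.sizeBytes();
            }
        }
        return total;
    }
}
```

For example, with ledgers of 100, 200 and 50 bytes and the oldest unacknowledged message in the second ledger, the estimate is 250 bytes, even if some messages in those ledgers were already acknowledged.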

A topic backlog age is the age of the oldest unacknowledged message (in any
subscription). If that message was written 30 minutes ago, its age is 30
minutes.
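The backlog age can be sketched in the same way. This assumes a hypothetical map from subscription name to the publish time (epoch milliseconds) of that subscription's oldest unacknowledged message; subscriptions without a backlog are simply absent. `BacklogAge` is illustrative only.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating the topic backlog age. */
final class BacklogAge {
    private BacklogAge() {}

    /**
     * Returns the age, in seconds, of the oldest unacknowledged message across all
     * subscriptions, or 0 if no subscription has a backlog.
     *
     * @param oldestUnackedPublishTimeMs per-subscription publish time of its oldest unacked message
     * @param nowMs current time in epoch milliseconds
     */
    static long backlogAgeSeconds(Map<String, Long> oldestUnackedPublishTimeMs, long nowMs) {
        OptionalLong oldest = oldestUnackedPublishTimeMs.values().stream()
                .mapToLong(Long::longValue)
                .min(); // earliest publish time == oldest message
        if (oldest.isEmpty()) {
            return 0;
        }
        // Clamp at 0 in case of clock skew between publisher and broker.
        return TimeUnit.MILLISECONDS.toSeconds(Math.max(0, nowMs - oldest.getAsLong()));
    }
}
```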

Pulsar has a feature called backlog quota (place link). It allows the user to define a quota (in effect, a limit) on the topic backlog.
There are two types of quotas:

  • Size based: The limit is for the topic backlog size (as we defined above).
  • Time based: The limit is for the topic's backlog age (as we defined
    above).

Once a topic backlog exceeds either of those limits, one of the following actions is taken, depending on the quota's configured policy:

  • The producer write is placed on hold for a certain amount of time before failing.
  • The producer write fails.
  • The oldest unacknowledged messages of the subscriptions are acknowledged, in order, until both the topic backlog size and age fall within the limit (quota). This process is called backlog eviction and runs periodically.
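The quota check and the eviction loop described above can be sketched as follows. `PendingMessage` and `BacklogEviction` are hypothetical; the sketch models one subscription's backlog as a queue ordered from oldest to newest, and treats a negative limit as "no limit", which is an assumption here mirroring the `-1` style of disabled defaults.

```java
import java.util.Deque;

/** Hypothetical unacknowledged message: size in bytes and publish time (epoch millis). */
record PendingMessage(long sizeBytes, long publishTimeMs) {}

/** Hypothetical helper illustrating the backlog quota check and backlog eviction. */
final class BacklogEviction {
    private BacklogEviction() {}

    /** True if the backlog exceeds the size limit or the age limit (assumed: a negative limit is disabled). */
    static boolean exceedsQuota(long sizeBytes, long ageSeconds, long limitBytes, long limitSeconds) {
        boolean sizeExceeded = limitBytes >= 0 && sizeBytes > limitBytes;
        boolean timeExceeded = limitSeconds >= 0 && ageSeconds > limitSeconds;
        return sizeExceeded || timeExceeded;
    }

    /**
     * Acknowledges (removes) the oldest messages until both the backlog size and the
     * backlog age are within the quota. Returns the number of messages evicted.
     */
    static int evict(Deque<PendingMessage> backlog, long nowMs, long limitBytes, long limitSeconds) {
        long size = backlog.stream().mapToLong(PendingMessage::sizeBytes).sum();
        int evicted = 0;
        while (!backlog.isEmpty()) {
            // The backlog age is the age of the oldest remaining message.
            long ageSeconds = (nowMs - backlog.peekFirst().publishTimeMs()) / 1000;
            if (!exceedsQuota(size, ageSeconds, limitBytes, limitSeconds)) {
                break; // both size and age are back inside the quota
            }
            size -= backlog.pollFirst().sizeBytes();
            evicted++;
        }
        return evicted;
    }
}
```

Evicting strictly from the head keeps the loop simple: removing the oldest message is the only step that lowers both the size and the age at once.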

The quotas can be defined as a default value for every topic by using the following broker configuration keys:

  • backlogQuotaDefaultLimitBytes
  • backlogQuotaDefaultLimitSecond

A quota can also be specified for all topics in a given namespace using a namespace policy, or for a specific topic using a topic policy.
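The lookup order implied above (topic policy, then namespace policy, then broker default) can be sketched as below. `BacklogQuotaResolver` is a hypothetical helper, not Pulsar's policy-resolution code, and it models an unset policy as an empty `Optional`.

```java
import java.util.Optional;

/** Hypothetical helper illustrating how an effective backlog quota limit is chosen. */
final class BacklogQuotaResolver {
    private BacklogQuotaResolver() {}

    /**
     * Returns the topic policy if set, otherwise the namespace policy, otherwise the
     * broker default (e.g. the value of backlogQuotaDefaultLimitBytes).
     */
    static long effectiveLimit(Optional<Long> topicPolicy, Optional<Long> namespacePolicy, long brokerDefault) {
        return topicPolicy
                .or(() -> namespacePolicy) // fall back to the namespace policy
                .orElse(brokerDefault);    // finally fall back to the broker config
    }
}
```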

Today, users can calculate how much of a size-based quota is used, since two topic-level metrics are already exposed: "pulsar_storage_backlog_size" and "pulsar_storage_backlog_quota_limit". Dividing the first by the second gives a percentage.
For the time-based quota, the only metric exposed today is the quota itself, "pulsar_storage_backlog_quota_limit_time". There is no metric for the backlog age, so the used portion of a time-based quota cannot be computed.

To let users know that a backlog quota eviction is about to happen, and how many times it has happened on a topic, we need to introduce new metrics.

Goal

  1. Expose pulsar_storage_backlog_age_seconds at the topic level.
  2. Expose pulsar_storage_backlog_eviction_count at the topic level.
  3. Expose backlogQuotaLimitSize and backlogQuotaLimitTime via PulsarAdmin.

API Changes

1. Topics#getStats: add the following fields:

  1. backlogQuotaSizeBytes: the backlog quota size limit of the topic (in bytes)
  2. backlogQuotaTimeSeconds: the backlog quota time limit of the topic (in seconds)

Implementation

1. pulsar_storage_backlog_age_seconds

checkTimeBacklogExceeded runs periodically, so we can add a field in PersistentTopic to cache its result and avoid additional cost:

  1. preciseTimeBasedBacklogQuotaCheck = true
    In readEntryComplete, cache the result:
        @Override
        public void readEntryComplete(Entry entry, Object ctx) {
            long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
            // Cache the age of the oldest unacknowledged message for the new metric.
            backlogAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - entryTimestamp);
            // ... existing logic elided ...
        }
  2. preciseTimeBasedBacklogQuotaCheck = false
    slowestReaderTimeBasedBacklogQuotaCheck is an entirely in-memory method, so we only need to cache the age it computes from the ManagedLedger.
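A sketch of the cached field, using a hypothetical `CachedBacklogAge` holder in place of the actual field on PersistentTopic. The periodic quota check writes the value (from the entry timestamp in precise mode, or from the age derived from ManagedLedger metadata otherwise), and the metrics exporter only reads it, so scraping the metric adds no I/O.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the cached backlog age field on PersistentTopic. */
final class CachedBacklogAge {
    // volatile: written by the periodic quota-check thread, read by the metrics thread.
    private volatile long backlogAgeSeconds = 0;

    /**
     * Called by the quota check with the timestamp of the oldest unacknowledged data:
     * the entry timestamp in precise mode, or a ManagedLedger-derived timestamp otherwise.
     */
    void update(long oldestTimestampMs, long nowMs) {
        // Clamp at 0 so a future timestamp never yields a negative age.
        backlogAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(Math.max(0, nowMs - oldestTimestampMs));
    }

    /** Value that would be exported as pulsar_storage_backlog_age_seconds. */
    long get() {
        return backlogAgeSeconds;
    }
}
```

The metric may lag by up to one quota-check interval, which is the trade-off for not reading entries on every scrape.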

2. pulsar_storage_backlog_eviction_count

Record the event when dropBacklogForSizeLimit or dropBacklogForTimeLimit is about to be invoked.
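A sketch of the recording step, assuming a hypothetical `EvictionTracker` that wraps the eviction call. A `LongAdder` keeps the increment thread-safe without locking.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical per-topic tracker backing pulsar_storage_backlog_eviction_count. */
final class EvictionTracker {
    private final LongAdder evictionCount = new LongAdder();

    /**
     * Records the eviction event, then runs the eviction action
     * (standing in for dropBacklogForSizeLimit or dropBacklogForTimeLimit).
     */
    void evict(Runnable dropBacklog) {
        evictionCount.increment(); // recorded before the eviction is invoked
        dropBacklog.run();
    }

    /** Total number of evictions (size or time) recorded so far. */
    long evictionCount() {
        return evictionCount.sum();
    }
}
```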

3. New Metrics

| Name | Type | Description | Labels |
|---|---|---|---|
| pulsar_storage_backlog_age_seconds | Gauge | The age (time elapsed since publication) of the earliest unacknowledged message across the topic's existing subscriptions. | cluster, tenant, namespace, topic |
| pulsar_storage_backlog_eviction_count | Counter | The number of times backlog eviction happened because the backlog quota (either time or size) was exceeded. | cluster, tenant, namespace, topic |
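For illustration, a sample of these metrics in the Prometheus text exposition format could be rendered as in the sketch below. The `TopicMetricLine` helper is hypothetical and is not how Pulsar's exporter is implemented; it only shows the label set listed in the table and the escaping that the text format requires for label values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that renders one topic-level sample in Prometheus text format. */
final class TopicMetricLine {
    private TopicMetricLine() {}

    /** Escapes a label value: backslash, double quote and newline. */
    static String escape(String v) {
        return v.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    /** Renders name{cluster="...",tenant="...",namespace="...",topic="..."} value. */
    static String render(String name, String cluster, String tenant, String namespace, String topic, long value) {
        Map<String, String> labels = new LinkedHashMap<>(); // keeps label order stable
        labels.put("cluster", cluster);
        labels.put("tenant", tenant);
        labels.put("namespace", namespace);
        labels.put("topic", topic);
        String labelText = labels.entrySet().stream()
                .map(e -> e.getKey() + "=\"" + escape(e.getValue()) + "\"")
                .collect(Collectors.joining(","));
        return name + "{" + labelText + "} " + value;
    }
}
```

For example, with made-up label values, `render("pulsar_storage_backlog_age_seconds", "c1", "public", "public/default", "persistent://public/default/t", 1800)` returns `pulsar_storage_backlog_age_seconds{cluster="c1",tenant="public",namespace="public/default",topic="persistent://public/default/t"} 1800`.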

4. Existing Metrics

| Name | Type | Description |
|---|---|---|
| pulsar_storage_backlog_quota_limit | Gauge | The backlog quota size limit of this topic (in bytes). |
| pulsar_storage_backlog_quota_limit_time | Gauge | The backlog quota time limit of this topic (in seconds). |
| pulsar_storage_backlog_size | Gauge | The total backlog size of this topic (in bytes). |

5. How to use

  1. Compute the quota usage
    Math.max(pulsar_storage_backlog_size / pulsar_storage_backlog_quota_limit, pulsar_storage_backlog_age_seconds / pulsar_storage_backlog_quota_limit_time) gives the usage as a ratio; multiply by 100 for a percentage.
  2. Set an alarm
    If the percentage computed in the previous step exceeds a threshold (say, 50% or 80%), send an alarm.
  3. Find the subscriptions with a backlog
    After receiving the alarm, users can call Topics#getStats(topicName, true/false, true, true) to get the topic stats and find which subscriptions have a backlog.
    Pulsar already exposes backlogSize and earliestMsgPublishTimeInBacklog at the subscription level, and this proposal exposes backlogQuotaSizeBytes and backlogQuotaTimeSeconds at the topic level, so users can easily find which subscriptions are causing the backlog.
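Steps 1 and 2 can be sketched in Java as below. `BacklogQuotaUsage` is hypothetical, and skipping limits that are not positive (that is, not configured) is an assumption of this sketch, since dividing by an unset or negative limit would produce a meaningless ratio.

```java
/** Hypothetical helper for computing backlog quota usage and picking an alarm level. */
final class BacklogQuotaUsage {
    private BacklogQuotaUsage() {}

    /**
     * Usage as a ratio (1.0 == 100%): the larger of size/limit and age/limit.
     * Assumption: a limit <= 0 means "not configured" and contributes 0.
     */
    static double usage(long backlogSizeBytes, long limitBytes, long backlogAgeSeconds, long limitSeconds) {
        double sizeUsage = limitBytes > 0 ? (double) backlogSizeBytes / limitBytes : 0.0;
        double timeUsage = limitSeconds > 0 ? (double) backlogAgeSeconds / limitSeconds : 0.0;
        return Math.max(sizeUsage, timeUsage);
    }

    /** Returns the highest alarm threshold reached by the usage, or -1 if none is reached. */
    static double highestCrossed(double usage, double... thresholds) {
        double crossed = -1;
        for (double t : thresholds) {
            if (usage >= t && t > crossed) {
                crossed = t;
            }
        }
        return crossed;
    }
}
```

For example, a 512-byte backlog against a 1024-byte limit and a 2700-second age against a 3600-second limit give `max(0.5, 0.75) = 0.75`, so the 50% threshold is reached but the 80% one is not.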

Alternatives

No response

Anything else?

No response
