Description
Motivation
In Pulsar, a subscription tracks which messages have been acknowledged. A subscription backlog is the set of its unacknowledged messages.
A subscription backlog size is the total size of those unacknowledged messages (in bytes).
A topic can have many subscriptions.
A topic backlog is defined as the backlog size of the subscription that has the oldest unacknowledged message. Since acknowledged messages can be interleaved with unacknowledged messages, calculating the exact size of that subscription's backlog can be expensive, as it requires I/O operations to read the messages from the ledgers.
For that reason, the topic backlog is actually defined as the estimated backlog size of that subscription. The estimate is obtained by summing the sizes of all the ledgers, starting from the current active one, up to the ledger which contains the oldest unacknowledged message (there is actually a faster way to calculate it, but this is the definition of the estimation).
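The estimation described above can be sketched as follows. This is only an illustration under stated assumptions: `BacklogSizeEstimator`, its method, and the representation of ledgers as a sorted map of ledger ID to size are all hypothetical and do not reflect the actual ManagedLedger code.

```java
import java.util.NavigableMap;

/** Sketch only: ledgers are modelled as a sorted map of ledger ID -> size in bytes (an assumption). */
final class BacklogSizeEstimator {

    /**
     * Sums the sizes of all ledgers from the one that contains the oldest
     * unacknowledged message up to the newest (currently active) ledger.
     * Acknowledged messages inside those ledgers are still counted, which is
     * why the result is an estimate rather than an exact backlog size.
     */
    static long estimateBacklogBytes(NavigableMap<Long, Long> ledgerSizes, long oldestUnackedLedgerId) {
        long total = 0;
        // tailMap(..., true) keeps the ledger holding the oldest unacknowledged message
        for (long size : ledgerSizes.tailMap(oldestUnackedLedgerId, true).values()) {
            total += size;
        }
        return total;
    }
}
```

No ledger entries are read here, so the estimate needs no I/O; the trade-off is that it over-counts whenever acknowledged messages are interleaved with unacknowledged ones.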
A topic backlog age is the age of the oldest unacknowledged message (in any
subscription). If that message was written 30 minutes ago, its age is 30
minutes.
Pulsar has a feature called backlog quota (place link). It allows the user to define a quota - in effect, a limit - which limits the topic backlog.
There are two types of quotas:
- Size based: The limit is for the topic backlog size (as we defined above).
- Time based: The limit is for the topic's backlog age (as we defined
above).
Once a topic backlog exceeds either one of those limits, one of the following actions is taken, depending on the configured policy:
- The producer write is placed on hold for a certain amount of time before failing.
- The producer write fails immediately.
- The subscriptions' oldest unacknowledged messages are acknowledged in order until both the topic backlog size and age fall within the limit (quota). This process is called backlog eviction and runs periodically.
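The "exceeds either one of those limits" condition can be sketched as below. The class and method names are hypothetical, and two details are assumptions of this sketch rather than statements about the broker: a non-positive limit is treated as "quota disabled", and the comparison is strict.

```java
/** Sketch only: hypothetical helper, not the broker's actual quota check. */
final class BacklogQuotaCheck {

    /**
     * Returns true when the topic backlog exceeds the size-based quota,
     * the time-based quota, or both.
     * Assumption: a limit <= 0 means that quota type is not configured.
     */
    static boolean isExceeded(long backlogBytes, long limitBytes,
                              long backlogAgeSeconds, long limitSeconds) {
        boolean sizeExceeded = limitBytes > 0 && backlogBytes > limitBytes;
        boolean timeExceeded = limitSeconds > 0 && backlogAgeSeconds > limitSeconds;
        return sizeExceeded || timeExceeded;
    }
}
```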
The quotas can be defined as default values for all topics, using the following broker configuration keys:
- backlogQuotaDefaultLimitBytes
- backlogQuotaDefaultLimitSecond
They can also be specified for all topics in a given namespace using a namespace policy, or for a specific topic using a topic policy.
Today, users can calculate the quota usage for the size-based limit, since two metrics are already exposed at the topic level: "pulsar_storage_backlog_quota_limit" and "pulsar_storage_backlog_size". Dividing the backlog size by the limit gives the usage as a fraction of the quota.
For the time-based limit, the only metric exposed today is the quota itself, "pulsar_storage_backlog_quota_limit_time".
To let users know that backlog quota eviction is about to happen, and how many times it has happened on a topic, we have to introduce new metrics.
Goal
- Expose pulsar_storage_backlog_age_seconds in the topic level
- Expose pulsar_storage_backlog_eviction_count in the topic level
- Expose backlogQuotaLimitSize and backlogQuotaLimitTime in the PulsarAdmin.
API Changes
1. Topics#getStats: add the following fields:
- backlogQuotaSizeBytes: Indicates the backlog quota size limit (in bytes) of the topic
- backlogQuotaTimeSeconds: Indicates the backlog quota time limit (in seconds) of the topic
Implementation
1. pulsar_storage_backlog_age_seconds
checkTimeBacklogExceeded runs periodically, so we can add a field in PersistentTopic to cache its result and avoid additional cost.
preciseTimeBasedBacklogQuotaCheck = true
After readEntryComplete, cache its result:
public void readEntryComplete(Entry entry, Object ctx) {
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
// cache the age of the oldest unacknowledged entry for the metrics endpoint
backlogAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - entryTimestamp);
// other logic omitted ...
}
preciseTimeBasedBacklogQuotaCheck = false
slowestReaderTimeBasedBacklogQuotaCheck is an entirely in-memory method, so we just need to cache the ManagedLedger's age.
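As a rough illustration of the caching idea, the sketch below keeps the last computed age in a volatile field so that the metrics exporter can read it without triggering any I/O. `BacklogAgeCache` and its methods are hypothetical names; in the proposal the field would live in PersistentTopic. Using -1 for "not computed yet" and clamping negative ages to zero are assumptions of this sketch.

```java
import java.util.concurrent.TimeUnit;

/** Sketch only: hypothetical holder for the cached backlog age. */
final class BacklogAgeCache {

    // -1 means no check has completed yet (assumption of this sketch)
    private volatile long backlogAgeSeconds = -1;

    /** Called from the periodic quota check with the publish time of the oldest unacknowledged entry. */
    void update(long oldestEntryPublishTimeMillis, long nowMillis) {
        // Clamp to zero so that clock skew never produces a negative age
        long ageMillis = Math.max(0L, nowMillis - oldestEntryPublishTimeMillis);
        backlogAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(ageMillis);
    }

    /** Read by the metrics exporter; returns the last cached value. */
    long getBacklogAgeSeconds() {
        return backlogAgeSeconds;
    }
}
```

The cached value is only as fresh as the last periodic check, so the reported age can lag by up to one check interval.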
2. pulsar_storage_backlog_eviction_count
Record the event whenever dropBacklogForSizeLimit or dropBacklogForTimeLimit is about to be invoked.
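One possible way to record these events is a per-topic counter that is incremented just before either drop method is called. The sketch below uses `java.util.concurrent.atomic.LongAdder`; the class and method names are hypothetical and only illustrate the idea.

```java
import java.util.concurrent.atomic.LongAdder;

/** Sketch only: hypothetical per-topic eviction counter. */
final class BacklogEvictionCounter {

    // LongAdder keeps increments cheap when several threads update the counter
    private final LongAdder evictions = new LongAdder();

    /** Call right before dropBacklogForSizeLimit or dropBacklogForTimeLimit is invoked. */
    void recordEviction() {
        evictions.increment();
    }

    /** Value to expose as pulsar_storage_backlog_eviction_count. */
    long getEvictionCount() {
        return evictions.sum();
    }
}
```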
3. New Metric
| Name | Type | Description | Labels |
|---|---|---|---|
| pulsar_storage_backlog_age_seconds | Gauge | The age (time passed since it was published) of the earliest unacknowledged message based on the topic's existing subscriptions. | cluster, tenant, namespace, topic |
| pulsar_storage_backlog_eviction_count | | Number of times backlog evictions happened due to exceeding backlog quota (either time or size). | cluster, tenant, namespace, topic |
4. Existing Metrics
| Name | Type | Description |
|---|---|---|
| pulsar_storage_backlog_quota_limit | Gauge | The backlog quota size limit of this topic (in bytes). |
| pulsar_storage_backlog_quota_limit_time | Gauge | The backlog quota time limit (in seconds). |
| pulsar_storage_backlog_size | Gauge | The total backlog size of this topic owned by this broker (in bytes). |
5. How to use
- Get a percentage
  Math.max(pulsar_storage_backlog_size / pulsar_storage_backlog_quota_limit, pulsar_storage_backlog_age_seconds / pulsar_storage_backlog_quota_limit_time).
- Set an alarm
  If the percentage from the previous step exceeds a threshold (say, 50% or 80%), send the alarm.
- Find the backlog subscriptions
  After receiving the alarm, users can call Topics#getStats(topicName, true/false, true, true) to get the topic stats and find which subscriptions are in backlog.
  Pulsar exposes backlogSize and earliestMsgPublishTimeInBacklog in the subscription level, and we will expose backlogQuotaSizeBytes and backlogQuotaTimeSeconds in the topic level, so users can easily find which subscriptions are in backlog.
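The first two steps can be sketched in Java as below, assuming the four metric values have already been scraped from the broker. `BacklogQuotaUsage` and its methods are hypothetical, and treating a non-positive limit as "quota not configured" (contributing 0 to the usage) is an assumption of this sketch.

```java
/** Sketch only: hypothetical helper working on already-scraped metric values. */
final class BacklogQuotaUsage {

    /**
     * Returns the larger of the size-based and time-based usage ratios,
     * where 1.0 means the quota is fully used.
     * Assumption: a limit <= 0 means that quota type is not configured.
     */
    static double usage(double backlogSizeBytes, double sizeLimitBytes,
                        double backlogAgeSeconds, double timeLimitSeconds) {
        double sizeRatio = sizeLimitBytes > 0 ? backlogSizeBytes / sizeLimitBytes : 0.0;
        double timeRatio = timeLimitSeconds > 0 ? backlogAgeSeconds / timeLimitSeconds : 0.0;
        return Math.max(sizeRatio, timeRatio);
    }

    /** True when the usage has reached the alarm threshold, e.g. 0.5 or 0.8. */
    static boolean shouldAlarm(double usage, double threshold) {
        return usage >= threshold;
    }
}
```

For example, a topic with a 500-byte backlog against a 1000-byte limit and a 1500-second backlog age against an 1800-second limit would be driven by the time-based ratio, which is the higher of the two, and would trigger an alarm at a 0.8 threshold.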
Alternatives
No response
Anything else?
No response