
Pub/Sub Exactly Once Delivery #9327

@coryan


Objective

Give customers access to a new feature in Pub/Sub: exactly once delivery. This
feature guarantees that successfully acknowledged messages are not redelivered,
and that messages are not redelivered before their ack deadline expires.

Background

Previously Pub/Sub guaranteed "at least once" delivery, which meant all ack/nack
messages were handled on a best-effort basis. Errors in ack/nack messages were
not exposed to the application, and no retry attempts were made on these RPCs.

Some documents refer to this feature as "Exactly Once Subscribe" (EOS). Others
use "Exactly Once Delivery" (EOD). Sigh.

The proto changes are deceptively small.

First, when a subscription is created it can be configured to use "exactly once
delivery":

Second, if you use the streaming API to ack and nack messages you get back
responses with the status of your nack/ack:

Note that all client libraries, including the C++ client library, do not
use the streaming pull RPC to ack/nack messages. We use the Acknowledge() and
ModifyAckDeadline() RPCs. Unfortunately, these RPCs accept multiple ack ids
but cannot report how each ack id was handled. The service has chosen to tunnel
this information via the ErrorInfo. It becomes part of the client library's
job to parse the ErrorInfo and deliver the errors to the application.
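A minimal sketch of that parsing step, assuming the ErrorInfo metadata maps each failed ack id to a reason string and that any ack id absent from the metadata succeeded. The `AckResult` enum and `ClassifyAckIds()` function are hypothetical, and treating every reason that does not start with `PERMANENT_ERROR` as retryable is an assumption; only the `PERMANENT_ERROR` prefix comes from this proposal.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical per-ack-id outcome derived from the tunneled ErrorInfo.
enum class AckResult { kSuccess, kPermanentFailure, kRetryable };

// Assumption: the ErrorInfo metadata is a map from ack id to a reason string.
// Ack ids missing from the metadata are treated as successfully handled.
std::map<std::string, AckResult> ClassifyAckIds(
    std::vector<std::string> const& ack_ids,
    std::map<std::string, std::string> const& error_info_metadata) {
  static std::string const kPermanentPrefix = "PERMANENT_ERROR";
  std::map<std::string, AckResult> results;
  for (auto const& id : ack_ids) {
    auto it = error_info_metadata.find(id);
    if (it == error_info_metadata.end()) {
      results[id] = AckResult::kSuccess;
    } else if (it->second.rfind(kPermanentPrefix, 0) == 0) {
      // The reason string starts with the permanent-error prefix.
      results[id] = AckResult::kPermanentFailure;
    } else {
      // Assumption: any other reason is transient and may be retried.
      results[id] = AckResult::kRetryable;
    }
  }
  return results;
}
```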

The C++ client library provides the following class so applications can ack/nack
messages:

class AckHandler {
 public:
  ~AckHandler();

  AckHandler(AckHandler&&) = default;
  AckHandler& operator=(AckHandler&&) = default;

  void ack() &&;
  void nack() &&;
  std::int32_t delivery_attempt() const;
};
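The `&&` qualifiers mean `ack()` and `nack()` can only be called on an rvalue, so application code writes `std::move(handler).ack()`, which makes it explicit that the handler is consumed. A self-contained sketch of that pattern follows; `AckHandlerSketch` and its `on_done` callback are hypothetical stand-ins, and nacking a handler that is destroyed without being used is an assumption, not something stated in this issue.

```cpp
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for AckHandler. `on_done` receives true for ack and
// false for nack; it is held in a unique_ptr so a moved-from handler is empty.
class AckHandlerSketch {
 public:
  AckHandlerSketch(std::function<void(bool)> on_done, std::int32_t attempt)
      : on_done_(std::make_unique<std::function<void(bool)>>(std::move(on_done))),
        attempt_(attempt) {}
  // Assumption: a handler destroyed without ack()/nack() nacks the message.
  ~AckHandlerSketch() {
    if (on_done_) (*on_done_)(false);
  }

  AckHandlerSketch(AckHandlerSketch&&) = default;
  AckHandlerSketch& operator=(AckHandlerSketch&&) = default;

  // Only callable on rvalues, e.g. std::move(h).ack().
  void ack() && { Finish(true); }
  void nack() && { Finish(false); }
  std::int32_t delivery_attempt() const { return attempt_; }

 private:
  void Finish(bool acked) {
    // Take ownership so the destructor does not nack a second time.
    auto cb = std::move(on_done_);
    if (cb) (*cb)(acked);
  }

  std::unique_ptr<std::function<void(bool)>> on_done_;
  std::int32_t attempt_;
};
```

Calling `h.ack()` on an lvalue does not compile, which is the point of the qualifier.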

Proposal

We will introduce a new class to ack/nack messages that can report (possible) errors:

class ExactlyOnceAckHandler {
 public:
  ~ExactlyOnceAckHandler();

  ExactlyOnceAckHandler(ExactlyOnceAckHandler&&) = default;
  ExactlyOnceAckHandler& operator=(ExactlyOnceAckHandler&&) = default;

  future<Status> ack() &&;
  future<Status> nack() &&;
  std::int32_t delivery_attempt() const;
};

Note that ack() && and nack() && now return a future<Status>. See below
for how these futures will be satisfied.
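One way such a handler could satisfy its futures is to pass a completion callback to whatever performs the RPC. The sketch below uses `std::future`/`std::promise` and a minimal `Status` struct in place of the library's own `future<Status>` and `Status`; `AckTransport` and `ExactlyOnceAckHandlerSketch` are hypothetical names, not the proposed API.

```cpp
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <utility>

// Minimal stand-in for the library's Status (illustration only).
struct Status {
  int code = 0;  // 0 means OK
  std::string message;
  bool ok() const { return code == 0; }
};

// Hypothetical transport: performs the Acknowledge() or ModifyAckDeadline()
// work for one ack id (including any retries) and reports via `done`.
using AckTransport = std::function<void(
    std::string ack_id, bool is_ack, std::function<void(Status)> done)>;

class ExactlyOnceAckHandlerSketch {
 public:
  ExactlyOnceAckHandlerSketch(std::string ack_id, std::int32_t attempt,
                              AckTransport transport)
      : ack_id_(std::move(ack_id)), attempt_(attempt),
        transport_(std::move(transport)) {}

  std::future<Status> ack() && { return Send(true); }
  std::future<Status> nack() && { return Send(false); }
  std::int32_t delivery_attempt() const { return attempt_; }

 private:
  std::future<Status> Send(bool is_ack) {
    // shared_ptr because std::function requires a copyable callable.
    auto promise = std::make_shared<std::promise<Status>>();
    auto f = promise->get_future();
    transport_(ack_id_, is_ack,
               [promise](Status s) { promise->set_value(std::move(s)); });
    return f;
  }

  std::string ack_id_;
  std::int32_t attempt_;
  AckTransport transport_;
};
```

A caller would write `auto f = std::move(handler).ack();` and inspect the result later; with the library's `future` type a `.then()` continuation avoids blocking on `get()`.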

To select which *AckHandler to use, we will change Subscriber
(and SubscriberConnection) from:

class Subscriber {
 public:
  template <typename Callback>
  future<Status> Subscribe(Callback&& callback) {
    std::function<void(Message,AckHandler)> cb(std::forward<Callback>(callback));
    return connection_->Subscribe({std::move(cb)});
  }
  // ...
};

to:

class Subscriber {
 public:
  future<Status> Subscribe(std::function<void(Message,AckHandler)> cb) {
    return connection_->Subscribe({std::move(cb)});
  }
  future<Status> Subscribe(std::function<void(Message,ExactlyOnceAckHandler)> cb) {
    return connection_->ExactlyOnceSubscribe({std::move(cb)});
  }
  // ...
};

The changes to SubscriberConnection are left as an exercise to the reader.
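The two overloads can coexist because `std::function`'s converting constructor only participates in overload resolution when the callable is invocable with the declared argument types. The sketch below uses empty stand-in types and returns a string instead of `future<Status>` so the chosen overload is observable; `SubscriberSketch` and the stand-in types are hypothetical.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-ins for the real types.
struct Message {
  std::string data;
};
class AckHandler {};
class ExactlyOnceAckHandler {};

// A lambda taking AckHandler can only convert to the first std::function,
// and a lambda taking ExactlyOnceAckHandler only to the second.
struct SubscriberSketch {
  std::string Subscribe(std::function<void(Message, AckHandler)> cb) {
    cb(Message{"m"}, AckHandler{});
    return "at-least-once";
  }
  std::string Subscribe(
      std::function<void(Message, ExactlyOnceAckHandler)> cb) {
    cb(Message{"m"}, ExactlyOnceAckHandler{});
    return "exactly-once";
  }
};
```

One consequence of this design: a generic lambda such as `[](auto m, auto h) {}` is invocable with both handler types, so passing one makes the call ambiguous and callers must spell out the handler type.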

The futures returned by ack() && and nack() && will be satisfied when:

  • The ack/nack message is successfully delivered. The Status will be OK.
  • The ack/nack message fails, and the tunneled error starts with PERMANENT_ERROR. The
    Status will have StatusCode::kUnknown. The error message will match the
    tunneled string, and the ErrorInfo will match the last error info.
  • The retry attempts are exhausted. The Status will have
    StatusCode::kUnavailable, and the message will match the last tunneled
    string. The ErrorInfo will match the last error info.
  • The streaming pull is terminated. The error code will be
    StatusCode::kUnknown. The message will be free form, but a human would be
    able to glean what happened.
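These rules can be sketched as one retry loop per ack id. This sketch assumes each attempt reports either success or the tunneled string for that ack id, uses a fixed attempt limit as a stand-in for the hard-coded retry policy, and omits backoff and the ErrorInfo payload. `StatusCode`, `Status`, `AttemptFn`, and `AckWithRetry()` are hypothetical (C++17 for `std::optional`).

```cpp
#include <functional>
#include <optional>
#include <string>

enum class StatusCode { kOk, kUnknown, kUnavailable };
struct Status {
  StatusCode code = StatusCode::kOk;
  std::string message;
};

// Hypothetical single attempt for one ack id: std::nullopt means success,
// otherwise the tunneled error string for that ack id.
using AttemptFn = std::function<std::optional<std::string>()>;

// `stream_terminated` is checked before each attempt; `max_attempts` stands
// in for the hard-coded retry policy.
Status AckWithRetry(AttemptFn const& attempt,
                    std::function<bool()> const& stream_terminated,
                    int max_attempts) {
  std::string last_error;
  for (int i = 0; i < max_attempts; ++i) {
    if (stream_terminated()) {
      return Status{StatusCode::kUnknown,
                    "streaming pull terminated before the ack/nack completed"};
    }
    auto error = attempt();
    if (!error) return Status{};  // delivered successfully
    if (error->rfind("PERMANENT_ERROR", 0) == 0) {
      return Status{StatusCode::kUnknown, *error};  // do not retry
    }
    last_error = *error;  // transient failure: try again
  }
  // Retry attempts exhausted: report the last tunneled string.
  return Status{StatusCode::kUnavailable, last_error};
}
```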

Breakdown

  • On streaming pulls, if the stream is configured for exactly once delivery, use the more generous (60 seconds) minimum deadline extension value.
  • On streaming pulls, if the exactly-once configuration changes, send a deadline extension with the configured value.
  • Change the pubsub_internal::StreamingBatchSource to return the errors,
    ignoring the errors in the existing AckHandler implementation.
  • If the stream has exactly-once enabled, then use a hard-coded retry policy to retry the ack/nack requests.
  • If the stream has exactly-once enabled, then use a hard-coded retry policy to update leases.
  • Parse the ErrorInfo result to implement these retry policies.
  • If the stream does not have exactly-once enabled, then always
    succeed; the spec (internal go/cps-eos-client-libraries) says we
    should not retry.
  • Implement a new set of ExactlyOnceAckHandler::Impl classes, all in the pubsub_internal namespace.
  • Refactor AckHandler::Impl to use these classes, but only log the errors (and ignore failures).
  • Expose a new ExactlyOnceAckHandler class (and a mock).
  • Update Subscriber, SubscriberConnection, and their mocks.
  • Expose new options to control the minimum deadline extension value.
  • If the application overrides the minimum deadline extension, use this value instead.
  • Implement an example using exactly-once semantics.
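The deadline-extension items above reduce to a small selection rule. In this sketch the 60-second minimum for exactly-once streams comes from the first item and the application override wins when present; the 10-second value for regular streams is an assumption (it is the smallest ack deadline Pub/Sub accepts), and `MinDeadlineExtension()` is a hypothetical helper.

```cpp
#include <chrono>
#include <optional>

using std::chrono::seconds;

// Picks the minimum deadline extension for a streaming pull.
seconds MinDeadlineExtension(bool exactly_once_enabled,
                             std::optional<seconds> application_override) {
  // An explicit application setting always takes precedence.
  if (application_override) return *application_override;
  // 60s for exactly-once streams per this proposal; 10s is an assumed default.
  return exactly_once_enabled ? seconds(60) : seconds(10);
}
```

When the subscription properties report a change in the exactly-once setting, the stream would recompute this value and send a deadline extension with it.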
