
PIP-166: Function add MANUAL delivery semantics  #15560

@shibd

Discussion thread: https://lists.apache.org/thread/4f2w1mqvhhs3mvccbcg2sk19b60xwkgf
Vote thread: https://lists.apache.org/thread/1ojcc12sxd87nz49yrflk8jv2nk98hvr (Pass)

Motivation

Currently, Pulsar Functions support three delivery semantics (ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE) and also provide an autoAck option that controls whether the framework acknowledges messages automatically.
Because autoAck changes the effective delivery semantics of a function, the relationship between these two parameters is confusing for users.

For example, when the user configures Guarantees == ATMOST_ONCE and autoAck == false, the framework does not ack messages on the user's behalf, and the processing semantics may become ATLEAST_ONCE.

The delivery semantics provided by Functions should be clear. When the user sets the guarantees, the framework should enforce exactly those semantics, and no other parameter should be able to alter them.

Goal

Add MANUAL delivery semantics and remove the autoAck configuration.

The original intention of autoAck is to let users control the timing of the ack themselves. When autoAck == false, the processing semantics provided by the framework no longer apply. We can therefore add a MANUAL processing semantic to replace the autoAck == false scenario.

When the user configures ProcessingGuarantees == MANUAL, the framework does not perform any ack operations, and acking is left entirely to the user. In all other cases, the framework guarantees the configured processing semantics.

The processing logic of all semantics is:

  • ATMOST_ONCE: The message is acknowledged as soon as the client reads it, and only then is the function executed, which guarantees that it will not run more than once.
  • ATLEAST_ONCE: The message is acknowledged after the function has finished executing, so the function runs at least once.
  • EFFECTIVELY_ONCE: The message is acknowledged after the function has finished executing. This relies on Pulsar deduplication and provides end-to-end effectively-once processing.
  • MANUAL: The function framework does not perform any ack; acking is handled by the user inside the function.
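The ack timing of the four semantics can be sketched as follows. This is a minimal, self-contained illustration and not the framework code: Rec and handle are hypothetical stand-ins, and the only thing the sketch models is the order in which "ack" and "process" happen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class AckTimingSketch {
    enum ProcessingGuarantees { ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE, MANUAL }

    /** Hypothetical stand-in for a consumed message; it only records events in order. */
    static final class Rec {
        final List<String> events = new ArrayList<>();

        void ack() {
            events.add("ack");
        }
    }

    /** Runs one message through the (simplified) framework and returns the event order. */
    static List<String> handle(ProcessingGuarantees guarantees, Consumer<Rec> userFunction) {
        Rec record = new Rec();
        if (guarantees == ProcessingGuarantees.ATMOST_ONCE) {
            record.ack(); // ack first, then execute the function
        }
        record.events.add("process");
        userFunction.accept(record);
        if (guarantees == ProcessingGuarantees.ATLEAST_ONCE
                || guarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
            record.ack(); // ack only after the function has finished
        }
        // MANUAL: the framework never acks; only the user function may do so.
        return record.events;
    }

    public static void main(String[] args) {
        for (ProcessingGuarantees g : ProcessingGuarantees.values()) {
            System.out.println(g + " -> " + handle(g, r -> { }));
        }
        System.out.println("MANUAL, user acks -> "
                + handle(ProcessingGuarantees.MANUAL, Rec::ack));
    }
}
```

In this sketch ATMOST_ONCE yields [ack, process], ATLEAST_ONCE and EFFECTIVELY_ONCE yield [process, ack], and MANUAL yields [process] unless the user function itself calls ack.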

API Changes

  1. Add MANUAL type to ProcessingGuarantees.
    (and also Function.proto)
public enum ProcessingGuarantees {
      ATLEAST_ONCE, 
      ATMOST_ONCE, 
      EFFECTIVELY_ONCE, 
      MANUAL 
}
  2. Mark autoAck as deprecated and stop using it in the code.
    (and also Function.proto)
public class FunctionConfig {
     @Deprecated
     private Boolean autoAck;
}

I would issue a WARN when the function configuration is read (so that it is emitted only once) if the user explicitly configured autoAck=false, telling them that this configuration is deprecated and that they should switch to the MANUAL ProcessingGuarantees option.

  3. Add a new PulsarSinkProcessor implementation: PulsarSinkManualProcessor. PulsarSinkManualProcessor does not perform any ack operation.
        FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
        switch (processingGuarantees) {
+           case MANUAL:
+               this.pulsarSinkProcessor = new PulsarSinkManualProcessor(schema, crypto);
+               break;
            case ATMOST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(schema, crypto);
                break;
            case ATLEAST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(schema, crypto);
                break;
            case EFFECTIVELY_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(schema, crypto);
                break;
        }
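The autoAck deprecation warning described above could be produced by a check like the one below. This is only a sketch: the class and method names and the message text are assumptions, and a null autoAck is taken to mean that the user did not configure the option at all.

```java
import java.util.Optional;

final class AutoAckDeprecationSketch {
    /**
     * Returns a warning only when the user explicitly set autoAck=false.
     * Hypothetical helper; a null value means "not configured".
     */
    static Optional<String> deprecationWarning(Boolean autoAck) {
        if (Boolean.FALSE.equals(autoAck)) {
            return Optional.of("autoAck is deprecated; configure "
                    + "ProcessingGuarantees.MANUAL instead of autoAck=false");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Called once while the function configuration is read, so the WARN appears once.
        deprecationWarning(false).ifPresent(msg -> System.err.println("WARN " + msg));
    }
}
```

Users who leave autoAck unset or set it to true get no warning, which keeps the log quiet for configurations that are unaffected by the deprecation.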

Implementation

  1. When the delivery semantic is ATMOST_ONCE, add a validation that autoAck must be true. If the validation fails, the function fails to start (this temporarily resolves the configuration ambiguity). Once autoAck is removed, the message will be acked immediately after it is received.

if (instanceConfig.getFunctionDetails().getProcessingGuarantees() == org.apache.pulsar.functions
        .proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
    if (instanceConfig.getFunctionDetails().getAutoAck()) {
        currentRecord.ack();
    }
}

  2. A call to record.ack() inside the function only takes effect when ProcessingGuarantees == MANUAL. Conversely, when ProcessingGuarantees != MANUAL, a call to record.ack() has no effect (a warning is logged once).

  3. Add documentation stating that autoAck is deprecated, and explaining the MANUAL semantics and when record.ack() takes effect.

  4. For window functions, the current implementation of the delivery guarantees is problematic. Only the ATMOST_ONCE and EFFECTIVELY_ONCE guarantees are currently supported, because the message has already been acked before the result is sent to the output topic.

As the following code shows, onExpiry runs before onActivation, so the records are acked before the window is processed.

    public void onExpiry(List<Event<Record<T>>> events) {
        for (Event<Record<T>> event : events) {
            event.getRecord().ack();
        }
    }

    @Override
    public void onActivation(List<Event<Record<T>>> tuples, List<Event<Record<T>>> newTuples,
                             List<Event<Record<T>>> expiredTuples, Long referenceTime) {
        processWindow(
                context,
                tuples.stream().map(event -> event.get()).collect(Collectors.toList()),
                newTuples.stream().map(event -> event.get()).collect(Collectors.toList()),
                expiredTuples.stream().map(event -> event.get()).collect(Collectors.toList()),
                referenceTime);
    }
};

Therefore, treat window functions as a special case: override the delivery semantics in FunctionConfig to MANUAL, add a guarantee setting to the window configuration, and handle the ack timing inside the window function according to that setting.

FunctionConfigUtils#convert

        if (windowConfig != null) {
            // Window functions do not support EFFECTIVELY_ONCE
            if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                throw new IllegalArgumentException(
                        "Windows Function not support EFFECTIVELY_ONCE Guarantees.");
            } else {
                // Copy the configured guarantee into windowConfig, then override the
                // function-level guarantee to MANUAL so the window function controls acking
                windowConfig.setProcessingGuarantees(WindowConfig.ProcessingGuarantees
                        .valueOf(functionDetailsBuilder.getProcessingGuarantees().name()));
                functionDetailsBuilder.setProcessingGuarantees(Function.ProcessingGuarantees.MANUAL);
            }
        }
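One possible reading of "handle the ack timing inside the window function" is sketched below. It is an assumption, not the proposed implementation: WindowAckSketch, onRecord and fireWindow are hypothetical names, records are reduced to string ids, and the sketch assumes a tumbling window in which each record belongs to exactly one window.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowAckSketch {
    enum WindowGuarantees { ATLEAST_ONCE, ATMOST_ONCE }

    final List<String> log = new ArrayList<>();           // ordered trace of what happened
    private final List<String> buffered = new ArrayList<>();
    private final WindowGuarantees guarantees;

    WindowAckSketch(WindowGuarantees guarantees) {
        this.guarantees = guarantees;
    }

    /** A record arrives and is buffered until the window fires. */
    void onRecord(String id) {
        if (guarantees == WindowGuarantees.ATMOST_ONCE) {
            log.add("ack " + id); // ack before any processing can happen
        }
        buffered.add(id);
    }

    /** The window fires: process first, and only then ack for ATLEAST_ONCE. */
    void fireWindow() {
        List<String> window = new ArrayList<>(buffered);
        buffered.clear();
        log.add("process " + window);
        if (guarantees == WindowGuarantees.ATLEAST_ONCE) {
            for (String id : window) {
                log.add("ack " + id);
            }
        }
    }

    public static void main(String[] args) {
        for (WindowGuarantees g : WindowGuarantees.values()) {
            WindowAckSketch sketch = new WindowAckSketch(g);
            sketch.onRecord("a");
            sketch.onRecord("b");
            sketch.fireWindow();
            System.out.println(g + " -> " + sketch.log);
        }
    }
}
```

The point of the sketch is the ordering: with ATLEAST_ONCE no record is acked until the window that contains it has been processed, which is the property the current onExpiry-based acking does not provide.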

Test plan

  1. The main test asserts that when ProcessingGuarantees == MANUAL, the function framework does not perform any ack operations for the user.
  2. Validate that the existing autoAck=false tests still pass (nothing has been broken).
  3. Validate that the existing ProcessingGuarantees tests for AtMostOnce, AtLeastOnce, and EffectivelyOnce still pass (when autoAck=true).

Compatibility

  1. This change marks autoAck as deprecated and stops using it in the code. The documentation will state clearly that it is deprecated for the following 2 to 3 releases, after which it will be ignored.
  2. The runtimes for other languages (Python, Go) need to keep the same processing logic.

Since the changes stay backward compatible until the feature is released, we can open multiple PRs to implement the runtime of each language iteratively. Once all languages are supported, publish documentation to inform users.

Incompatible cases

  1. When the user configures Guarantees == ATMOST_ONCE and autoAck == false.
  • Current implementation: The function starts successfully and the framework does not ack messages for the user, so the processing semantics may become ATLEAST_ONCE.
  • Changed implementation: The function fails to start, and an error response is returned to the user.
  2. When the user sets autoAck == true and calls record.ack() in the function.
  • Current implementation: It works, but the message is acked multiple times.
  • Changed implementation: It works, but the call only takes effect when Guarantees == MANUAL. In all other cases, a warning is logged.

Note that all of these incompatible cases can be considered bugs in the current implementation.
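The two changed behaviors can be sketched as follows. This is a hedged illustration under stated assumptions rather than the actual patch: validate and AckGuard are hypothetical names, a null autoAck is treated as "not configured", and "warn once" is interpreted as once per function instance.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class GuaranteeChecksSketch {
    enum ProcessingGuarantees { ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE, MANUAL }

    /** Case 1: reject ATMOST_ONCE combined with an explicit autoAck=false at start-up. */
    static void validate(ProcessingGuarantees guarantees, Boolean autoAck) {
        if (guarantees == ProcessingGuarantees.ATMOST_ONCE && Boolean.FALSE.equals(autoAck)) {
            throw new IllegalArgumentException(
                    "ATMOST_ONCE requires autoAck to be true; use MANUAL to ack manually");
        }
    }

    /** Case 2: hypothetical per-function guard around acks issued by user code. */
    static final class AckGuard {
        private final ProcessingGuarantees guarantees;
        private final AtomicBoolean warned = new AtomicBoolean(false);
        int warningsLogged = 0; // exposed only so the demo can observe warn-once

        AckGuard(ProcessingGuarantees guarantees) {
            this.guarantees = guarantees;
        }

        /** Forwards the ack only under MANUAL; returns true if it was forwarded. */
        boolean ack(Runnable underlyingAck) {
            if (guarantees == ProcessingGuarantees.MANUAL) {
                underlyingAck.run();
                return true;
            }
            if (warned.compareAndSet(false, true)) {
                warningsLogged++;
                System.err.println(
                        "WARN record.ack() is ignored unless ProcessingGuarantees == MANUAL");
            }
            return false;
        }
    }

    public static void main(String[] args) {
        try {
            validate(ProcessingGuarantees.ATMOST_ONCE, false);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        AckGuard guard = new AckGuard(ProcessingGuarantees.ATLEAST_ONCE);
        guard.ack(() -> { });
        guard.ack(() -> { });
        System.out.println("warnings logged: " + guard.warningsLogged);
    }
}
```

Ignoring the user's ack outside MANUAL is what prevents the double ack described in case 2, since the framework remains the only party that acknowledges the message.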
