Discussion thread: https://lists.apache.org/thread/4f2w1mqvhhs3mvccbcg2sk19b60xwkgf
Vote thread: https://lists.apache.org/thread/1ojcc12sxd87nz49yrflk8jv2nk98hvr (Pass)
Motivation
Currently, Pulsar Functions support three delivery semantics and also provide an autoAck option that controls whether messages are acknowledged automatically.
Because autoAck affects the delivery semantics of a function, users can find the relationship between these two parameters confusing.
For example, when the user configures Guarantees == ATMOST_ONCE and autoAck == false, the framework does not ack messages on the user's behalf, and the effective processing semantics may become ATLEAST_ONCE.
The delivery semantics provided by Functions should be unambiguous: when the user sets the guarantees, the framework should enforce exactly those semantics, unaffected by any other parameter.
Goal
Add a MANUAL delivery semantic and remove the autoAck configuration.
The original intent of autoAck is to let users control when messages are acked. When autoAck == false, the processing semantics provided by the framework no longer hold, so a MANUAL processing guarantee can replace the autoAck == false scenario.
When the user configures ProcessingGuarantees == MANUAL, the framework does not perform any ack operations, and acking is left to the user. In all other cases, the framework guarantees the configured processing semantics.
The processing logic of each semantic is:
ATMOST_ONCE: The message is acknowledged as soon as the client reads it, and only then is the function executed, which guarantees that it will not run more than once.
ATLEAST_ONCE: The message is acknowledged after the function has finished executing, so it will run at least once.
EFFECTIVELY_ONCE: The message is acknowledged after the function has finished executing. This relies on Pulsar message deduplication and provides end-to-end effectively-once processing.
MANUAL: The function framework does not ack messages; acking is handled by the user inside the function.
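The ack timing described above can be sketched as a single dispatch on the configured guarantee. This is a minimal, hypothetical sketch: `AckTiming`, `AckableRecord` and `process` are illustrative names, and the nested enum only mirrors the proposed `ProcessingGuarantees` values; it is not the Pulsar Functions runtime code. Broker-side deduplication for EFFECTIVELY_ONCE is outside the scope of the sketch.

```java
import java.util.function.Consumer;

// Hypothetical sketch: the nested enum and interface are simplified stand-ins for
// Pulsar's ProcessingGuarantees and Record types, not the real Pulsar Functions API.
final class AckTiming {
    enum ProcessingGuarantees { ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE, MANUAL }

    interface AckableRecord<T> {
        T getValue();
        void ack();
    }

    private AckTiming() {}

    /** Runs the user function on one record and acks it where the guarantee requires. */
    static <T> void process(ProcessingGuarantees guarantees, AckableRecord<T> record, Consumer<T> function) {
        switch (guarantees) {
            case ATMOST_ONCE:
                // Ack first: if the function then fails, the message is not redelivered.
                record.ack();
                function.accept(record.getValue());
                break;
            case ATLEAST_ONCE:
            case EFFECTIVELY_ONCE:
                // Ack only after the function finished; a crash before the ack causes redelivery.
                // EFFECTIVELY_ONCE additionally relies on Pulsar deduplication (not modelled here).
                function.accept(record.getValue());
                record.ack();
                break;
            case MANUAL:
                // The framework never acks; the user function decides when to call ack().
                function.accept(record.getValue());
                break;
            default:
                throw new IllegalStateException("Unexpected guarantee: " + guarantees);
        }
    }
}
```

The point of the sketch is that MANUAL is the only branch with no framework ack, so the configured guarantee alone determines the ack behavior.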
API Changes
- Add a MANUAL type to ProcessingGuarantees (and also to Function.proto).
```java
public enum ProcessingGuarantees {
    ATLEAST_ONCE,
    ATMOST_ONCE,
    EFFECTIVELY_ONCE,
    MANUAL
}
```
- Mark autoAck as deprecated and stop using it in the code (and also in Function.proto).
```java
public class FunctionConfig {
    @Deprecated
    private Boolean autoAck;
}
```
I would issue a WARN when reading the function configuration (so it is emitted only once) if the user explicitly configured autoAck=false, warning them that this option is deprecated and that they should switch to the MANUAL ProcessingGuarantee option.
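The one-time deprecation warning could look like the following. This is a hypothetical sketch using `java.util.logging` for self-containment (the Pulsar code base uses its own logging setup); `DeprecationCheck` and `checkAutoAck` are illustrative names, and the `Boolean` parameter stands in for the deprecated `FunctionConfig.autoAck` field, where `null` means "not configured".

```java
import java.util.logging.Logger;

// Hypothetical sketch: the Boolean argument stands in for FunctionConfig.autoAck.
final class DeprecationCheck {
    private static final Logger LOG = Logger.getLogger(DeprecationCheck.class.getName());

    private DeprecationCheck() {}

    /**
     * Called once while reading the function configuration, so the warning is logged once.
     * Returns the warning text if one was emitted, otherwise null.
     */
    static String checkAutoAck(Boolean autoAck) {
        // Only an explicit false triggers the warning; null (unset) and true are silent.
        if (Boolean.FALSE.equals(autoAck)) {
            String msg = "autoAck is deprecated; set processingGuarantees to MANUAL "
                    + "to acknowledge messages inside the function.";
            LOG.warning(msg);
            return msg;
        }
        return null;
    }
}
```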
- Add a new PulsarSinkProcessor implementation, PulsarSinkManualProcessor, which does not perform any ack operation:
```diff
 FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
 switch (processingGuarantees) {
+    case MANUAL:
+        this.pulsarSinkProcessor = new PulsarSinkManualProcessor(schema, crypto);
+        break;
     case ATMOST_ONCE:
         this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(schema, crypto);
         break;
     case ATLEAST_ONCE:
         this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(schema, crypto);
         break;
     case EFFECTIVELY_ONCE:
         this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(schema, crypto);
         break;
 }
```
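The difference between an acking sink processor and PulsarSinkManualProcessor can be sketched with a deliberately simplified interface. The `SinkProcessors`, `SourceRecord` and `onPublish` names are hypothetical; the real `PulsarSinkProcessor` interface in Pulsar Functions has a different shape. The sketch only shows that an at-least-once style processor acks the source record once the output publish completes, while the manual processor never acks.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-ins; not the real PulsarSinkProcessor interface.
final class SinkProcessors {
    private SinkProcessors() {}

    interface SourceRecord {
        void ack();
    }

    interface SinkProcessor {
        /** Called with the future of the output publish for a given source record. */
        void onPublish(SourceRecord source, CompletableFuture<Void> publishResult);
    }

    /** ATLEAST_ONCE style: ack the source record only after the output was published. */
    static final class AtLeastOnce implements SinkProcessor {
        @Override
        public void onPublish(SourceRecord source, CompletableFuture<Void> publishResult) {
            // thenRun is skipped if the publish fails, so the record stays unacked and can be redelivered.
            publishResult.thenRun(source::ack);
        }
    }

    /** MANUAL: never ack; the user function is responsible for calling ack(). */
    static final class Manual implements SinkProcessor {
        @Override
        public void onPublish(SourceRecord source, CompletableFuture<Void> publishResult) {
            // Intentionally no ack here.
        }
    }
}
```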
Implementation
- When the delivery semantic is ATMOST_ONCE, add a check that autoAck is true. If the check fails, the function fails to start (this temporarily resolves the configuration ambiguity). Once autoAck is removed, messages will be acked immediately after they are received. The current logic is in pulsar/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java (lines 271 to 276 in c49a977):
```java
if (instanceConfig.getFunctionDetails().getProcessingGuarantees() == org.apache.pulsar.functions
        .proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
    if (instanceConfig.getFunctionDetails().getAutoAck()) {
        currentRecord.ack();
    }
}
```
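The start-up check could be expressed as a small validation step that rejects the ambiguous combination. This is a hypothetical sketch: `GuaranteeValidation.validate` is an illustrative name, the nested enum mirrors the proposed guarantee values, and where exactly Pulsar would run the check is not specified here.

```java
// Hypothetical sketch: the enum is a stand-in for the real ProcessingGuarantees type,
// and the method name is illustrative only.
final class GuaranteeValidation {
    enum ProcessingGuarantees { ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE, MANUAL }

    private GuaranteeValidation() {}

    /** Rejects ATMOST_ONCE together with autoAck=false so that the function fails to start. */
    static void validate(ProcessingGuarantees guarantees, boolean autoAck) {
        if (guarantees == ProcessingGuarantees.ATMOST_ONCE && !autoAck) {
            throw new IllegalArgumentException(
                    "autoAck must be true when processingGuarantees is ATMOST_ONCE; "
                            + "use MANUAL to acknowledge messages yourself.");
        }
    }
}
```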
- When the user calls record.ack() inside a function, the call only takes effect if ProcessingGuarantees == MANUAL. When ProcessingGuarantees != MANUAL, calling record.ack() has no effect and a warning is logged once.
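One way to get this behavior is to hand the user function a record wrapper that forwards ack() only under MANUAL. This is a hypothetical sketch: `GuardedRecord` and the `Runnable` standing in for the real broker ack are illustrative, and the static flag makes the warning fire once per JVM in this sketch, which approximates "once per function instance".

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

// Hypothetical sketch: the Runnable stands in for the real acknowledgment call.
final class GuardedRecord {
    private static final Logger LOG = Logger.getLogger(GuardedRecord.class.getName());
    // Shared by all records so the warning is printed only once in this process.
    private static final AtomicBoolean WARNED = new AtomicBoolean(false);

    private final boolean manual;
    private final Runnable brokerAck;

    GuardedRecord(boolean manual, Runnable brokerAck) {
        this.manual = manual;
        this.brokerAck = brokerAck;
    }

    /** Forwards the ack only under MANUAL; otherwise ignores it and warns once. */
    boolean ack() {
        if (manual) {
            brokerAck.run();
            return true;
        }
        if (WARNED.compareAndSet(false, true)) {
            LOG.warning("record.ack() is ignored unless processingGuarantees is MANUAL");
        }
        return false;
    }

    static boolean warned() {
        return WARNED.get();
    }
}
```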
- Add documentation stating that autoAck is deprecated, explaining the MANUAL semantics, and describing when record.ack() takes effect.
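- For window functions, the current implementation of delivery guarantees is problematic. Currently, only the ATMOST_ONCE and EFFECTIVELY_ONCE guarantees are supported, because messages are acked before the output is sent to the output topic.
As the following code from pulsar/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java (lines 205 to 222 in eeb22ba) shows, onExpiry, which acks the records, precedes onActivation, which processes the window: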
```java
    public void onExpiry(List<Event<Record<T>>> events) {
        for (Event<Record<T>> event : events) {
            event.getRecord().ack();
        }
    }

    @Override
    public void onActivation(List<Event<Record<T>>> tuples, List<Event<Record<T>>> newTuples,
                             List<Event<Record<T>>> expiredTuples, Long referenceTime) {
        processWindow(
                context,
                tuples.stream().map(event -> event.get()).collect(Collectors.toList()),
                newTuples.stream().map(event -> event.get()).collect(Collectors.toList()),
                expiredTuples.stream().map(event -> event.get()).collect(Collectors.toList()),
                referenceTime);
    }
};
```
So, window functions are treated as a special case: the delivery semantics in FunctionConfig are overridden to MANUAL, a processing guarantee setting is added to WindowConfig, and the window function framework then handles ack timing according to that setting. This is done in FunctionConfigUtils#convert:
```java
if (windowConfig != null) {
    // Window functions do not support EFFECTIVELY_ONCE
    if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
        throw new IllegalArgumentException(
                "Window functions do not support the EFFECTIVELY_ONCE guarantee.");
    } else {
        // Store the user's guarantee in the window config, then override the
        // function-level guarantee to MANUAL so that only the window executor acks
        windowConfig.setProcessingGuarantees(WindowConfig.ProcessingGuarantees
                .valueOf(functionDetailsBuilder.getProcessingGuarantees().name()));
        functionDetailsBuilder.setProcessingGuarantees(Function.ProcessingGuarantees.MANUAL);
    }
}
```
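Once the window guarantee lives in WindowConfig, the window executor decides when to ack. The following is a hypothetical sketch of that decision, not the WindowFunctionExecutor code: `WindowAckPolicy`, `Ackable` and the simplified callbacks are illustrative, and it assumes `expiredTuples` holds events that were already part of earlier, fully processed windows. Under ATMOST_ONCE it keeps today's behavior of acking on expiry; under ATLEAST_ONCE it defers the ack until processWindow has returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch; names and callback shapes are simplified stand-ins.
final class WindowAckPolicy<T> {
    enum Guarantee { ATMOST_ONCE, ATLEAST_ONCE }

    interface Ackable<V> {
        V get();
        void ack();
    }

    private final Guarantee guarantee;
    private final Consumer<List<T>> processWindow;

    WindowAckPolicy(Guarantee guarantee, Consumer<List<T>> processWindow) {
        this.guarantee = guarantee;
        this.processWindow = processWindow;
    }

    /** Called for events that left the window. */
    void onExpiry(List<Ackable<T>> events) {
        if (guarantee == Guarantee.ATMOST_ONCE) {
            // Current behavior: ack on expiry, before the window output is produced.
            events.forEach(Ackable::ack);
        }
        // ATLEAST_ONCE: do nothing here; the ack is deferred to onActivation.
    }

    void onActivation(List<Ackable<T>> tuples, List<Ackable<T>> expiredTuples) {
        List<T> values = new ArrayList<>();
        for (Ackable<T> t : tuples) {
            values.add(t.get());
        }
        processWindow.accept(values);
        if (guarantee == Guarantee.ATLEAST_ONCE) {
            // Ack only after processing returned; if processWindow throws, nothing is acked
            // and the events can be redelivered.
            expiredTuples.forEach(Ackable::ack);
        }
    }
}
```

EFFECTIVELY_ONCE is not modelled because FunctionConfigUtils#convert rejects it for window functions.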
Test plan
- The main test asserts that when ProcessingGuarantees == MANUAL, the function framework does not perform any ack operations on the user's behalf.
- Verify that the existing autoAck=false tests still pass (nothing is broken).
- Verify that the existing ProcessingGuarantees tests for AtMostOnce, AtLeastOnce, and EffectivelyOnce still pass (with autoAck=true).
Compatibility
- This change marks autoAck as deprecated and stops using it in the code. The deprecation will be clearly documented for the following 2~3 releases, after which the option will be ignored.
- The runtimes for other languages (Python, Go) need to implement the same processing logic.
Since the changes remain backward compatible until they are released, we can open multiple PRs to implement the runtime for each language iteratively. Once all languages are supported, documentation will be published to inform users.
Incompatible cases
- When the user configures Guarantees == ATMOST_ONCE and autoAck == false:
  - Current implementation: the function starts successfully, but the framework does not ack messages automatically, so the processing semantics may become ATLEAST_ONCE.
  - Changed implementation: the function fails to start, and an error response is returned to the user.
- When the user sets autoAck == true and calls record.ack() in the function:
  - Current implementation: this works, but the message is acked multiple times.
  - Changed implementation: record.ack() only takes effect when Guarantees == MANUAL; in all other cases, a warning log is printed.
Note that all of these incompatible cases can be considered bugs in the current implementation.