-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Description
Motivation
Fixed the failure to use schema to create consumer after using AUTO-CONSUME consumer to subscribe an empty topic, and Broker returned the error message as IncompatibleSchemaException("Topic does not have schema to check").
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Line 1037 in ed33fb3
| final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null; |
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 1147 to 1152 in ed33fb3
| if (schema != null) { | |
| return topic.addSchemaIfIdleOrCheckCompatible(schema) | |
| .thenCompose(v -> topic.subscribe(option)); | |
| } else { | |
| return topic.subscribe(option); | |
| } |
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Lines 3054 to 3071 in ed33fb3
| @Override | |
| public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) { | |
| return hasSchema() | |
| .thenCompose((hasSchema) -> { | |
| int numActiveConsumers = subscriptions.values().stream() | |
| .mapToInt(subscription -> subscription.getConsumers().size()) | |
| .sum(); | |
| if (hasSchema | |
| || (!producers.isEmpty()) | |
| || (numActiveConsumers != 0) | |
| || (ledger.getTotalSize() != 0)) { | |
| return checkSchemaCompatibleForConsumer(schema); | |
| } else { | |
| return addSchema(schema).thenCompose(schemaVersion -> | |
| CompletableFuture.completedFuture(null)); | |
| } | |
| }); | |
| } |
Lines 1162 to 1177 in ed33fb3
| @Override | |
| public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) { | |
| return hasSchema().thenCompose((hasSchema) -> { | |
| int numActiveConsumers = subscriptions.values().stream() | |
| .mapToInt(subscription -> subscription.getConsumers().size()) | |
| .sum(); | |
| if (hasSchema | |
| || (!producers.isEmpty()) | |
| || (numActiveConsumers != 0) | |
| || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) { | |
| return checkSchemaCompatibleForConsumer(schema); | |
| } else { | |
| return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null)); | |
| } | |
| }); | |
| } |
We should record whether the active consumers of the Topic have one or more consumers whose SchemaType is not AUTO_CONSUME.
Goal
- On the client side, we add AutoConsume enum in the Schema.Type, and add protocol version in the ProtocolVersion.
- On the broker side, the SchemaType containing AUTO_CONSUME is recorded in
org.apache.pulsar.broker.service.Consumer.
API Changes
Protocal change: Schema.Type and ProtocolVersion
message Schema {
enum Type {
AutoConsume = 21;
}
}
enum ProtocolVersion {
v21 = 21; // Carry the AUTO_CONSUME schema to the Broker after this version
}Record SchemaType in Consumer
@Getter
private final SchemaType schemaType;Implementation
On the client side
- Set "default" schemaInfo(schemaType=SchemaType.AUTO_CONSUME) in
org.apache.pulsar.client.impl.schema.AutoConsumeSchema. - Get and set SchemaInfo with AUTO_CONSUME schema in
org.apache.pulsar.common.protocol.Commands#newSubscribewhen schemaType is AUTO_CONSUME and proto version is greater than or equal tov21.
On the broker side
- When the schema is not null and schemaType not AUTO_CONSUME, then addSchemaIfIdleOrCheckCompatible.
- Get the schemaType and record it in Consumer.
- The active consumers schema of the Topic have one or more consumers schema whose not AUTO_CONSUME. Then
checkSchemaCompatibleForConsumer.
@Override
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema().thenCompose((hasSchema) -> {
int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
.mapToInt(subscription -> subscription.getConsumers().stream()
.filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
.toList().size())
.sum();
if (hasSchema
|| (!producers.isEmpty())
|| (numActiveConsumersWithoutAutoSchema != 0)
|| (ledger.getTotalSize() != 0)) {
return checkSchemaCompatibleForConsumer(schema);
} else {
return addSchema(schema).thenCompose(schemaVersion ->
CompletableFuture.completedFuture(null));
}
});
}Alternatives
-
On the client side, we add an optional field in the CommandSubscribe.
add optional bool check_schema_compatibility = 20 [default = true];
message CommandSubscribe { optional bool check_schema_compatibility = 20 [default = true]; }
-
On the broker side, record checkSchemaCompatibility in
org.apache.pulsar.broker.service.Consumer. -
If checkSchemaCompatibility is true, schema compatibility check is required. If schemaType is AUTO_CONSUME, the checkSchemaCompatibility is false.
Anything else?
No response
Links
Discussion: https://lists.apache.org/thread/v7p88h7grqnbzocw34g6jvxjfw962kfd
Vote: https://lists.apache.org/thread/pvcdlbflofoj41ryo1lrn0zlhj15bwpv
PR: #17449