PIP-236: Record schema in the request and carry to the broker when subscribing with AUTO_CONSUME schema. #19113

@Denovo1998

Motivation

This proposal fixes a failure that occurs when a consumer with a concrete schema is created after an AUTO_CONSUME consumer has subscribed to an empty topic. In that case the broker rejects the new consumer with IncompatibleSchemaException("Topic does not have schema to check").

    final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;

    if (schema != null) {
        return topic.addSchemaIfIdleOrCheckCompatible(schema)
                .thenCompose(v -> topic.subscribe(option));
    } else {
        return topic.subscribe(option);
    }

    @Override
    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
        return hasSchema()
                .thenCompose((hasSchema) -> {
                    int numActiveConsumers = subscriptions.values().stream()
                            .mapToInt(subscription -> subscription.getConsumers().size())
                            .sum();
                    // The topic counts as in use if it has a schema, a producer,
                    // an active consumer, or stored data.
                    if (hasSchema
                            || (!producers.isEmpty())
                            || (numActiveConsumers != 0)
                            || (ledger.getTotalSize() != 0)) {
                        return checkSchemaCompatibleForConsumer(schema);
                    } else {
                        return addSchema(schema).thenCompose(schemaVersion ->
                                CompletableFuture.completedFuture(null));
                    }
                });
    }

    @Override
    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
        return hasSchema().thenCompose((hasSchema) -> {
            int numActiveConsumers = subscriptions.values().stream()
                    .mapToInt(subscription -> subscription.getConsumers().size())
                    .sum();
            // Same idle test, but based on the entries-added counter
            // instead of the ledger size.
            if (hasSchema
                    || (!producers.isEmpty())
                    || (numActiveConsumers != 0)
                    || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
                return checkSchemaCompatibleForConsumer(schema);
            } else {
                return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
            }
        });
    }

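An AUTO_CONSUME consumer subscribes without a schema, yet it still counts toward numActiveConsumers. When a consumer with a concrete schema subscribes afterwards, the topic is no longer treated as idle, so the broker calls checkSchemaCompatibleForConsumer on a topic that has no schema, and the check fails. To avoid this, the broker should record whether any active consumer of the topic has a SchemaType other than AUTO_CONSUME.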

Goal

  1. On the client side, add an AutoConsume value to the Schema.Type enum and a new version to ProtocolVersion.
  2. On the broker side, record each consumer's SchemaType, including AUTO_CONSUME, in org.apache.pulsar.broker.service.Consumer.

API Changes

Protocol change: Schema.Type and ProtocolVersion

message Schema {
    enum Type {
        AutoConsume = 21;
    }
}

enum ProtocolVersion {
    v21 = 21; // Carry the AUTO_CONSUME schema to the broker starting with this version
}

Record SchemaType in Consumer

    @Getter
    private final SchemaType schemaType;

Implementation

On the client side

  1. Set a default SchemaInfo (schemaType=SchemaType.AUTO_CONSUME) in org.apache.pulsar.client.impl.schema.AutoConsumeSchema.
  2. In org.apache.pulsar.common.protocol.Commands#newSubscribe, attach the AUTO_CONSUME SchemaInfo to the subscribe command when the schemaType is AUTO_CONSUME and the protocol version is v21 or later.
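
The version gate in step 2 can be sketched as a small, self-contained model. This is only an illustration, not the actual Commands#newSubscribe code: the class SubscribeSchemaGate, the method shouldCarrySchema, and the reduced SchemaType enum are hypothetical names, and the sketch assumes that a concrete schema is always carried, as before.

```java
/** Hypothetical model of the client-side rule; not the actual Commands#newSubscribe code. */
final class SubscribeSchemaGate {
    /** Protocol version added by this proposal. */
    static final int V21 = 21;

    /** Reduced stand-in for the real SchemaType enum. */
    enum SchemaType { AUTO_CONSUME, STRING, AVRO }

    /**
     * Decides whether the subscribe command should carry schema information.
     * Assumption: a concrete schema is always carried, as before, while an
     * AUTO_CONSUME schema is carried only to brokers that speak v21 or later.
     */
    static boolean shouldCarrySchema(SchemaType type, int protocolVersion) {
        if (type == null) {
            return false; // consumer configured without a schema
        }
        if (type == SchemaType.AUTO_CONSUME) {
            return protocolVersion >= V21;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(shouldCarrySchema(SchemaType.AUTO_CONSUME, 20)); // false
        System.out.println(shouldCarrySchema(SchemaType.AUTO_CONSUME, 21)); // true
        System.out.println(shouldCarrySchema(SchemaType.STRING, 20));       // true
    }
}
```

Gating on the protocol version keeps a new client from sending an enum value that an older broker would not recognize.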

On the broker side

  1. When the schema is not null and its schemaType is not AUTO_CONSUME, call addSchemaIfIdleOrCheckCompatible.
  2. Get the schemaType and record it in Consumer.
  3. If one or more active consumers of the topic have a schemaType other than AUTO_CONSUME, call checkSchemaCompatibleForConsumer.
    @Override
    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
        return hasSchema().thenCompose((hasSchema) -> {
            int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
                    .mapToInt(subscription -> subscription.getConsumers().stream()
                            .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
                            .toList().size())
                    .sum();
            if (hasSchema
                    || (!producers.isEmpty())
                    || (numActiveConsumersWithoutAutoSchema != 0)
                    || (ledger.getTotalSize() != 0)) {
                return checkSchemaCompatibleForConsumer(schema);
            } else {
                return addSchema(schema).thenCompose(schemaVersion ->
                        CompletableFuture.completedFuture(null));
            }
        });
    }
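
To see how the three broker-side steps interact, the following in-memory model replays the scenario from the Motivation section. It is a sketch under simplifying assumptions, not Pulsar code: AutoConsumeTopicSketch and its members are hypothetical, producers and ledger size are left out, every consumer is assumed to have a non-null schema type, and "compatible" is reduced to "same schema type". The ignoreAutoConsume flag switches between the original counting rule and the proposed one.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of the broker-side steps; not Pulsar code. */
final class AutoConsumeTopicSketch {
    enum SchemaType { AUTO_CONSUME, STRING, AVRO }

    private final boolean ignoreAutoConsume;                       // true = proposed behaviour
    private final List<SchemaType> consumers = new ArrayList<>();  // step 2: type recorded per consumer
    private SchemaType topicSchema;                                // null = topic has no schema

    AutoConsumeTopicSketch(boolean ignoreAutoConsume) {
        this.ignoreAutoConsume = ignoreAutoConsume;
    }

    SchemaType topicSchema() {
        return topicSchema;
    }

    /** Subscribes a consumer; throws if the modelled compatibility check fails. */
    void subscribe(SchemaType type) {
        // Step 1: only a concrete schema goes through the add-or-check path.
        if (type != SchemaType.AUTO_CONSUME) {
            addSchemaIfIdleOrCheckCompatible(type);
        }
        consumers.add(type);
    }

    private void addSchemaIfIdleOrCheckCompatible(SchemaType type) {
        // Step 3: count only the consumers that matter for the idle test.
        long counted = consumers.stream()
                .filter(c -> !ignoreAutoConsume || c != SchemaType.AUTO_CONSUME)
                .count();
        if (topicSchema != null || counted != 0) {
            if (topicSchema == null) {
                throw new IllegalStateException("Topic does not have schema to check");
            }
            if (topicSchema != type) {
                throw new IllegalStateException("Incompatible schema: " + type);
            }
        } else {
            topicSchema = type; // idle topic: the first concrete schema is registered
        }
    }

    public static void main(String[] args) {
        AutoConsumeTopicSketch proposed = new AutoConsumeTopicSketch(true);
        proposed.subscribe(SchemaType.AUTO_CONSUME);
        proposed.subscribe(SchemaType.STRING);
        System.out.println(proposed.topicSchema()); // STRING

        AutoConsumeTopicSketch original = new AutoConsumeTopicSketch(false);
        original.subscribe(SchemaType.AUTO_CONSUME);
        try {
            original.subscribe(SchemaType.STRING);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Topic does not have schema to check
        }
    }
}
```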

Alternatives

  1. On the client side, add an optional field, check_schema_compatibility, to CommandSubscribe.

    message CommandSubscribe {
        optional bool check_schema_compatibility = 20 [default = true];
    }
  2. On the broker side, record checkSchemaCompatibility in org.apache.pulsar.broker.service.Consumer.

  3. If checkSchemaCompatibility is true, the schema compatibility check is required. If the schemaType is AUTO_CONSUME, checkSchemaCompatibility is false.
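
As a rough illustration of this alternative, the flag could be derived on the client and evaluated on the broker as below. The class CheckCompatibilityFlagSketch and both method names are hypothetical, and the reduced SchemaType enum only stands in for the real one.

```java
import java.util.List;

/** Hypothetical model of the alternative design based on check_schema_compatibility. */
final class CheckCompatibilityFlagSketch {
    /** Reduced stand-in for the real SchemaType enum. */
    enum SchemaType { AUTO_CONSUME, STRING }

    /** Client side: the value the subscribe command would carry in the new field. */
    static boolean checkSchemaCompatibility(SchemaType type) {
        return type != SchemaType.AUTO_CONSUME;
    }

    /** Broker side: true if any active consumer recorded the flag as true. */
    static boolean anyConsumerRequiresCheck(List<Boolean> recordedFlags) {
        return recordedFlags.stream().anyMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        boolean auto = checkSchemaCompatibility(SchemaType.AUTO_CONSUME);
        boolean string = checkSchemaCompatibility(SchemaType.STRING);
        System.out.println(anyConsumerRequiresCheck(List.of(auto)));         // false
        System.out.println(anyConsumerRequiresCheck(List.of(auto, string))); // true
    }
}
```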

Links

Discussion: https://lists.apache.org/thread/v7p88h7grqnbzocw34g6jvxjfw962kfd
Vote: https://lists.apache.org/thread/pvcdlbflofoj41ryo1lrn0zlhj15bwpv
PR: #17449
