Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
#17221 describes an environment in which multiple bookie copies are corrupted or a ledger has been deleted. When a schema ledger is lost, new producers and consumers cannot be created and cannot work properly.
PR #18010 addresses this by enabling autoSkipNonRecoverableData and skipping the lost schema, but that can leave the schema information incomplete. In addition, the existing code deletes the metadata when a schema is corrupted:
Lines 564 to 570 in a953027
    // clean up the broken schema from zk
    deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
        log.info("Clean up non-recoverable schema {}. Deletion of schema {} {}", rc.getMessage(),
                schemaId, (th == null ? "successful" : "failed, " + th.getCause().getMessage()));
        schemaResult.complete(list);
        return null;
    });
If an error is not recoverable, the schema is deleted, but PR #18010 and #19882 have made NoSuchLedgerExistsOnMetadataServerException count as a recoverable exception as well.
So we need a solution that does not just skip the schema with the missing ledger, but actually restores the broken schema ledger.
Solution
Add a new method called tryCompleteTheLostSchemaLedger. When a schema ledger has been lost and a new consumer subscribes or a new producer is created, the broker hits an error such as "Failed to open ledger". In that case, call tryCompleteTheLostSchemaLedger.
CompletableFuture<Long> tryCompleteTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema);
This method attempts to create a new ledger, save the SchemaData in it, and then update the metadata with the new ledger id.
Currently, producers and consumers that are already connected keep working even if the schema ledger is deleted. To obtain the SchemaData, we need to store the SchemaData and SchemaVersion information in the topic (org.apache.pulsar.broker.service.AbstractTopic) and pass them in when calling tryCompleteTheLostSchemaLedger.
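The recovery flow described above can be sketched roughly as follows. This is a minimal in-memory model, not the real BookkeeperSchemaStorage: the class name SchemaLedgerRecoverySketch, the two maps standing in for BookKeeper and the metadata store, the getSchema helper, and the use of long and byte[] in place of SchemaVersion and SchemaData are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the proposal; every name except
// tryCompleteTheLostSchemaLedger is invented for this sketch.
class SchemaLedgerRecoverySketch {
    // Stand-in for BookKeeper: ledger id -> stored schema bytes.
    final Map<Long, byte[]> ledgers = new ConcurrentHashMap<>();
    // Stand-in for the metadata store: "schemaId@version" -> ledger id.
    final Map<String, Long> metadata = new ConcurrentHashMap<>();
    final AtomicLong nextLedgerId = new AtomicLong(100);

    static String key(String schemaId, long version) {
        return schemaId + "@" + version;
    }

    // Create a new ledger, save the schema in it, then point the metadata at it.
    CompletableFuture<Long> tryCompleteTheLostSchemaLedger(String schemaId, long version, byte[] schema) {
        return CompletableFuture.supplyAsync(() -> {
            long newLedgerId = nextLedgerId.getAndIncrement();
            ledgers.put(newLedgerId, schema.clone());
            metadata.put(key(schemaId, version), newLedgerId);
            return newLedgerId;
        });
    }

    // Read path: if the ledger is missing and the topic still holds a cached
    // copy of the schema, rebuild the ledger instead of skipping or deleting.
    CompletableFuture<byte[]> getSchema(String schemaId, long version, byte[] cachedSchema) {
        Long ledgerId = metadata.get(key(schemaId, version));
        byte[] data = ledgerId == null ? null : ledgers.get(ledgerId);
        if (data != null) {
            return CompletableFuture.completedFuture(data);
        }
        if (cachedSchema == null) {
            // Nothing cached on the topic: the failure is surfaced unchanged.
            CompletableFuture<byte[]> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("Failed to open ledger " + ledgerId));
            return failed;
        }
        return tryCompleteTheLostSchemaLedger(schemaId, version, cachedSchema)
                .thenApply(ledgers::get);
    }
}
```

In this sketch the cached schema is passed in by the caller, which mirrors the idea of keeping SchemaData and SchemaVersion on the topic so that they are available when the ledger turns out to be missing.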
Alternatives
- In the broker, org.apache.pulsar.broker.service.Producer and org.apache.pulsar.broker.service.Consumer do not save SchemaData and SchemaVersion, and tryCompleteTheLostSchemaLedger is only called through the admin API. Perhaps we should implement this directly in the upload schema function (https://pulsar.apache.org/docs/3.2.x/admin-api-schemas/#upload-a-schema); we would then need to pass in an additional flag that says whether to register a new schema or to make up for the missing one. Of course, for compatibility, the default behavior should be to register a new schema.
- The corresponding schema information is also saved on the client side. Perhaps the broker can send a request to each connecting consumer or producer to obtain the schema information? (Currently, connected producers and consumers keep working even if the schema ledger is deleted.) In this way, schema information does not need to be cached on the broker side.
Store the SchemaData and SchemaVersion information in the org.apache.pulsar.broker.service.Producer and org.apache.pulsar.broker.service.Consumer instances that are connected or subscribed to the topic on the broker side. (This is not an overall alternative; it only covers how to store the SchemaData and SchemaVersion that have been lost.)
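As an illustration of the first alternative, the flag on the upload path might behave roughly like this. The sketch is purely hypothetical: the class UploadSchemaFlagSketch, the repairMissing parameter, and the rule of repairing the first missing version are assumptions, and nothing here reflects the actual admin API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "upload schema" with an extra repair flag.
class UploadSchemaFlagSketch {
    // Index = schema version; a null entry models a version whose ledger was lost.
    private final List<byte[]> versions = new ArrayList<>();

    // Default behavior stays compatible: always register a new schema version.
    long upload(byte[] schema) {
        return upload(schema, false);
    }

    // With repairMissing = true, fill the first lost version instead of
    // registering a new one (assumed rule, chosen only for this sketch).
    long upload(byte[] schema, boolean repairMissing) {
        if (repairMissing) {
            for (int v = 0; v < versions.size(); v++) {
                if (versions.get(v) == null) {
                    versions.set(v, schema.clone());
                    return v;
                }
            }
            throw new IllegalStateException("no missing schema version to repair");
        }
        versions.add(schema.clone());
        return versions.size() - 1;
    }

    // Test helper: simulate the loss of the ledger behind one version.
    void loseLedger(int version) {
        versions.set(version, null);
    }

    boolean isMissing(int version) {
        return versions.get(version) == null;
    }
}
```

Keeping the single-argument overload as the default path is what preserves compatibility for existing callers; only callers that explicitly set the flag take the repair path.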
Anything else?
Please take a look at the alternatives and leave your ideas for discussion. I will adjust the implementation in the PR accordingly.
Are you willing to submit a PR?
- I'm willing to submit a PR!