KAFKA-3522: Replace RecordConverter with TimestampedBytesStore #6204
Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman
guozhangwang
left a comment
Left a single comment about the code hierarchy, otherwise lgtm.
      store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store;
  final RecordConverter recordConverter =
-     stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter();
+     stateStore instanceof TimestampedBytesStore ? RecordConverter.converter() : record -> record;
Hmm.. I thought the restorer / state manager would now be agnostic to the converter entirely, since the inner store impl is responsible for extending the interface as well as calling its TimestampedBytesStore function internally. Why do we still need an internal RecordConverter class?
The new static method that is used by the stores will translate on-disk data from the old to the new format.
We still need a RecordConverter that translates data from the changelog topic to the new format, because on restore, we put plain <byte[],byte[]> key-value pairs into the store, and the store expects those records to be in the new format.
This makes me realize that the implementation of RecordConverter is actually not correct in this PR: it should not use the new static method (which inserts -1 as the timestamp); it needs to put the actual record timestamp... Will update the PR accordingly.
But can't we put this logic inside the prepareBatch call in the internal store impl (of course we would require users with customized stores to do so as well), so that the callers do not need to be aware of it?
No, because prepareBatch (or actually restoreAllInternal) takes a KeyValue<byte[],byte[]>, not a ConsumerRecord. Thus there is no timestamp information we can add to the value.
👍 The convertToTimestampedFormat function is purely for converting the already-stored data into the new binary format. It can't actually add any timestamp information, because we don't have it when we retrieve the old-format data. So the function is like (value) -> [-1,value].
In contrast, the state restorer needs to be able to insert the timestamp, so the function is like (value, timestamp) -> [timestamp, value].
Actually, looking at this code again, I see Matthias was right, the RecordConverter erroneously calls through to the convertToTimestampedFormat function. Oops! Good catch!
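To make the difference between the two functions concrete, here is a minimal sketch. It assumes the new binary format is an 8-byte timestamp prefix followed by the plain value; the class name, method names, and the null handling are hypothetical and are not taken from the PR.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only: names and byte layout are assumptions, not the PR's code.
public class TimestampedFormatSketch {

    // Placeholder used when no timestamp is known for already-stored data.
    public static final long NO_TIMESTAMP = -1L;

    // Upgrade of old on-disk data: (value) -> [-1, value].
    // No timestamp is available for data read back in the old format.
    public static byte[] upgradeStoredValue(final byte[] plainValue) {
        return prefixWithTimestamp(NO_TIMESTAMP, plainValue);
    }

    // Restore from the changelog: (value, timestamp) -> [timestamp, value].
    // The timestamp has to come from the changelog record itself.
    public static byte[] restoreValue(final byte[] plainValue, final long recordTimestamp) {
        return prefixWithTimestamp(recordTimestamp, plainValue);
    }

    private static byte[] prefixWithTimestamp(final long timestamp, final byte[] plainValue) {
        if (plainValue == null) {
            return null; // assumption: a null value (delete) is passed through unchanged
        }
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
            .putLong(timestamp)
            .put(plainValue)
            .array();
    }
}
```

In this sketch the two entry points share the same layout and differ only in where the timestamp comes from, which is why the restore path cannot simply reuse the upgrade function.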
Ah, I just refreshed, and looked at the new code. It LGTM. Thanks!
Retest this please.
cc @ijuma Build is quite unstable. Can we look into this: https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2048/
Retest this please.
and @guozhangwang Java11 passed. In the run before, Java8 passed. I tried retesting multiple times, but the tests are flaky. Do you think it's ok to merge?
LGTM.
* AK/trunk:
  fix typo (apache#5150)
  MINOR: Reduce replica.fetch.backoff.ms in ReassignPartitionsClusterTest (apache#5887)
  KAFKA-7766: Fail fast PR builds (apache#6059)
  KAFKA-7798: Expose embedded clientIds (apache#6107)
  KAFKA-7641; Introduce "group.max.size" config to limit group sizes (apache#6163)
  KAFKA-7433; Introduce broker options in TopicCommand to use AdminClient (KIP-377)
  MINOR: Fix some field definitions for ListOffsetReponse (apache#6214)
  KAFKA-7873; Always seek to beginning in KafkaBasedLog (apache#6203)
  KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (apache#6022)
  MINOR: fix checkstyle suppressions for generated RPC code to work on Windows
  KAFKA-7859: Use automatic RPC generation in LeaveGroups (apache#6188)
  KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter (apache#6161)
  KAFKA-3522: Add RocksDBTimestampedStore (apache#6149)
  KAFKA-3522: Replace RecordConverter with TimestampedBytesStore (apache#6204)
…e#6204) Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This is a change to KIP-258 as announced on the mailing list.
RecordConverter with TimestampedBytesStore
RecordConverter that uses new static method