KAFKA-3522: Replace RecordConverter with TimestampedBytesStore#6204

Merged
mjsax merged 2 commits into
apache:trunk from
mjsax:kafka-3522-rocksdb-format-add-timestampedbytestore
Jan 30, 2019

Conversation

mjsax commented Jan 26, 2019

Member

This is a change to KIP-258 as announced on the mailing list.

  • replace the public interface RecordConverter with TimestampedBytesStore
  • extract the value conversion (old format -> new format) into a static method
  • add a new internal RecordConverter that uses the new static method

@mjsax mjsax added the streams label Jan 26, 2019

mjsax commented Jan 26, 2019

Member Author

Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman

@vvcephei vvcephei left a comment

Contributor

Hi @mjsax, this looks very clean. Thank you!

@bbejeck bbejeck left a comment

Member

Thanks for the update, @mjsax. LGTM.

@guozhangwang guozhangwang left a comment

Contributor

Left a single comment about the code hierarchy, otherwise lgtm.

      store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store;
  final RecordConverter recordConverter =
-     stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter();
+     stateStore instanceof TimestampedBytesStore ? RecordConverter.converter() : record -> record;

Contributor

Hmm, I thought the restorer / state manager would now be agnostic to the conversion altogether, since the inner store impl is responsible for extending the interface as well as calling its TimestampedBytesStore function internally. Why do we still need an internal RecordConverter class?

Member Author

The new static method that is used by the stores will translate on-disk data from the old to the new format.

We still need a RecordConverter that translates data from the changelog topic to the new format, because on restore we put plain <byte[],byte[]> key-value pairs into the store, and the store expects those records to be in the new format.

This makes me realize that the implementation of RecordConverter is actually not correct in this PR: it should not use the new static method (which inserts -1 as the timestamp), but needs to put the actual record timestamp. Will update the PR accordingly.
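
As an illustration of the restore path described above, here is a minimal, self-contained sketch. It is not the code from this PR: `ChangelogRecord` stands in for a changelog `ConsumerRecord<byte[], byte[]>`, `TimestampedMarker` stands in for `TimestampedBytesStore`, and the 8-byte big-endian timestamp prefix is an assumed layout. All class and method names here are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.function.UnaryOperator;

// Hypothetical sketch; none of these types are the real Kafka Streams classes.
final class RestoreConverterSketch {

    // Stand-in for a changelog ConsumerRecord<byte[], byte[]>: it carries a timestamp.
    static final class ChangelogRecord {
        final byte[] key;
        final byte[] value;
        final long timestamp;

        ChangelogRecord(final byte[] key, final byte[] value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    interface Store { }

    // Stand-in for the TimestampedBytesStore marker interface.
    interface TimestampedMarker { }

    static final class PlainStore implements Store { }

    static final class TimestampedStore implements Store, TimestampedMarker { }

    // Pick the converter once per store: timestamped stores get values prefixed
    // with the actual record timestamp, all other stores get records unchanged.
    static UnaryOperator<ChangelogRecord> converterFor(final Store store) {
        if (store instanceof TimestampedMarker) {
            return record -> new ChangelogRecord(
                record.key, prefixTimestamp(record.timestamp, record.value), record.timestamp);
        }
        return record -> record;
    }

    // Assumed layout: 8-byte big-endian timestamp followed by the plain value.
    // A null value (tombstone) is passed through as null.
    static byte[] prefixTimestamp(final long timestamp, final byte[] plainValue) {
        if (plainValue == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
            .putLong(timestamp)
            .put(plainValue)
            .array();
    }

    public static void main(final String[] args) {
        final ChangelogRecord record = new ChangelogRecord(new byte[] {1}, new byte[] {7, 8}, 42L);
        final ChangelogRecord converted = converterFor(new TimestampedStore()).apply(record);
        System.out.println("timestamp prefix: " + ByteBuffer.wrap(converted.value).getLong());
        System.out.println("converted value length: " + converted.value.length);
    }
}
```

The point of the sketch is that the conversion has to happen while the record timestamp is still available, that is, before the record is reduced to a plain key-value pair.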

Contributor

But can't we put this logic inside the prepareBatch call of the internal store impl (of course, we would require users with customized stores to do so as well), so that the callers do not need to be aware of it?

Member Author

No, because prepareBatch (or actually restoreAllInternal) takes a KeyValue<byte[],byte[]>, not a ConsumerRecord. Thus, there is no timestamp information we can add to the value.

Contributor

👍 The convertToTimestampedFormat function is purely for converting the already-stored data into the new binary format. It can't actually add any timestamp information, because we don't have it when we retrieve the old-format data. So the function is like (value) -> [-1,value].

In contrast, the state restorer needs to be able to insert the timestamp, so the function is like (value, timestamp) -> [timestamp, value].

Actually, looking at this code again, I see Matthias was right, the RecordConverter erroneously calls through to the convertToTimestampedFormat function. Oops! Good catch!
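
The two function shapes contrasted in this comment can be sketched side by side. This is only an illustration under stated assumptions: the class and method names are hypothetical, and the layout (an 8-byte big-endian timestamp followed by the plain value) is assumed rather than taken from the PR.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper illustrating the two conversions; not the Kafka Streams implementation.
final class TimestampedFormatSketch {

    // Placeholder for "timestamp unknown", as mentioned in the discussion.
    static final long UNKNOWN_TIMESTAMP = -1L;

    // (value) -> [-1, value]: upgrades data that is already stored in the old
    // format, where no timestamp is available any more.
    static byte[] fromOldFormat(final byte[] plainValue) {
        return withTimestamp(plainValue, UNKNOWN_TIMESTAMP);
    }

    // (value, timestamp) -> [timestamp, value]: what the restore path needs,
    // because the changelog record still carries its timestamp.
    static byte[] withTimestamp(final byte[] plainValue, final long timestamp) {
        if (plainValue == null) {
            return null; // assumption: tombstones stay null
        }
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
            .putLong(timestamp)
            .put(plainValue)
            .array();
    }

    // Reads the 8-byte timestamp prefix back.
    static long timestampOf(final byte[] timestampedValue) {
        return ByteBuffer.wrap(timestampedValue).getLong();
    }

    // Strips the timestamp prefix and returns the plain value.
    static byte[] plainValueOf(final byte[] timestampedValue) {
        return Arrays.copyOfRange(timestampedValue, Long.BYTES, timestampedValue.length);
    }

    public static void main(final String[] args) {
        final byte[] value = {1, 2, 3};
        System.out.println("upgraded on disk: " + timestampOf(fromOldFormat(value)));
        System.out.println("restored from changelog: " + timestampOf(withTimestamp(value, 1548800000000L)));
    }
}
```

Routing the restore path through `fromOldFormat` would silently stamp every restored record with -1, which is the bug noted in this thread.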

Contributor

Ah, I just refreshed, and looked at the new code. It LGTM. Thanks!

Contributor

@mjsax Thanks for the explanation! LGTM.

mjsax commented Jan 29, 2019

Member Author

Retest this please.

mjsax commented Jan 29, 2019

Member Author

java.lang.ArrayIndexOutOfBoundsException: -1
	at kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146)
	at kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238)
	at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211)

Retest this please.

mjsax commented Jan 30, 2019

Member Author

java.lang.AssertionError: Too many quotaLimit calls Map(PRODUCE -> 1, FETCH -> 1, REQUEST -> 3)
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback(CustomQuotaCallbackTest.scala:105)
java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have been throttled
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at kafka.api.QuotaTestClients.verifyThrottleTimeMetric(BaseQuotaTest.scala:229)
	at kafka.api.QuotaTestClients.verifyProduceThrottle(BaseQuotaTest.scala:215)
	at kafka.api.BaseQuotaTest.testThrottledProducerConsumer(BaseQuotaTest.scala:82)
java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have been throttled
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at kafka.api.QuotaTestClients.verifyThrottleTimeMetric(BaseQuotaTest.scala:229)
	at kafka.api.QuotaTestClients.verifyProduceThrottle(BaseQuotaTest.scala:215)
	at kafka.api.BaseQuotaTest.testQuotaOverrideDelete(BaseQuotaTest.scala:124)

cc @ijuma: The build is quite unstable. Can we look into this? https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2048/

Retest this please.

mjsax commented Jan 30, 2019

Member Author

java.lang.AssertionError: Condition not met within timeout 15000. Streams never started.
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
	at org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:254)

and

java.lang.AssertionError: Condition not met within timeout 15000. Streams never started.
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
	at org.apache.kafka.streams.KafkaStreamsTest.shouldThrowOnCleanupWhileRunning(KafkaStreamsTest.java:549)

@guozhangwang Java 11 passed. In the run before, Java 8 passed. I tried retesting multiple times, but the tests are flaky. Do you think it's OK to merge?

@guozhangwang

Contributor

> @guozhangwang Java 11 passed. In the run before, Java 8 passed. I tried retesting multiple times, but the tests are flaky. Do you think it's OK to merge?

LGTM.

@mjsax mjsax merged commit 201022d into apache:trunk Jan 30, 2019
@mjsax mjsax deleted the kafka-3522-rocksdb-format-add-timestampedbytestore branch January 30, 2019 23:57
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* AK/trunk:
  fix typo (apache#5150)
  MINOR: Reduce replica.fetch.backoff.ms in ReassignPartitionsClusterTest (apache#5887)
  KAFKA-7766: Fail fast PR builds (apache#6059)
  KAFKA-7798: Expose embedded clientIds (apache#6107)
  KAFKA-7641; Introduce "group.max.size" config to limit group sizes (apache#6163)
  KAFKA-7433; Introduce broker options in TopicCommand to use AdminClient (KIP-377)
  MINOR: Fix some field definitions for ListOffsetReponse (apache#6214)
  KAFKA-7873; Always seek to beginning in KafkaBasedLog (apache#6203)
  KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (apache#6022)
  MINOR: fix checkstyle suppressions for generated RPC code to work on Windows
  KAFKA-7859: Use automatic RPC generation in LeaveGroups (apache#6188)
  KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter (apache#6161)
  KAFKA-3522: Add RocksDBTimestampedStore (apache#6149)
  KAFKA-3522: Replace RecordConverter with TimestampedBytesStore (apache#6204)
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…e#6204)

Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020
4 participants