Skip to content

KAFKA-17872: Update consumed offsets on records with invalid timestamp#17710

Merged
mjsax merged 2 commits into
apache:trunkfrom
mjsax:kafka-17872-drop-ts-extractor
Nov 10, 2024
Merged

KAFKA-17872: Update consumed offsets on records with invalid timestamp#17710
mjsax merged 2 commits into
apache:trunkfrom
mjsax:kafka-17872-drop-ts-extractor

Conversation

@mjsax

@mjsax mjsax commented Nov 7, 2024

Copy link
Copy Markdown
Member

TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress.

@mjsax mjsax added the streams label Nov 7, 2024
deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
);
droppedRecordsSensor.record();
lastCorruptedRecord = raw;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual fix -- we track the record with invalid ts as "corrupted record" instead of dropping it completely. The CorrecuptedRecrods allows us to update the offset later, but would be dropped by itself and not be processed.


@SuppressWarnings("rawtypes")
final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
final InternalMockProcessorContext<Integer, Integer> context = new InternalMockProcessorContext<>(

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup to fix warnings

@Test
public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
final byte[] key = new LongSerializer().serialize("foo", 1L);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup

final AssertionError error = shouldNotSeek.get();
if (error != null) {
throw error;
try (final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup

final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();

task.addRecords(partition1, singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
task.addRecords(partition1, singleton(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to change this to avoid a default ts which is negative and would break this test (same below)

when(stateManager.taskId()).thenReturn(taskId);
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
final long offset = 543L;
final long consumedOffset = 345L;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused -- removing

final String processingExceptionHandler) {
final Class<? extends DeserializationExceptionHandler> deserializationExceptionHandler,
final Class<? extends ProcessingExceptionHandler> processingExceptionHandler,
final Class<? extends TimestampExtractor> timestampExtractor) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding new TimestampExtractor parameter here, and cleanup the overload of createConfig a little bit, to avoid passing in unnecessary parameters (create a little noise on the PR below, but code is much cleaner now)

@bbejeck bbejeck left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall LGTM with just a couple of minor comments

private static StreamsConfig createConfig(final Class<? extends DeserializationExceptionHandler> deserializationExceptionHandler) {
return createConfig(
AT_LEAST_ONCE,
"0",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is represented by the "0"? Maybe add a named variable to help grokking what's going on.

private static StreamsConfig createConfigWithTsExtractor(final Class<? extends TimestampExtractor> timestampExtractor) {
return createConfig(
AT_LEAST_ONCE,
"0",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above


private static StreamsConfig createConfigWithTsExtractor(final Class<? extends TimestampExtractor> timestampExtractor) {
return createConfig(
AT_LEAST_ONCE,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also why creating configs with AT_LEAST_ONCE only?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we want to run this test with EOS? This helper is only used for the new test of this PR. I don't see any reason why we should run the test with EOS? (In the end, the alos/eos config does not really matter, but given the helper overloads we have, we need to pass in something if we don't want to get too many overloads.)

task = createStatelessTask(createConfig(
AT_LEAST_ONCE,
"100",
LogAndFailExceptionHandler.class.getName(),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just simplifying the setup: added a new createConfig overload which does set LogAndFailExceptionHandler by default (instead of using the old one which always get LogAndFailExceptionHandler passed when used)

task = createStatelessTask(createConfig(
AT_LEAST_ONCE,
"100",
LogAndFailExceptionHandler.class.getName(),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here and below

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same answer as above,

If helpful, happy to split out the refactoring into it's own PR (or maybe even multiple to make it simpler to follow -- it's a little messy...)

@mjsax mjsax merged commit 0bc91be into apache:trunk Nov 10, 2024
@mjsax mjsax deleted the kafka-17872-drop-ts-extractor branch November 10, 2024 01:21
mjsax added a commit that referenced this pull request Nov 10, 2024
#17710)

TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress.

Reviewers: Bill Bejeck <bill@confluent.io>
mjsax added a commit that referenced this pull request Nov 10, 2024
#17710)

TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress.

Reviewers: Bill Bejeck <bill@confluent.io>
@mjsax

mjsax commented Nov 10, 2024

Copy link
Copy Markdown
Member Author

Merged to trunk and cherry-picked to 3.9, 3.8, and 3.7 branches.

mjsax added a commit that referenced this pull request Nov 10, 2024
#17710)

TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress.

Reviewers: Bill Bejeck <bill@confluent.io>
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Nov 30, 2024
apache#17710)

TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress.

Reviewers: Bill Bejeck <bill@confluent.io>
eduwercamacaro added a commit to littlehorse-enterprises/kafka that referenced this pull request Dec 17, 2024
* KAFKA-17926 Improve the documentation explaining why max.in.flight.requests.per.connection should not exceed 5 (apache#17719)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Cleanup GroupCoordinatorRecordHelpers (apache#17718)

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* rebase to fix merge conflict (apache#17702)

Fixes an issue with the TTD in the specific case where users don't specify an initial time for the driver and also don't specify a start timestamp for the TestInputTopic, then pipe input records without timestamps. This combination results in a slight mismatch in the expected timestamps for the piped records, which can be noticeable when writing tests with very small time deltas.

The problem is that, while both the TTD and the TestInputTopic will be initialized to the "current time" when not otherwise specified, it's possible for some milliseconds to have passed between the creation of the TTD and the creation of the TestInputTopic. This can result in a TestInputTopic getting a start timestamp that's several ms larger than the driver's time, and ultimately causing the piped input records to have timestamps slightly in the future relative to the driver.

In practice even those who hit this issue might not notice it if they aren't manipulating time in their tests, or are advancing time by enough to negate the several-milliseconds of difference. However we noticed a test fail due to this because we were testing a ttl-based processor and had advanced the driver time by only 1 millisecond past the ttl. The piped record should have been expired, but because it's timestamp was a few milliseconds longer than the driver's start time, this test ended up failing.

Reviewers: Matthias Sax <mjsax@apache.org>, Bruno Cadonna <cadonna@apache.org>, Lucas Brutschy < lbrutschy@confluent.io>

* KAFKA-17801: RemoteLogManager may compute inaccurate upperBoundOffset for aborted txns (apache#17676)

Reviewers: Jun Rao <junrao@gmail.com>

* MINOR: log fix in SnapshottableCoordinator (apache#17726)

Reviewers: donaldzhu-cc, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-17570 Rewrite StressTestLog by Java (apache#17249)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Delete unused member from KafkaAdminClient (apache#17729)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-17970 Moving some share purgatory classes from core to share module (apache#17722)

As part of PR: apache#17636 where purgatory has been moved from core to server-common hence move some existing classes used in Share Fetch to Share module.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-17837 Rewrite DeleteTopicTest (apache#17579)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Move StopPartition to server-common (apache#17704)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-15549 Bump swagger dependency version from 2.2.8 to 2.2.25 (apache#17730)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-17872: Update consumed offsets on records with invalid timestamp (apache#17710)

TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress.

Reviewers: Bill Bejeck <bill@confluent.io>

* KAFKA-17925 Convert Kafka Client integration tests to use KRaft (apache#17670)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-17779: Fix flaky RemoteLogManager test (apache#17724)

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>

* KAFKA-17455: fixes stuck producer by polling again after pollDelayMs in NetworkUtils#awaitReady()

* clarifies comments

* attempts to add test

* Adds a test but my changes to MockClient.java broke all sorts of stuff

* test that passes on my branch and fails on trunk

* addresses PR feedback: rename MockClient#setAdvanceTimeDuringPoll to advanceTimeDuringPoll()

---------

Co-authored-by: PoAn Yang <payang@apache.org>
Co-authored-by: David Jacot <djacot@confluent.io>
Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
Co-authored-by: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Co-authored-by: Jeff Kim <kimkb2011@gmail.com>
Co-authored-by: TengYao Chi <kitingiao@gmail.com>
Co-authored-by: Apoorv Mittal <apoorvmittal10@gmail.com>
Co-authored-by: Mickael Maison <mimaison@users.noreply.github.com>
Co-authored-by: Yung <yungyung7654321@gmail.com>
Co-authored-by: Matthias J. Sax <matthias@confluent.io>
Co-authored-by: Kirk True <kirk@kirktrue.pro>
Co-authored-by: wperlichek <61857706+wperlichek@users.noreply.github.com>
Co-authored-by: Colt McNealy <colt@littlehorse.io>
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
apache#17710)

TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress.

Reviewers: Bill Bejeck <bill@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants