KAFKA-2403; Add support for commit metadata in KafkaConsumer #119

Closed
hachikuji wants to merge 1 commit into apache:trunk from hachikuji:KAFKA-2403

@hachikuji

Contributor

No description provided.

@asfbot

asfbot commented Aug 6, 2015

kafka-trunk-git-pr #100 SUCCESS
This pull request looks good

Contributor

I feel that forcing users to create an OffsetMetadata on every commit adds some overhead, especially since in practice most people do not want to embed any metadata with their commits. I would like to propose an alternative API:

commit(Map<TopicPartition, Long>, CommitType, String /* Commit Message */)
commit(Map<TopicPartition, Long>, CommitType, String /* Commit Message */, ConsumerCommitCallback)

The commit message would be used as the metadata for all the partitions included. This rests on my assumption that users usually have a single commit message (i.e. the metadata string) per commit call; if they want different messages for different partitions (for example in CopyCat, @ewencp), they can call commit() multiple times, once per partition with its own metadata string. This may of course cause more round trips if every call is a sync commit, so I would suggest the following pattern:

commit(async);
commit(async);
...
commit(sync); // last call

This works because the commit calls are now all ordered.
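For readers following along, the pattern above could be sketched roughly as follows. This is only an illustration under stated assumptions: `SketchCommitter`, its `commit` signature, and `commitEach` are hypothetical stand-ins, partitions are plain strings, and nothing here is the real KafkaConsumer API. The point is just that every call but the last is asynchronous, and the final synchronous call relies on commits being ordered.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the consumer; NOT the real KafkaConsumer API.
class SketchCommitter {
    enum CommitType { SYNC, ASYNC }

    // Records each commit request in the order it was issued.
    final List<String> sent = new ArrayList<>();

    // Assumed shape of the proposal: one message applies to every partition in the map.
    void commit(Map<String, Long> offsets, CommitType type, String message) {
        sent.add(type + " " + offsets + " '" + message + "'");
    }

    // Commits each partition with its own message: ASYNC for all but the
    // last entry, SYNC for the last so the caller blocks only once.
    void commitEach(Map<String, Long> offsets, Map<String, String> messages) {
        int remaining = offsets.size();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            remaining--;
            CommitType type = remaining == 0 ? CommitType.SYNC : CommitType.ASYNC;
            Map<String, Long> single = new LinkedHashMap<>();
            single.put(e.getKey(), e.getValue());
            commit(single, type, messages.get(e.getKey()));
        }
    }
}
```

Note that this still issues one request per partition, which is the cost being weighed in this thread.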

Contributor Author

@guozhangwang Yeah I agree that it's annoying for most users who don't care about offset metadata. Would the expectation in your suggested API be that the commit message would be tied to the metadata for each topic partition in the map?

Member

@guozhangwang when you say overhead, do you mean from an API usability perspective or from a performance perspective?

Contributor

Hmm, not sure I like that API. Having a map for the offsets and then requiring all TPs to use the same metadata is odd. And it seems pretty annoying if you want to do async and get a callback when the commit completes -- now you have to manage waiting on all the callbacks to complete as well. Also, splitting the commit across a bunch of separate requests isn't ideal.

Another alternative is to add this variant:

public void commit(Map<TopicPartition, OffsetAndMetadata> offsets, CommitType commitType, ConsumerCommitCallback callback);

but leave the existing Map<TopicPartition, Long> versions. If you want to include metadata, you must provide all the other parameters, which avoids introducing 2x the number of APIs and is reasonable since using metadata is an advanced feature. This has the same impact as the String commitMessage variant since I assume that would be an additional method since most users do not want it. So both would add 1 method, but this version does everything in one round trip, only has one callback, etc.

Copycat actually doesn't have a use case for the metadata currently, so I unfortunately don't have a good example use case for any of this currently.
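A rough sketch of how that single extra overload might sit next to the simple one, assuming hypothetical names throughout (`SketchOffsetAndMetadata`, `SketchConsumerApi`, string partitions); it is not the code in this PR. The simple `Long` variant delegates to the metadata variant, and the metadata variant sends one request and fires one callback for the whole map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value type pairing an offset with a metadata string.
final class SketchOffsetAndMetadata {
    final long offset;
    final String metadata;

    SketchOffsetAndMetadata(long offset, String metadata) {
        this.offset = offset;
        this.metadata = metadata;
    }
}

// Hypothetical consumer facade; partitions are plain strings for brevity.
class SketchConsumerApi {
    enum CommitType { SYNC, ASYNC }

    interface Callback {
        void onComplete(Map<String, SketchOffsetAndMetadata> offsets, Exception error);
    }

    int requests = 0; // counts simulated commit requests

    // Simple variant for callers who do not care about metadata.
    void commit(Map<String, Long> offsets, CommitType type) {
        Map<String, SketchOffsetAndMetadata> full = new HashMap<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            full.put(e.getKey(), new SketchOffsetAndMetadata(e.getValue(), ""));
        }
        commit(full, type, null);
    }

    // Advanced variant: per-partition metadata, one request, one callback.
    void commit(Map<String, SketchOffsetAndMetadata> offsets, CommitType type, Callback callback) {
        requests++; // a single request covers every partition in the map
        if (callback != null) {
            // A real client would invoke this when the broker responds.
            callback.onComplete(offsets, null);
        }
    }
}
```

The two overloads differ in parameter count on purpose: because of type erasure, Java does not allow two methods with the same name and arity whose only difference is the type argument of a `Map` parameter.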

Contributor Author

@ewencp To be clear, that would mean we couldn't provide the same version of that method which uses a Long in the Map, right? As a user, I would probably rather have all the commit methods be consistent, but it may be a reasonable compromise.

Contributor Author

I was thinking that the user would maintain Map<TopicPartition, OffsetMetadata>, so map.values() would return Collection<OffsetMetadata>, which could be passed directly to commit. That said, I don't have a strong preference on any of the choices so far, but I'm probably leaning toward the initial approach. The overhead is not so high and users who want to manage their own commits may find metadata more useful than we're imagining. Off the top of my head, I can imagine wanting to add timestamps, hostnames, or maybe versioning information to the commit. If instead it's really such an extreme edge case that almost no users would find value in, maybe it doesn't belong in the API at all and we can think about another way to expose it.
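To make that idea concrete, here is a minimal sketch, assuming a hypothetical `SketchOffsetMetadata` that carries its own partition and a hypothetical tracker class. The `v=...;host=...;ts=...` metadata format is invented purely for illustration; nothing in this PR prescribes one.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical: bundles the partition with its offset and metadata, so a
// bare collection of these is enough to describe a commit.
final class SketchOffsetMetadata {
    final String partition;
    final long offset;
    final String metadata;

    SketchOffsetMetadata(String partition, long offset, String metadata) {
        this.partition = partition;
        this.offset = offset;
        this.metadata = metadata;
    }
}

// Hypothetical user-side bookkeeping of the latest position per partition.
class SketchMetadataTracker {
    private final Map<String, SketchOffsetMetadata> latest = new LinkedHashMap<>();
    private final String host;
    private final int version;

    SketchMetadataTracker(String host, int version) {
        this.host = host;
        this.version = version;
    }

    // Overwrites the entry for the partition with the newest offset and metadata.
    void record(String partition, long offset, long timestampMs) {
        String metadata = "v=" + version + ";host=" + host + ";ts=" + timestampMs;
        latest.put(partition, new SketchOffsetMetadata(partition, offset, metadata));
    }

    // What the user would hand to a commit call taking a collection.
    Collection<SketchOffsetMetadata> pending() {
        return latest.values();
    }
}
```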

Member

@ewencp Regarding making OffsetAndMetadata mutable, does that make sense if we do defensive copies of the passed in map as is being suggested? It seems to me that ownership rules would be inconsistent in that case and we would be doing plenty of allocation with the map copy anyway. It's also worth saying that due to boxing, we were already allocating Long instances, so the number of allocations doesn't change, it's just the size of each instance.
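The ownership argument can be illustrated with a small sketch, assuming hypothetical classes `SketchImmutableOffset` and `SketchAsyncCommitter` rather than the PR's actual code. With an immutable value type, a shallow copy of the map is enough to isolate the in-flight commit from later changes by the caller; with a mutable value type, the same copy would still share the value objects.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable offset-plus-metadata value.
final class SketchImmutableOffset {
    private final long offset;
    private final String metadata;

    SketchImmutableOffset(long offset, String metadata) {
        this.offset = offset;
        this.metadata = metadata;
    }

    long offset() { return offset; }
    String metadata() { return metadata; }
}

// Hypothetical committer that keeps the offsets of the pending request.
class SketchAsyncCommitter {
    private Map<String, SketchImmutableOffset> inFlight = Collections.emptyMap();

    void commitAsync(Map<String, SketchImmutableOffset> offsets) {
        // Defensive shallow copy: later changes to the caller's map do not
        // affect the pending request. Safe only because values are immutable.
        inFlight = Collections.unmodifiableMap(new HashMap<>(offsets));
    }

    Map<String, SketchImmutableOffset> inFlight() {
        return inFlight;
    }
}
```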

Contributor

OK, after reading through the conversation, I now think my proposed approach does not fit well either. If we agree that the percentage of users calling commit with manual offset values is really low, then I think it may be OK to add this API usability overhead for them, with probably an even lower percentage of usage. So let's just proceed with the original API.

Contributor

"So let's just proceed with the original API."

@guozhangwang Can you clarify?

Contributor

@nehanarkhede Yup I can review and check it in once it is rebased.

@hachikuji hachikuji changed the title KAFKA-2403; Add support for commit metadata in KafkaConsumer KAFKA-2403 [WIP]; Add support for commit metadata in KafkaConsumer Aug 6, 2015
@asfbot

asfbot commented Aug 6, 2015

kafka-trunk-git-pr #101 FAILURE
Looks like there's a problem with this pull request

@asfbot

asfbot commented Sep 1, 2015

kafka-trunk-git-pr #295 SUCCESS
This pull request looks good

@asfbot

asfbot commented Sep 21, 2015

kafka-trunk-git-pr #471 SUCCESS
This pull request looks good

@hachikuji hachikuji changed the title KAFKA-2403 [WIP]; Add support for commit metadata in KafkaConsumer KAFKA-2403; Add support for commit metadata in KafkaConsumer Sep 23, 2015

Contributor

How about "metadata == null ? that.metadata == null : metadata.equals(that.metadata)"?
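In context, the suggested expression would presumably sit inside an `equals` method along these lines. The class below is a hypothetical sketch (`SketchOffsetValue` is an invented name, and the surrounding fields are assumed), not the code under review; only the ternary on the last line of `equals` comes from the comment above.

```java
// Hypothetical value class with a nullable metadata field.
final class SketchOffsetValue {
    private final long offset;
    private final String metadata; // may be null

    SketchOffsetValue(long offset, String metadata) {
        this.offset = offset;
        this.metadata = metadata;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchOffsetValue)) return false;
        SketchOffsetValue that = (SketchOffsetValue) o;
        if (offset != that.offset) return false;
        // Null-safe comparison: two null metadata values count as equal.
        return metadata == null ? that.metadata == null : metadata.equals(that.metadata);
    }

    @Override
    public int hashCode() {
        // Kept consistent with equals: null metadata contributes 0.
        int result = (int) (offset ^ (offset >>> 32));
        return 31 * result + (metadata != null ? metadata.hashCode() : 0);
    }
}
```

On Java 7 and later, `java.util.Objects.equals(metadata, that.metadata)` gives the same result as the ternary.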

@hachikuji hachikuji force-pushed the KAFKA-2403 branch 2 times, most recently from a724974 to 86c77dd Compare September 23, 2015 19:49
@asfbot

asfbot commented Sep 23, 2015

kafka-trunk-git-pr #504 FAILURE
Looks like there's a problem with this pull request

@asfbot

asfbot commented Sep 23, 2015

kafka-trunk-git-pr #505 SUCCESS
This pull request looks good

@hachikuji

Contributor Author

@guozhangwang Made a few minor changes to the comments and added a defensive copy of the passed map in commitAsync.

@asfgit asfgit closed this in d1dd1e9 Sep 23, 2015
@guozhangwang

Contributor

LGTM.

@asfbot

asfbot commented Sep 23, 2015

kafka-trunk-git-pr #506 SUCCESS
This pull request looks good
