KAFKA-2403; Add support for commit metadata in KafkaConsumer#119
KAFKA-2403; Add support for commit metadata in KafkaConsumer#119hachikuji wants to merge 1 commit into
Conversation
|
kafka-trunk-git-pr #100 SUCCESS |
There was a problem hiding this comment.
I feel enforcing users to create OffsetMetadata upon commit brings some overhead, especially in practice most people do not want to embed any metadata with their commits. I would like to propose an alternative solution regarding the API:
commit(Map<TopicPartition, Long>, ComitType, String /* Commit Message /)
commit(Map<TopicPartition, Long>, ComitType, String / Commit Message */, ConsumerCommitCallBack)
And the commit message will be used as the metadata for all the partitions included. It is based on my assumption that users usually would have a single commit message (i.e. the metadata string) per each commit call; if they want to have different messages for different partitions (for example in CopyCat @ewencp), they can call commit() multiple times with the metadata strings on each partition. This of course may cause more round trips for all sync-commit, but I would suggest people to use the following pattern:
commit(async);
commit(async);
...
commit(sync); // last call
Since now the commit calls are all ordered.
There was a problem hiding this comment.
@guozhangwang Yeah I agree that it's annoying for most users who don't care about offset metadata. Would the expectation in your suggested API be that the commit message would be tied to the metadata for each topic partition in the map?
There was a problem hiding this comment.
@guozhangwang when you say overhead, do you mean from an API usability perspective or from a performance perspective?
There was a problem hiding this comment.
Hmm, not sure I like that API. Having a map for the offsets and then requiring all TPs to use the same metadata is odd. And it seems pretty annoying if you want to do async and get a callback when the commit completes -- now you have to manage waiting on all the callbacks to complete as well. Also, splitting the commit across a bunch of separate requests isn't ideal.
Another alternative is to add this variant:
public void commit(Map<TopicPartition, OffsetAndMetadata> offsets, CommitType commitType, ConsumerCommitCallback callback);
but leave the existing Map<TopicPartition, Long> versions. If you want to include metadata, you must provide all the other parameters, which avoids introducing 2x the number of APIs and is reasonable since using metadata is an advanced feature. This has the same impact as the String commitMessage variant since I assume that would be an additional method since most users do not want it. So both would add 1 method, but this version does everything in one round trip, only has one callback, etc.
Copycat actually doesn't have a use case for the metadata currently, so I unfortunately don't have a good example use case for any of this currently.
There was a problem hiding this comment.
@ewencp To be clear, that would mean we couldn't provide the same version of that method which uses a Long in the Map, right? As a user, I would probably rather have all the commit methods be consistent, but it may be a reasonable compromise.
There was a problem hiding this comment.
I was thinking that the user would maintain Map<TopicPartition, OffsetMetadata>, so map.values() would return Collection<OffsetMetadata>, which could be passed directly to commit. That said, I don't have a strong preference on any of the choices so far, but I'm probably leaning toward the initial approach. The overhead is not so high and users who want to manage their own commits may find metadata more useful than we're imagining. Off the top of my head, I can imagine wanting to add timestamps, hostnames, or maybe versioning information to the commit. If instead it's really such an extreme edge case that almost no users would find value in, maybe it doesn't belong in the API at all and we can think about another way to expose it.
There was a problem hiding this comment.
@ewencp Regarding making OffsetAndMetadata mutable, does that make sense if we do defensive copies of the passed in map as is being suggested? It seems to me that ownership rules would be inconsistent in that case and we would be doing plenty of allocation with the map copy anyway. It's also worth saying that due to boxing, we were already allocating Long instances, so the number of allocations doesn't change, it's just the size of each instance.
There was a problem hiding this comment.
OK, now I think my proposed approach does not suits well as well after reading through the conversations. If we agree that the percentage of users calling commit with manual offset values is really low, then I think it may be OK for adding this API usability overhead for them with a probably even lower percentage usage. So let's just proceed with the original API.
There was a problem hiding this comment.
So let's just proceed with the original API.
@guozhangwang Can you clarify?
There was a problem hiding this comment.
@nehanarkhede Yup I can review and check it in once it is rebased.
|
kafka-trunk-git-pr #101 FAILURE |
1b7e26f to
5fd658a
Compare
|
kafka-trunk-git-pr #295 SUCCESS |
5fd658a to
116a92c
Compare
|
kafka-trunk-git-pr #471 SUCCESS |
There was a problem hiding this comment.
How about "metadata == null ? that.metadata == null : metadata.equals(that.metadata)"?
a724974 to
86c77dd
Compare
|
kafka-trunk-git-pr #504 FAILURE |
86c77dd to
a21851f
Compare
|
kafka-trunk-git-pr #505 SUCCESS |
|
@guozhangwang Made a few minor changes to the comments and added a defensive copy of the passed map in commitAsync. |
|
LGTM. |
|
kafka-trunk-git-pr #506 SUCCESS |
* Fix stats overwrite in case of multiple unknown hosts. * Add leader count per broker/replica.
There may be a situation where under high traffic files are mostly committed due to size. However, the commit-by-time ticks are still running and sometimes commit small files. There’s a condition for not committing empty files, but committing files with one request in them is possible, while more requests are waiting in the queue. This change makes scheduled tick more controlled: they are scheduled only when the current file is written to for the first time and are cancelled when it's committed due to size. There's still room for accidental commit of a small file due to time, but the probability doesn't seem to be high + no serious consequences. Because of this, it was decided to not do even more strict control (like check what file was supposed to be committed when the time comes) to not complicate tests.
…et-key-rename Fix renaming of LastMirroredOffset to LastMirroredEpoch
No description provided.