Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3100,7 +3100,12 @@ void handleResponse(AbstractResponse abstractResponse) {
final Long offset = partitionData.offset;
final String metadata = partitionData.metadata;
final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
// Negative offset indicates that the group has no committed offset for this partition
if (offset < 0) {
Comment thread
hachikuji marked this conversation as resolved.
groupOffsetsListing.put(topicPartition, null);
} else {
groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
}
} else {
log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class ListConsumerGroupOffsetsResult {

/**
* Return a future which yields a map of topic partitions to OffsetAndMetadata objects.
* If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,7 @@ public void testDescribeConsumerGroupOffsets() throws Exception {
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);

final Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10,
Expand All @@ -1502,15 +1503,19 @@ public void testDescribeConsumerGroupOffsets() throws Exception {
Optional.empty(), "", Errors.NONE));
responseData.put(myTopicPartition2, new OffsetFetchResponse.PartitionData(20,
Optional.empty(), "", Errors.NONE));
responseData.put(myTopicPartition3, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
Optional.empty(), "", Errors.NONE));
env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData));

final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets("group-0");
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get();

assertEquals(3, partitionToOffsetAndMetadata.size());
assertEquals(4, partitionToOffsetAndMetadata.size());
assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0).offset());
assertEquals(0, partitionToOffsetAndMetadata.get(myTopicPartition1).offset());
assertEquals(20, partitionToOffsetAndMetadata.get(myTopicPartition2).offset());
assertTrue(partitionToOffsetAndMetadata.containsKey(myTopicPartition3));
assertNull(partitionToOffsetAndMetadata.get(myTopicPartition3));
Comment thread
hachikuji marked this conversation as resolved.
}
}

Expand Down