Skip to content

Commit fe132ee

Browse files
chia7712rhauch
authored andcommitted
KAFKA-12339: Add retry to admin client's listOffsets (#10152)
`KafkaAdmin.listOffsets` did not handle topic-level errors, hence the UnknownTopicOrPartitionException on topic-level can obstruct a Connect worker from running when the new internal topic is NOT synced to all brokers. The method did handle partition-level retriable errors by retrying, so this changes to handle topic-level retriable errors in the same way. This allows a Connect worker to start up and have the admin client retry when the worker is trying to read to the end of the newly-created internal topics until the internal topic metadata is synced to all brokers. Author: Chia-Ping Tsai <chia7712@gmail.com> Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
1 parent f4bef70 commit fe132ee

2 files changed

Lines changed: 40 additions & 2 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public Collection<String> topics() {
8282

8383
public static void handleMetadataErrors(MetadataResponse response) {
8484
for (TopicMetadata tm : response.topicMetadata()) {
85+
if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
8586
for (PartitionMetadata pm : tm.partitionMetadata()) {
8687
if (shouldRefreshMetadata(pm.error)) {
8788
throw pm.error.exception();

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,11 +342,15 @@ private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors err
342342
}
343343

344344
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
345+
return prepareMetadataResponse(cluster, error, error);
346+
}
347+
348+
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
345349
List<TopicMetadata> metadata = new ArrayList<>();
346350
for (String topic : cluster.topics()) {
347351
List<PartitionMetadata> pms = new ArrayList<>();
348352
for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
349-
PartitionMetadata pm = new PartitionMetadata(error,
353+
PartitionMetadata pm = new PartitionMetadata(partitionError,
350354
new TopicPartition(topic, pInfo.partition()),
351355
Optional.of(pInfo.leader().id()),
352356
Optional.of(234),
@@ -355,7 +359,7 @@ private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors
355359
Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
356360
pms.add(pm);
357361
}
358-
TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
362+
TopicMetadata tm = new TopicMetadata(topicError, topic, false, pms);
359363
metadata.add(tm);
360364
}
361365
return MetadataResponse.prepareResponse(0,
@@ -2342,6 +2346,39 @@ public void testListOffsets() throws Exception {
23422346
}
23432347
}
23442348

2349+
@Test
2350+
public void testListOffsetsRetriableErrorOnMetadata() throws Exception {
2351+
Node node = new Node(0, "localhost", 8120);
2352+
List<Node> nodes = Collections.singletonList(node);
2353+
final Cluster cluster = new Cluster(
2354+
"mockClusterId",
2355+
nodes,
2356+
Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
2357+
Collections.emptySet(),
2358+
Collections.emptySet(),
2359+
node);
2360+
final TopicPartition tp0 = new TopicPartition("foo", 0);
2361+
2362+
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
2363+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
2364+
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
2365+
// metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION
2366+
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
2367+
// listoffsets response from broker 0
2368+
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
2369+
responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 123L, Optional.of(321)));
2370+
env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
2371+
2372+
ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latest()));
2373+
2374+
Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
2375+
assertEquals(1, offsets.size());
2376+
assertEquals(123L, offsets.get(tp0).offset());
2377+
assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
2378+
assertEquals(-1L, offsets.get(tp0).timestamp());
2379+
}
2380+
}
2381+
23452382
@Test
23462383
public void testListOffsetsRetriableErrors() throws Exception {
23472384

0 commit comments

Comments
 (0)