Migrating from AdminUtils with AdminClient #848
| @@ -21,4 +21,5 @@ ext { | |||
| testngVersion = "7.1.0" | |||
| zkclientVersion = "0.11" | |||
There was a problem hiding this comment.
I should probably delete this as well.
There was a problem hiding this comment.
If there are no usages, you should.
There was a problem hiding this comment.
This is still required. I will remove it in the following PR, where I fully remove 101tec ZkClient and replace it with Helix ZkClient.
90983f2 to
4348b9d
| */ | ||
| public static void waitForTopicCreation(ZkUtils zkUtils, String topic, String brokerList) throws IllegalStateException { | ||
| Validate.notNull(zkUtils); | ||
| public static void waitForTopicCreation(AdminClient adminClient, String topic, String brokerList) throws IllegalStateException { |
There was a problem hiding this comment.
I am not sure we need this anymore. In this patch I replace calls to AdminUtils.createTopic with calls to the AdminClient.createTopics method, and createTopics essentially returns a "KafkaFuture" object on which I currently do a blocking wait (to wait for the topic to be created). So we probably don't need this anymore? I have removed all calls to this method in the test cases and it's working fine.
| * @param adminClient AdminClient instance to check if topics exists | ||
| * @param topic Topic name | ||
| */ | ||
| public static boolean topicExists(AdminClient adminClient, String topic) { |
There was a problem hiding this comment.
I tried to import the "topicExists" method from the BaseKafkaZkTest class here, but I kept getting a "package com.linkedin.datastream.testutil does not exist" error, so I copied the method here. It's probably a classpath issue, but I am not sure how to resolve it in this repo.
There was a problem hiding this comment.
Is this because of the circular dependency problem we discussed offline?
There was a problem hiding this comment.
If it's a circular dep issue, then could we reference this function from BaseKafkaZkTest class's call to avoid duplicate code?
There was a problem hiding this comment.
I can check, but I doubt that will work. I guess doing it either way will still result in a circular dependency.
| // Removing from topic config since its passed as a direct argument | ||
| topicProperties.remove("replicationFactor"); | ||
| // TopicExistsException is thrown if topic exists. Its a no-op. | ||
| adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); |
There was a problem hiding this comment.
By calling ".all().get()" we block until the topic is created. I am not sure exactly how AdminUtils behaved, or whether a blocking wait is the correct thing to do here, but I am guessing it is.
That said, on repeated execution of all the unit tests, I noticed that the test "testCreateDatastreamHappyPathDefaultRetention" is a little flaky: it fails at random every now and then (very rarely, and it is hard to reproduce). This test calls "getRetention", which queries the topic config and returns the retention time if it exists in the policy, else null. When the test fails, it is because getRetention returned null instead of the expected value. I am not sure whether this test was flaky before, and I cannot explain this behavior.
There was a problem hiding this comment.
Summarizing the offline discussion here:
There are no callbacks or returned futures with AdminUtils.createTopic, and no hints that it's an asynchronous operation. I'm not a Scala expert, but from the looks of it, it's a synchronous call.
As for the failing/flaky test:
1. Was this test flaky even before, or did it become flaky as a result of the API changes? We need to rule out changes in the behavior of the system after switching from AdminUtils to AdminClient.
2. Does adding a thread sleep after topic creation (and before querying the retention of the new topic) make the test pass?
For (1), we'll reach out to Kafka to confirm that there are no changes in behavior in the createTopic APIs. For (2), we can try fixing the flaky test in a separate PR.
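On the thread-sleep experiment in (2): a fixed sleep can itself be timing-sensitive, so one possible alternative is to poll for the value until a deadline. The sketch below uses only the JDK. `PollSketch` and `pollForValue` are hypothetical names, not Brooklin APIs, and the supplier merely stands in for a lookup such as the test's "getRetention" call.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

class PollSketch {
  /**
   * Calls the supplier until it returns a non-null value or the timeout elapses.
   * Returns Optional.empty() on timeout or if the calling thread is interrupted.
   */
  static <T> Optional<T> pollForValue(Supplier<T> supplier, Duration period, Duration timeout) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      T value = supplier.get();
      if (value != null) {
        return Optional.of(value);
      }
      if (System.nanoTime() - deadline >= 0) {
        return Optional.empty();
      }
      try {
        Thread.sleep(period.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for callers
        return Optional.empty();
      }
    }
  }

  public static void main(String[] args) {
    // Hypothetical stand-in for a topic config that only becomes visible after a short delay.
    long visibleAt = System.nanoTime() + Duration.ofMillis(50).toNanos();
    Supplier<String> retention = () -> System.nanoTime() - visibleAt >= 0 ? "86400000" : null;

    Optional<String> result = pollForValue(retention, Duration.ofMillis(10), Duration.ofSeconds(2));
    System.out.println(result.orElse("timed out"));
  }
}
```

If the test passes with polling but fails without it, that would suggest the new topic's config is not immediately visible after creation, which is the behavior change item (1) is meant to confirm or rule out.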
| } | ||
| } | ||
| private AdminClient getAdminClient(Datastream datastream) { |
There was a problem hiding this comment.
Do we need to cache AdminClient instances based on the destinationBrokers? It depends on how frequently this method is called and how expensive it is to re-instantiate an AdminClient object, and I am not sure about either.
There was a problem hiding this comment.
In an offline discussion with Jhora, we decided not to do it for now and to revisit it if the need arises.
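For reference, should caching ever be needed, one possible shape is a map keyed by the destination broker string. This is only a sketch of the idea raised above, not something the PR implements. `ClientCache` is a hypothetical class, kept generic so the example runs with the JDK alone; with Kafka, the factory would presumably build the client from the broker list.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class ClientCache<C extends AutoCloseable> implements AutoCloseable {
  private final Map<String, C> clients = new ConcurrentHashMap<>();
  private final Function<String, C> factory;

  // The factory turns a broker list (e.g. "host1:9092,host2:9092") into a client.
  ClientCache(Function<String, C> factory) {
    this.factory = factory;
  }

  // Returns the cached client for this broker list, creating it on first use.
  C get(String brokers) {
    return clients.computeIfAbsent(brokers, factory);
  }

  int size() {
    return clients.size();
  }

  // Closes every cached client on a best-effort basis and empties the cache.
  @Override
  public void close() {
    for (C client : clients.values()) {
      try {
        client.close();
      } catch (Exception e) {
        // Best effort: keep closing the remaining clients.
      }
    }
    clients.clear();
  }
}
```

The trade-off is lifecycle management: cached clients hold connections open until `close()` is called, which is one reason deferring this until there is a measured need seems reasonable.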
| provider.createTopic(destinationUri, 1, new Properties(), ds); | ||
| KafkaTestUtils.waitForTopicCreation(_zkUtils, topicName, _kafkaCluster.getBrokers()); | ||
| //KafkaTestUtils.waitForTopicCreation(_adminClient, topicName, _kafkaCluster.getBrokers()); |
There was a problem hiding this comment.
I will remove this in the following update to this patch. Same for the other instance below.
| // TopicExistsException is thrown if topic exists. Its a no-op. | ||
| adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); | ||
| } catch (InterruptedException | ExecutionException e) { | ||
| if (e instanceof TopicExistsException || e.getCause() instanceof TopicExistsException) { |
There was a problem hiding this comment.
I may be missing something here, but org.apache.kafka.common.errors.TopicExistsException is neither an InterruptedException nor an ExecutionException according to https://kafka.apache.org/28/javadoc//org/apache/kafka/common/errors/TopicExistsException.html.
How can the first condition of this if statement ever be true? (and I know this code was there even before)
There was a problem hiding this comment.
Actually, I found this reference implementation in LiKafkaTransporterProviderAdmin here.
But let me double-check this anyway.
There was a problem hiding this comment.
So it looks like TopicExistsException has an inheritance chain that leads back to java.lang.Exception, and both InterruptedException and ExecutionException also inherit from the same java.lang.Exception.
There was a problem hiding this comment.
I understand that, my point was that TopicExistsException is neither a descendant of InterruptedException nor is it a descendant of ExecutionException. That first condition is always false.
There was a problem hiding this comment.
Yeah, so we may need to walk the chain of causes recursively until we hit TopicExistsException, right?
There was a problem hiding this comment.
So I suspect TopicExistsException is raised as "ExecutionException(new TopicExistsException())" and that is why it needs to be handled this way. If I split it and add a separate catch block for TopicExistsException then it fails to catch that exception and that exception goes uncaught. I tried it and saw a few unit tests fail because of it.
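The wrapping described in this thread can be reproduced with the JDK alone. In the sketch below, `TopicExistsException` is a hypothetical stand-in for the Kafka class and `CompletableFuture` stands in for the future returned by the admin call; `hasCause` is an illustrative helper, not existing Brooklin code. It shows why the first `instanceof` check can never be true for the caught exception, and how walking the cause chain handles deeper nesting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class CauseChainSketch {
  // Hypothetical stand-in for org.apache.kafka.common.errors.TopicExistsException.
  static class TopicExistsException extends RuntimeException {
    TopicExistsException(String message) {
      super(message);
    }
  }

  // Returns true if the throwable or any of its causes is an instance of the given type.
  static boolean hasCause(Throwable throwable, Class<? extends Throwable> type) {
    for (Throwable current = throwable; current != null; current = current.getCause()) {
      if (type.isInstance(current)) {
        return true;
      }
    }
    return false;
  }

  // Simulates a blocking wait on a future that failed because the topic already exists.
  static Exception createExistingTopic() {
    CompletableFuture<Void> future = new CompletableFuture<>();
    future.completeExceptionally(new TopicExistsException("Topic 'demo' already exists"));
    try {
      future.get();
      return null;
    } catch (InterruptedException | ExecutionException e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      return e;
    }
  }

  public static void main(String[] args) {
    Exception e = createExistingTopic();
    System.out.println(e instanceof TopicExistsException);            // prints false
    System.out.println(e.getCause() instanceof TopicExistsException); // prints true
    System.out.println(hasCause(e, TopicExistsException.class));      // prints true
  }
}
```

Under these assumptions, a separate `catch (TopicExistsException e)` block never fires because `get()` always delivers the failure wrapped in an `ExecutionException`, which matches the failing unit tests described above.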
vmaheshw left a comment
There was a problem hiding this comment.
Can you please split this PR and separate out the admin client change? AdminClient change can be merged as is.
For the ZkClient migration part, we can see whether we have JFrog branching. Otherwise, we can duplicate some of the code and have different classes selected by a config knob, then get rid of the knob later if things are working fine. This may duplicate some code, but it will definitely unblock you and remove your dependency on JFrog branching.
4348b9d to
7eb32d2
Undid the ZkClient changes, reverting to extending 101tec ZkClient.
7eb32d2 to
9a12243
Replace kafka.admin.AdminUtils usage with kafka.client.AdminClient because AdminUtils is deprecated. In a follow-up effort, 101tec ZkClient will be replaced by Helix ZkClient, and this change is required for that.
9a12243 to
3aca65b