KAFKA-6955: Use Java AdminClient in DeleteRecordsCommand#5088
Conversation
|
LGTM |
viktorsomogyi
left a comment
There was a problem hiding this comment.
Thanks for the contribution, I left one minor comment but otherwise I think it's good.
There was a problem hiding this comment.
nit: this foreach thing could be simplified in my opinion:
val recordsToDelete = offsetSeq.map(offset => offset._1 -> RecordsToDelete.beforeOffset(offset._2)).toMap.asJava
There was a problem hiding this comment.
Thanks! You are right. We can even skip the direct var definition and embed the calculation into the following statement; i.e.
val deleteRecordsResult = adminClient.deleteRecords(offsetSeq.map(offset => (offset._1, RecordsToDelete.beforeOffset(offset._2))).toMap.asJava)
But I'll leave it as you suggested to make it more readable.
I'll push a commit shortly.
ijuma
left a comment
There was a problem hiding this comment.
Thanks for the PR. Shall we remove the relevant methods from the Scala AdminClients along with the tests and also add an upgrade note?
There was a problem hiding this comment.
Why not import it as AdminClient? We don't use the other AdminClient anymore so we don't need to rename it.
There was a problem hiding this comment.
I wanted to remain consistent with this import in other similar files; e.g. ConfigCommand.scala and DelegationTokenCommand.scala. No preference otherwise, and can follow your suggestion if you still think that's preferred.
There was a problem hiding this comment.
The Scala AdminClient is going away and there is no need to have this rename in classes that don't need it.
There was a problem hiding this comment.
Sure, I removed the alias. Since the old AdminClient is in the same package, the normal import doesn't seem work; hence the admin.AdminClient reference.
There was a problem hiding this comment.
Use { case (a, b) => (choose appropriate variable names) to avoid the ugly _1 and _2
de1ca81 to
b25c8f4
Compare
vahidhashemian
left a comment
There was a problem hiding this comment.
@ijuma, tried to address your comments in the new commit (see one response inline). Also, removed the old method from AdminClient.scala, and as a result updated the corresponding unit tests.
There was a problem hiding this comment.
I wanted to remain consistent with this import in other similar files; e.g. ConfigCommand.scala and DelegationTokenCommand.scala. No preference otherwise, and can follow your suggestion if you still think that's preferred.
3723562 to
3beab26
Compare
|
@ijuma @vahidhashemian I am not sure that we want to delete methods from scala AdminClient now. we should give users some more time to migrate. KafkaAdminClient.deleteRecords API added in recent 1.1.0 release. |
|
@omkreddy The Scala AdminClient was never a public API and we removed it from the original KIP (KIP-107) as a result. In addition, we added documentation explaining that it was deprecated when the Java AdminClient was added in the 0.11.0 release. Are you aware of anyone using it? |
| consumer.seekToBeginning(Collections.singletonList(tp)) | ||
| assertEquals(0L, consumer.position(tp)) | ||
|
|
||
| client.deleteRecordsBefore(Map((tp, 5L))).get() |
There was a problem hiding this comment.
Do we have a test like this in AdminClientIntegrationTest? If not, we should move it there. If so, we should just remove this test.
There was a problem hiding this comment.
I removed the duplicate delete records tests, and moved the non-duplicate ones over.
| timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which | ||
| does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and | ||
| will be removed in a future version.</li> | ||
| <li>The implementation of DeleteRecordsCommand makes use of the new Java-based AdminClient.</li> |
There was a problem hiding this comment.
I meant that the note should be about removing AdminClient.deleteRecordsBefore.
|
@ijuma I am not aware of anyone using deleteRecordsBefore API. But I know few tools/companies using AdminClient methods for consumer group monitoring. This is similar to AdminUtils.scala, which is not a public API, but used in some projects. I don't have strong reservations about deleting deleteRecordsBefore from scala client. |
|
The main reason for doing it now is that we are doing a major version bump. And unlike the list groups functionality, |
vahidhashemian
left a comment
There was a problem hiding this comment.
@ijuma, thanks for taking another look. Updated the patch with your suggestions.
| consumer.seekToBeginning(Collections.singletonList(tp)) | ||
| assertEquals(0L, consumer.position(tp)) | ||
|
|
||
| client.deleteRecordsBefore(Map((tp, 5L))).get() |
There was a problem hiding this comment.
I removed the duplicate delete records tests, and moved the non-duplicate ones over.
ijuma
left a comment
There was a problem hiding this comment.
I pushed minor tweaks @vahidhashemian, LGTM with these changes. Are you OK with them?
|
@ijuma, LGTM. Thanks for the tweaks :) |
| messageCount == 10 | ||
| }, "Expected 10 messages", 3000L) | ||
|
|
||
| client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava).all().get() |
There was a problem hiding this comment.
A couple more () can be removed here and a few lines below.
28788ef to
016200d
Compare
|
retest this please |
- Removed internal kafka.admin.AdminClient.deleteRecordsBefore since it's no longer used. - Removed redundant tests and rewrote non redundant ones to use the Java AdminClient. Reviewers: Viktor Somogyi <viktor.somogyi@cloudera.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Committer Checklist (excluded from commit message)