Skip to content

KAFKA-6955: Use Java AdminClient in DeleteRecordsCommand#5088

Merged
ijuma merged 9 commits into
apache:trunkfrom
vahidhashemian:KAFKA-6955
Jun 2, 2018
Merged

KAFKA-6955: Use Java AdminClient in DeleteRecordsCommand#5088
ijuma merged 9 commits into
apache:trunkfrom
vahidhashemian:KAFKA-6955

Conversation

@vahidhashemian

@vahidhashemian vahidhashemian commented May 27, 2018

Copy link
Copy Markdown
Contributor

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@omkreddy

Copy link
Copy Markdown
Contributor

LGTM

@viktorsomogyi viktorsomogyi left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution, I left one minor comment but otherwise I think it's good.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this foreach thing could be simplified in my opinion:

val recordsToDelete = offsetSeq.map(offset => offset._1 -> RecordsToDelete.beforeOffset(offset._2)).toMap.asJava

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! You are right. We can even skip the direct var definition and embed the calculation into the following statement; i.e.

val deleteRecordsResult = adminClient.deleteRecords(offsetSeq.map(offset => (offset._1, RecordsToDelete.beforeOffset(offset._2))).toMap.asJava)

But I'll leave it as you suggested to make it more readable.
I'll push a commit shortly.

@ijuma ijuma left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Shall we remove the relevant methods from the Scala AdminClients along with the tests and also add an upgrade note?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not import it as AdminClient? We don't use the other AdminClient anymore so we don't need to rename it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to remain consistent with this import in other similar files; e.g. ConfigCommand.scala and DelegationTokenCommand.scala. No preference otherwise, and can follow your suggestion if you still think that's preferred.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Scala AdminClient is going away and there is no need to have this rename in classes that don't need it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I removed the alias. Since the old AdminClient is in the same package, the normal import doesn't seem work; hence the admin.AdminClient reference.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use { case (a, b) => (choose appropriate variable names) to avoid the ugly _1 and _2

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for ().

@vahidhashemian vahidhashemian left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma, tried to address your comments in the new commit (see one response inline). Also, removed the old method from AdminClient.scala, and as a result updated the corresponding unit tests.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to remain consistent with this import in other similar files; e.g. ConfigCommand.scala and DelegationTokenCommand.scala. No preference otherwise, and can follow your suggestion if you still think that's preferred.

@omkreddy

omkreddy commented Jun 1, 2018

Copy link
Copy Markdown
Contributor

@ijuma @vahidhashemian I am not sure that we want to delete methods from scala AdminClient now. we should give users some more time to migrate. KafkaAdminClient.deleteRecords API added in recent 1.1.0 release.
maybe we can drop the AdminClient at once after migrating/deprecating all other methods (consumer group, api versions)?

@ijuma

ijuma commented Jun 1, 2018

Copy link
Copy Markdown
Member

@omkreddy The Scala AdminClient was never a public API and we removed it from the original KIP (KIP-107) as a result. In addition, we added documentation explaining that it was deprecated when the Java AdminClient was added in the 0.11.0 release. Are you aware of anyone using it?

@ijuma ijuma left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of minor comments, but let's wait for @omkreddy to talk a bit more about his reservations.

consumer.seekToBeginning(Collections.singletonList(tp))
assertEquals(0L, consumer.position(tp))

client.deleteRecordsBefore(Map((tp, 5L))).get()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test like this in AdminClientIntegrationTest? If not, we should move it there. If so, we should just remove this test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the duplicate delete records tests, and moved the non-duplicate ones over.

Comment thread docs/upgrade.html Outdated
timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which
does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and
will be removed in a future version.</li>
<li>The implementation of DeleteRecordsCommand makes use of the new Java-based AdminClient.</li>

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that the note should be about removing AdminClient.deleteRecordsBefore.

@omkreddy

omkreddy commented Jun 1, 2018

Copy link
Copy Markdown
Contributor

@ijuma I am not aware of anyone using deleteRecordsBefore API. But I know few tools/companies using AdminClient methods for consumer group monitoring.

This is similar to AdminUtils.scala, which is not a public API, but used in some projects.
I just felt instead of removing methods, it may be appropriate to drop AdminClient at-once after migrating all the API.

I don't have strong reservations about deleting deleteRecordsBefore from scala client.

@ijuma

ijuma commented Jun 1, 2018

Copy link
Copy Markdown
Member

The main reason for doing it now is that we are doing a major version bump. And unlike the list groups functionality, deleteRecordsBefore was available for a short period in the Scala AdminClient before it became available in the Java AdminClient.

@vahidhashemian vahidhashemian left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma, thanks for taking another look. Updated the patch with your suggestions.

consumer.seekToBeginning(Collections.singletonList(tp))
assertEquals(0L, consumer.position(tp))

client.deleteRecordsBefore(Map((tp, 5L))).get()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the duplicate delete records tests, and moved the non-duplicate ones over.

@ijuma ijuma left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed minor tweaks @vahidhashemian, LGTM with these changes. Are you OK with them?

@vahidhashemian

Copy link
Copy Markdown
Contributor Author

@ijuma, LGTM. Thanks for the tweaks :)

messageCount == 10
}, "Expected 10 messages", 3000L)

client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava).all().get()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple more () can be removed here and a few lines below.

@ijuma

ijuma commented Jun 2, 2018

Copy link
Copy Markdown
Member

retest this please

@ijuma ijuma merged commit 341d5db into apache:trunk Jun 2, 2018
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
- Removed internal kafka.admin.AdminClient.deleteRecordsBefore since it's
no longer used.
- Removed redundant tests and rewrote non redundant ones to use the Java
AdminClient.

Reviewers: Viktor Somogyi <viktor.somogyi@cloudera.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants