Skip to content

KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas#9628

Merged
dajac merged 9 commits into
apache:trunkfrom
splett2:KAFKA-10747
Dec 10, 2020
Merged

KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas#9628
dajac merged 9 commits into
apache:trunkfrom
splett2:KAFKA-10747

Conversation

@splett2

@splett2 splett2 commented Nov 20, 2020

Copy link
Copy Markdown
Contributor

This PR adds support for IP entities to the DescribeClientQuotas and AlterClientQuotas APIs. This change does not require any protocol version bumps.

This PR also adds support for describing/altering IP quotas via kafka-configs tooling.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@dajac dajac self-requested a review November 20, 2020 12:30

@bdbyrne bdbyrne left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LG - only minor comments, thanks for the change!

Comment thread clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaEntity.java Outdated
Comment thread core/src/main/scala/kafka/server/AdminManager.scala Outdated
Comment thread core/src/test/scala/unit/kafka/utils/TestUtils.scala Outdated
Comment thread core/src/main/scala/kafka/server/AdminManager.scala Outdated

@dajac dajac left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@splett2 Thanks for the PR. It looks pretty good. I have left some comments. I will need to do another pass over it, especially over the tests.

Comment thread clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaEntity.java Outdated
Comment thread core/src/main/scala/kafka/server/AdminManager.scala
Comment thread core/src/main/scala/kafka/server/AdminManager.scala
Comment thread core/src/main/scala/kafka/admin/ConfigCommand.scala Outdated
Comment thread core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala Outdated

@apovzner apovzner left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Left couple of minor comments.

@dajac dajac left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@splett2 Thanks for the updates. I just made another pass and left few more comments.

Comment thread core/src/main/scala/kafka/admin/ConfigCommand.scala Outdated
Comment thread core/src/main/scala/kafka/server/AdminManager.scala
Comment on lines +796 to +797
val ipDefaults = parser.accepts("ip-defaults", "The config defaults for all IPs.")
val ip = parser.accepts("ip", "The IP address.")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add these two flags in the KIP as well for completeness.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do, along with the change to lower

@splett2 splett2 Dec 8, 2020

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. while I was updating the KIP, i noticed that the previous config documentation was inconsistent with existing configs.

Default connection rate quotas for an IP address can be configured by omitting entity name.
bin/kafka-configs  --bootstrap-server localhost:9091 --alter --add-config 'connection_creation_rate=100' --entity-type IPs

This isn't the case for any of the existing configs, you get an error:
an entity-name or default entity must be specified with --alter of users, clients, brokers
so I also updated the KIP for default IP to specify --entity-default.

Comment thread core/src/main/scala/kafka/admin/ConfigCommand.scala Outdated
Comment thread core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala Outdated
Comment thread core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala Outdated
Comment thread core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
Comment thread core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala Outdated
Comment thread core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala Outdated
@splett2

splett2 commented Dec 8, 2020

Copy link
Copy Markdown
Contributor Author

Thanks for the feedback @dajac
I went ahead and also refactored some tests in ConfigCommandTest that were a bit repetitive.
Let me know what you think

@dajac dajac left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@splett2 Thanks for the updates. I have left few minor comments.

def validateIpConfig(ip: String, configs: Properties): Unit = {
if (ip != ConfigEntityName.Default && !Utils.validHostPattern(ip))
throw new AdminOperationException(s"IP $ip is not a valid address.")
DynamicConfig.Ip.validateIpOrHost(ip)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a little weird to throw InvalidRequestException here because there is not request involved. Users of AdminZkClient may not expect this. I think that it would be better to throw a IllegalArgumentException. Then in the AdminManager#alterClientQuotas, we can catch it and transform it into InvalidRequestException by copying the internal message.

}

if (hasEntityName && entityTypeVals.contains(ConfigType.Ip)) {
Seq(entityName, ip).filter(options.has(_)).map(options.valueOf(_)).foreach(DynamicConfig.Ip.validateIpOrHost)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto here. It is a bit weird to have an InvalidRequestException thrown here.

Comment on lines +150 to +151
if (!Utils.validHostPattern(ip))
throw new InvalidRequestException(s"$ip is not a valid hostname")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this step is really necessary as InetAddress.getByName also ensures that the hostname is valid. What do you think?

try {
InetAddress.getByName(ip)
} catch {
case _ :UnknownHostException => throw new InvalidRequestException(s"$ip is not a valid IP or resolvable hostname")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: _ :UnknownHostException => _: UnknownHostException

@dajac dajac left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@splett2 Thanks for the updates. I have left two small suggestions. Besides this, the PR looks good. One last iteration and we can merge it.

Comment on lines +999 to +1003
try {
DynamicConfig.Ip.validateIpOrHost(ip)
} catch {
case e: IllegalArgumentException => throw new InvalidRequestException(e.getMessage)
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • For my understanding, have you chosen to do the validation here to ensure that requests with validateOnly = true are fully validated?
  • Initially, I thought that we could just catch the IllegalArgumentException here . Doing the conversion here seems reasonable as well. Alternatively, we could also change validateIpOrHost to return a boolean and let the caller decides which exception to throw.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac
yes, I thought that catching in that block would work, but after taking a closer look, it seems like entity validation is only done when altering, so we wouldn't hit validation in the validateOnly = true case.

I think changing validateIpOrHost to return a boolean is reasonable. I originally wanted to have it throw exception instead of boolean since we had two validation steps (validHostPattern and then resolvable IP), but since we simplified that code block to only do host resolution, it makes more sense to have it just return a boolean.

}

@Test
def testAlterClientQuotasInvalidEntityCombination(): Unit = {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For these three tests, would it make sense to also verify that the error message? InvalidRequestException are thrown all over the place so we may get one but not for the expected cause. Your last commit suggests that you got caught by this already, is it? 1b0ec2a#diff-f02b619b5cd14e83f7e21dbe211b3f88336f825e1ee5c630fee32b8a0fbe3d20R294

@dajac dajac left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@dajac

dajac commented Dec 10, 2020

Copy link
Copy Markdown
Member

Failed test is not related:

  • JDK 11 - kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs

@dajac dajac merged commit 404062d into apache:trunk Dec 10, 2020
@dajac

dajac commented Dec 10, 2020

Copy link
Copy Markdown
Member

@splett2 Thanks for your contribution!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants