Skip to content

Kafka 2257#1

Closed
Geoff (granders) wants to merge 23 commits into
KAFKA-2276from
KAFKA-2257
Closed

Kafka 2257#1
Geoff (granders) wants to merge 23 commits into
KAFKA-2276from
KAFKA-2257

Conversation

@granders

Copy link
Copy Markdown

This is a 'fake' pull request created solely for the purpose of getting an initial review on this branch before I do the official pull request against the apache/kafka repo.

Hence, this will not be merged into the KAFKA-2276 branch.

Ewen Cheslack-Postava (@ewencp) Jun Rao (@junrao)
This patch is intended to replace the existing replication test suite, and follows the proposal for replication test suite outlined here: https://cwiki.apache.org/confluence/display/KAFKA/Roadmap+-+port+existing+system+tests

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other tools (at least producer performance, possibly others) just allow a list of prop=value arguments. Any reason not to be consistent with those (or whatever the KIP-4 discussion might have ended up agreeing upon)?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only place I kinda see this is in kafka-topic.sh, but neither KIP-4 nor KIP-14 make reference to allowing a list of prop=value.

Currently kafka-topic.sh doesn't really have this behavior (should it?...):
This works (one --config per config):

kafka_fork $ bin/kafka-topics.sh --topic abc --zookeeper=localhost:2181 --create --partitions 1 --replication-factor 1 --config max.message.bytes=111 --config flush.ms=222

This silently ignores the second config (flush.ms):

kafka_fork $ bin/kafka-topics.sh --topic abc --zookeeper=localhost:2181 --create --partitions 1 --replication-factor 1 --config max.message.bytes=111 flush.ms=222

From a quick scan of https://reviews.apache.org/r/34554/diff/7#3 it looks like this behavior is unchanged in the KIP-4 patches.

That all said, I changed the name to "--config" to be consistent.

@granders

Copy link
Copy Markdown
Author

Done with this local PR

@granders Geoff (granders) deleted the KAFKA-2257 branch July 31, 2015 01:43
ymatsuda added a commit that referenced this pull request Aug 5, 2015
add kafka-clients and log4j to pom.xml
Apurva Mehta (apurvam) pushed a commit that referenced this pull request Apr 3, 2017
This may be a reason why we see Jenkins jobs time out at times.
I can reproduce it locally.

With current trunk there is a possibility to run into this:

```sh
"kafka-streams-close-thread" #585 daemon prio=5 os_prio=0 tid=0x00007f66d052d800 nid=0x7e02 waiting for monitor entry [0x00007f66ae2e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345)
	- waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474)
	at java.lang.Thread.run(Thread.java:745)

"appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0 tid=0x00007f66d052f000 nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219)
	- waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117)
	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259)
	- locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener)
	at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70)
	at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)

```

In a nutshell: `KafkaStreams` and `StreamThread` are both
waiting for each other since another intermittent `close`
(eg. from a test) comes along also trying to lock on
`KafkaStreams` :

```sh
"main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait() [0x00007f66d7a15000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1249)
	- locked <0x000000077d45a590> (a java.lang.Thread)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503)
	- locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447)
	at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115)
```

=> causing a deadlock.

Fixed this by softer locking on the state change, that guarantees
atomic changes to the state but does not lock on the whole object
(I at least could not find another method that would require more
than atomicly-locked access except for `setState`).

Also qualified the state listeners with their outer-class to make
the whole code-flow around this more readable (having two
interfaces with the same naming for interface and method and then
using them between their two outer classes is crazy hard to read
imo :)).

Easy to reproduced yourself by running
`org.apache.kafka.streams.KafkaStreamsTest` in a loop for a bit
(save yourself some time by running 2-4 in parallel :)). Eventually
it will lock on one of the tests (for me this takes less than 1 min
with 4 parallel runs).

Author: Armin Braun <me@obrown.io>
Author: Armin <me@obrown.io>

Reviewers: Eno Thereska <eno@confluent.io>, Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes apache#2791 from original-brownbear/fix-streams-deadlock
Confluent Jenkins Bot (ConfluentJenkins) pushed a commit that referenced this pull request May 6, 2019
…tup control (apache#6638)

This merge consists of two commits previously merged into later branches.
Author: Cyrus Vafadari <cyrus@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>

Commit #1:
MINOR: Add async and different sync startup modes in connect service test class

Allow Connect Service in system tests to start asynchronously.

Specifically, allow for three startup conditions:
1. No condition - start async and return immediately.
2. Semi-async - start immediately after plugins have been discovered successfully.
3. Sync - start returns after the worker has completed startup. This is the current mode, but its condition is improved by checking that the port of Connect's REST interface is open, rather than that a log line has appeared in the logs.

Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes apache#4423 from kkonstantine/MINOR-Add-async-and-different-sync-startup-modes-in-ConnectService-test-class

Commit #2:
MINOR: Modify Connect service's startup timeout to be passed via the init (apache#5882)

Currently, the startup timeout is hardcoded to be 60 seconds in Connect's test service. Modifying it to be passable via init.

Author: Magesh Nandakumar <mageshn@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Confluent Jenkins Bot (ConfluentJenkins) pushed a commit that referenced this pull request Oct 17, 2019
…pache#7305)

A partition log in initialized in following steps:

1. Fetch log config from ZK
2. Call LogManager.getOrCreateLog which creates the Log object, then
3. Registers the Log object

Step #3 enables Configuration update thread to deliver configuration
updates to the log. But if any update arrives between step #1 and #3
then that update is missed. It breaks following use case:

1. Create a topic with default configuration, and immediately after that
2. Update the configuration of topic

There is a race condition here and in random cases update made in
second step will get dropped.

This change fixes it by tracking updates arriving between step #1 and #3
Once a Partition is done initializing log, it checks if it has missed any
update. If yes, then the configuration is read from ZK again.

Added unit tests to make sure a dirty configuration is refreshed. Tested
on local cluster to make sure that topic configuration and updates are
handled correctly.

Reviewers: Jason Gustafson <jason@confluent.io>
David Arthur (mumrah) pushed a commit that referenced this pull request Oct 29, 2019
…pache#7305)

A partition log in initialized in following steps:

1. Fetch log config from ZK
2. Call LogManager.getOrCreateLog which creates the Log object, then
3. Registers the Log object

Step #3 enables Configuration update thread to deliver configuration
updates to the log. But if any update arrives between step #1 and #3
then that update is missed. It breaks following use case:

1. Create a topic with default configuration, and immediately after that
2. Update the configuration of topic

There is a race condition here and in random cases update made in
second step will get dropped.

This change fixes it by tracking updates arriving between step #1 and #3
Once a Partition is done initializing log, it checks if it has missed any
update. If yes, then the configuration is read from ZK again.

Added unit tests to make sure a dirty configuration is refreshed. Tested
on local cluster to make sure that topic configuration and updates are
handled correctly.

Reviewers: Jason Gustafson <jason@confluent.io>
Vikas Singh (soondenana) added a commit that referenced this pull request Feb 11, 2020
Revert "KAFKA-8964: Rename tag client-id for thread-level metrics and below (apache#7429)"
Confluent Jenkins Bot (ConfluentJenkins) pushed a commit that referenced this pull request Feb 10, 2021
…to get end offsets and create topics (apache#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Confluent Jenkins Bot (ConfluentJenkins) pushed a commit that referenced this pull request Feb 10, 2021
…to get end offsets and create topics (apache#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Rajini Sivaram (rajinisivaram) pushed a commit that referenced this pull request Feb 10, 2021
…to get end offsets and create topics (apache#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Confluent Jenkins Bot (ConfluentJenkins) pushed a commit that referenced this pull request Feb 10, 2021
…to get end offsets and create topics (apache#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Confluent Jenkins Bot (ConfluentJenkins) pushed a commit that referenced this pull request Feb 16, 2021
…to get end offsets and create topics (apache#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Andrew Egelhofer (andrewegel) pushed a commit that referenced this pull request Feb 20, 2021
…to get end offsets and create topics (apache#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Badai Aqrandista (badaiaqrandista) pushed a commit that referenced this pull request Sep 18, 2023
This change introduces some basic clean up and refactoring for forthcoming commits related to the revised fetch code for the consumer threading refactor project.

Reviewers: Christo Lolov <lolovc@amazon.com>, Jun Rao <junrao@gmail.com>
Confluent Semaphore (ConfluentSemaphore) pushed a commit that referenced this pull request Nov 10, 2024
… for aborted txns (apache#17676) (#1 7733)

Reviewers: Jun Rao <junrao@gmail.com>
airlock-confluentinc Bot pushed a commit that referenced this pull request Feb 12, 2026
…ker provisioning (apache#21394)

## Summary

Fixes bugs where `--jdk-version` and `--jdk-arch` parameters were
ignored during system test worker provisioning, and refactors
`vagrant/base.sh` to support flexible JDK versions without code changes.

---

## Problem

The Vagrant provisioning script (`vagrant/base.sh`) had two bugs that
caused JDK version parameters to be ignored:

| Bug | Problem |
|-----|---------|
| **#1: `--jdk-version` ignored** | `JDK_FULL` was hardcoded to
`17-linux-x64`, so passing `--jdk-version 25` still downloaded JDK 17 |
| **#2: `--jdk-arch` ignored** | Architecture parameter was passed but
never used in the S3 download URL |

---

## Solution

- Validate `JDK_MAJOR` and `JDK_ARCH` input parameters with regex
- Dynamically construct `JDK_FULL` from `JDK_MAJOR` and `JDK_ARCH`
- Update S3 path to use `/jdk/` subdirectory
- Add logging for debugging

---

## Changes

### `vagrant/base.sh`

| Change | Description |
|--------|-------------|
| **Input validation** | Added regex validation for `JDK_MAJOR` and
`JDK_ARCH` with sensible defaults |
| **Dynamic construction** | `JDK_FULL` is now constructed from
`JDK_MAJOR` and `JDK_ARCH` if not explicitly provided |
| **Updated S3 path** | Changed URL from
`/kafka-packages/jdk-{version}.tar.gz` to
`/kafka-packages/jdk/jdk-{version}.tar.gz` |
| **Logging** | Added debug output for JDK configuration |
| **Backward compatibility** | Vagrantfile can still pass `JDK_FULL`
directly; the script validates and uses it if valid |

---

## S3 Path Change

### Old Path
```
s3://kafka-packages/jdk-{version}.tar.gz
```

### New Path
```
s3://kafka-packages/jdk/jdk-{version}.tar.gz
```

### Available JDKs in `s3://kafka-packages/jdk/`

| File | Version | Architecture |
|------|---------|--------------|
| `jdk-7u80-linux-x64.tar.gz` | 7u80 | x64 |
| `jdk-8u144-linux-x64.tar.gz` | 8u144 | x64 |
| `jdk-8u161-linux-x64.tar.gz` | 8u161 | x64 |
| `jdk-8u171-linux-x64.tar.gz` | 8u171 | x64 |
| `jdk-8u191-linux-x64.tar.gz` | 8u191 | x64 |
| `jdk-8u202-linux-x64.tar.gz` | 8u202 | x64 |
| `jdk-11.0.2-linux-x64.tar.gz` | 11.0.2 | x64 |
| `jdk-17-linux-x64.tar.gz` | 17 | x64 |
| `jdk-18.0.2-linux-x64.tar.gz` | 18.0.2 | x64 |
| `jdk-21.0.1-linux-x64.tar.gz` | 21.0.1 | x64 |
| `jdk-21.0.1-linux-aarch64.tar.gz` | 21.0.1 | aarch64 |
| `jdk-25-linux-x64.tar.gz` | 25 | x64 |
| `jdk-25-linux-aarch64.tar.gz` | 25 | aarch64 |
| `jdk-25.0.1-linux-x64.tar.gz` | 25.0.1 | x64 |
| `jdk-25.0.1-linux-aarch64.tar.gz` | 25.0.1 | aarch64 |
| `jdk-25.0.2-linux-x64.tar.gz` | 25.0.2 | x64 |
| `jdk-25.0.2-linux-aarch64.tar.gz` | 25.0.2 | aarch64 |

---

## Future JDK Releases

> **IMPORTANT: No code changes required for future Java major/minor
releases!**

The validation regex supports all version formats:
- **Major versions**: `17`, `25`, `26`
- **Minor versions**: `25.0.1`, `25.0.2`, `26.0.1`
- **Legacy format**: `8u144`, `8u202`

### Adding New JDK Versions

To add support for a new JDK version (e.g., JDK 26, 25.0.3):

1. Download the JDK tarball from Oracle/Adoptium
2. Rename to follow naming convention:
`jdk-{VERSION}-linux-{ARCH}.tar.gz`
3. Upload to S3: `aws s3 cp jdk-{VERSION}-linux-{ARCH}.tar.gz
s3://kafka-packages/jdk/`
4. Use in tests: `--jdk-version {VERSION} --jdk-arch {ARCH}`

No modifications to `base.sh` or any other scripts are needed.

---

## Benefits

| Before | After |
|--------|-------|
| `--jdk-version` ignored | ✅ Correctly uses specified version |
| `--jdk-arch` ignored | ✅ Correctly uses specified architecture |
| Only major version support | ✅ Full version support (e.g., `25.0.2`) |
| Code change needed for new JDK | ✅ Just upload to S3 and pass version
|

---

## Testing

Tested with different JDK versions to confirm the fix works correctly:

| Test | JDK_MAJOR | Expected | Actual | Result | Test Report |
|------|-----------|----------|--------|--------|-------------|
| JDK 17 | `17` | javac 17.0.4 | javac 17.0.4 | ✅ |
| JDK 25 | `25` | javac 25.0.2 | javac 25.0.2 | ✅ |


---

## Backward Compatibility

- **Vagrantfile**: Continues to work as before
- **Existing workflows**: Default behavior unchanged (JDK 17 on x64
architecture)
- **No breaking changes**: All existing configurations continue to work

---

 Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
airlock-confluentinc Bot pushed a commit that referenced this pull request Feb 12, 2026
…ker provisioning (apache#21394)

## Summary

Fixes bugs where `--jdk-version` and `--jdk-arch` parameters were
ignored during system test worker provisioning, and refactors
`vagrant/base.sh` to support flexible JDK versions without code changes.

---

## Problem

The Vagrant provisioning script (`vagrant/base.sh`) had two bugs that
caused JDK version parameters to be ignored:

| Bug | Problem |
|-----|---------|
| **#1: `--jdk-version` ignored** | `JDK_FULL` was hardcoded to
`17-linux-x64`, so passing `--jdk-version 25` still downloaded JDK 17 |
| **#2: `--jdk-arch` ignored** | Architecture parameter was passed but
never used in the S3 download URL |

---

## Solution

- Validate `JDK_MAJOR` and `JDK_ARCH` input parameters with regex
- Dynamically construct `JDK_FULL` from `JDK_MAJOR` and `JDK_ARCH`
- Update S3 path to use `/jdk/` subdirectory
- Add logging for debugging

---

## Changes

### `vagrant/base.sh`

| Change | Description |
|--------|-------------|
| **Input validation** | Added regex validation for `JDK_MAJOR` and
`JDK_ARCH` with sensible defaults |
| **Dynamic construction** | `JDK_FULL` is now constructed from
`JDK_MAJOR` and `JDK_ARCH` if not explicitly provided |
| **Updated S3 path** | Changed URL from
`/kafka-packages/jdk-{version}.tar.gz` to
`/kafka-packages/jdk/jdk-{version}.tar.gz` |
| **Logging** | Added debug output for JDK configuration |
| **Backward compatibility** | Vagrantfile can still pass `JDK_FULL`
directly; the script validates and uses it if valid |

---

## S3 Path Change

### Old Path
```
s3://kafka-packages/jdk-{version}.tar.gz
```

### New Path
```
s3://kafka-packages/jdk/jdk-{version}.tar.gz
```

### Available JDKs in `s3://kafka-packages/jdk/`

| File | Version | Architecture |
|------|---------|--------------|
| `jdk-7u80-linux-x64.tar.gz` | 7u80 | x64 |
| `jdk-8u144-linux-x64.tar.gz` | 8u144 | x64 |
| `jdk-8u161-linux-x64.tar.gz` | 8u161 | x64 |
| `jdk-8u171-linux-x64.tar.gz` | 8u171 | x64 |
| `jdk-8u191-linux-x64.tar.gz` | 8u191 | x64 |
| `jdk-8u202-linux-x64.tar.gz` | 8u202 | x64 |
| `jdk-11.0.2-linux-x64.tar.gz` | 11.0.2 | x64 |
| `jdk-17-linux-x64.tar.gz` | 17 | x64 |
| `jdk-18.0.2-linux-x64.tar.gz` | 18.0.2 | x64 |
| `jdk-21.0.1-linux-x64.tar.gz` | 21.0.1 | x64 |
| `jdk-21.0.1-linux-aarch64.tar.gz` | 21.0.1 | aarch64 |
| `jdk-25-linux-x64.tar.gz` | 25 | x64 |
| `jdk-25-linux-aarch64.tar.gz` | 25 | aarch64 |
| `jdk-25.0.1-linux-x64.tar.gz` | 25.0.1 | x64 |
| `jdk-25.0.1-linux-aarch64.tar.gz` | 25.0.1 | aarch64 |
| `jdk-25.0.2-linux-x64.tar.gz` | 25.0.2 | x64 |
| `jdk-25.0.2-linux-aarch64.tar.gz` | 25.0.2 | aarch64 |

---

## Future JDK Releases

> **IMPORTANT: No code changes required for future Java major/minor
releases!**

The validation regex supports all version formats:
- **Major versions**: `17`, `25`, `26`
- **Minor versions**: `25.0.1`, `25.0.2`, `26.0.1`
- **Legacy format**: `8u144`, `8u202`

### Adding New JDK Versions

To add support for a new JDK version (e.g., JDK 26, 25.0.3):

1. Download the JDK tarball from Oracle/Adoptium
2. Rename to follow naming convention:
`jdk-{VERSION}-linux-{ARCH}.tar.gz`
3. Upload to S3: `aws s3 cp jdk-{VERSION}-linux-{ARCH}.tar.gz
s3://kafka-packages/jdk/`
4. Use in tests: `--jdk-version {VERSION} --jdk-arch {ARCH}`

No modifications to `base.sh` or any other scripts are needed.

---

## Benefits

| Before | After |
|--------|-------|
| `--jdk-version` ignored | ✅ Correctly uses specified version |
| `--jdk-arch` ignored | ✅ Correctly uses specified architecture |
| Only major version support | ✅ Full version support (e.g., `25.0.2`) |
| Code change needed for new JDK | ✅ Just upload to S3 and pass version
|

---

## Testing

Tested with different JDK versions to confirm the fix works correctly:

| Test | JDK_MAJOR | Expected | Actual | Result | Test Report |
|------|-----------|----------|--------|--------|-------------|
| JDK 17 | `17` | javac 17.0.4 | javac 17.0.4 | ✅ |
| JDK 25 | `25` | javac 25.0.2 | javac 25.0.2 | ✅ |


---

## Backward Compatibility

- **Vagrantfile**: Continues to work as before
- **Existing workflows**: Default behavior unchanged (JDK 17 on x64
architecture)
- **No breaking changes**: All existing configurations continue to work

---

 Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
airlock-confluentinc Bot pushed a commit that referenced this pull request Feb 13, 2026
…ker provisioning (apache#21394)

Fixes bugs where `--jdk-version` and `--jdk-arch` parameters were
ignored during system test worker provisioning, and refactors
`vagrant/base.sh` to support flexible JDK versions without code changes.

---

The Vagrant provisioning script (`vagrant/base.sh`) had two bugs that
caused JDK version parameters to be ignored:

| Bug | Problem |
|-----|---------|
| **#1: `--jdk-version` ignored** | `JDK_FULL` was hardcoded to
`17-linux-x64`, so passing `--jdk-version 25` still downloaded JDK 17 |
| **#2: `--jdk-arch` ignored** | Architecture parameter was passed but
never used in the S3 download URL |

---

- Validate `JDK_MAJOR` and `JDK_ARCH` input parameters with regex
- Dynamically construct `JDK_FULL` from `JDK_MAJOR` and `JDK_ARCH`
- Update S3 path to use `/jdk/` subdirectory
- Add logging for debugging

---

| Change | Description |
|--------|-------------|
| **Input validation** | Added regex validation for `JDK_MAJOR` and
`JDK_ARCH` with sensible defaults |
| **Dynamic construction** | `JDK_FULL` is now constructed from
`JDK_MAJOR` and `JDK_ARCH` if not explicitly provided |
| **Updated S3 path** | Changed URL from
`/kafka-packages/jdk-{version}.tar.gz` to
`/kafka-packages/jdk/jdk-{version}.tar.gz` |
| **Logging** | Added debug output for JDK configuration |
| **Backward compatibility** | Vagrantfile can still pass `JDK_FULL`
directly; the script validates and uses it if valid |

---

```
s3://kafka-packages/jdk-{version}.tar.gz
```

```
s3://kafka-packages/jdk/jdk-{version}.tar.gz
```

| File | Version | Architecture |
|------|---------|--------------|
| `jdk-7u80-linux-x64.tar.gz` | 7u80 | x64 |
| `jdk-8u144-linux-x64.tar.gz` | 8u144 | x64 |
| `jdk-8u161-linux-x64.tar.gz` | 8u161 | x64 |
| `jdk-8u171-linux-x64.tar.gz` | 8u171 | x64 |
| `jdk-8u191-linux-x64.tar.gz` | 8u191 | x64 |
| `jdk-8u202-linux-x64.tar.gz` | 8u202 | x64 |
| `jdk-11.0.2-linux-x64.tar.gz` | 11.0.2 | x64 |
| `jdk-17-linux-x64.tar.gz` | 17 | x64 |
| `jdk-18.0.2-linux-x64.tar.gz` | 18.0.2 | x64 |
| `jdk-21.0.1-linux-x64.tar.gz` | 21.0.1 | x64 |
| `jdk-21.0.1-linux-aarch64.tar.gz` | 21.0.1 | aarch64 |
| `jdk-25-linux-x64.tar.gz` | 25 | x64 |
| `jdk-25-linux-aarch64.tar.gz` | 25 | aarch64 |
| `jdk-25.0.1-linux-x64.tar.gz` | 25.0.1 | x64 |
| `jdk-25.0.1-linux-aarch64.tar.gz` | 25.0.1 | aarch64 |
| `jdk-25.0.2-linux-x64.tar.gz` | 25.0.2 | x64 |
| `jdk-25.0.2-linux-aarch64.tar.gz` | 25.0.2 | aarch64 |

---

> **IMPORTANT: No code changes required for future Java major/minor
releases!**

The validation regex supports all version formats:
- **Major versions**: `17`, `25`, `26`
- **Minor versions**: `25.0.1`, `25.0.2`, `26.0.1`
- **Legacy format**: `8u144`, `8u202`

To add support for a new JDK version (e.g., JDK 26, 25.0.3):

1. Download the JDK tarball from Oracle/Adoptium
2. Rename to follow naming convention:
`jdk-{VERSION}-linux-{ARCH}.tar.gz`
3. Upload to S3: `aws s3 cp jdk-{VERSION}-linux-{ARCH}.tar.gz
s3://kafka-packages/jdk/`
4. Use in tests: `--jdk-version {VERSION} --jdk-arch {ARCH}`

No modifications to `base.sh` or any other scripts are needed.

---

| Before | After |
|--------|-------|
| `--jdk-version` ignored | ✅ Correctly uses specified version |
| `--jdk-arch` ignored | ✅ Correctly uses specified architecture |
| Only major version support | ✅ Full version support (e.g., `25.0.2`) |
| Code change needed for new JDK | ✅ Just upload to S3 and pass version
|

---

Tested with different JDK versions to confirm the fix works correctly:

| Test | JDK_MAJOR | Expected | Actual | Result | Test Report |
|------|-----------|----------|--------|--------|-------------|
| JDK 17 | `17` | javac 17.0.4 | javac 17.0.4 | ✅ |
| JDK 25 | `25` | javac 25.0.2 | javac 25.0.2 | ✅ |

---

- **Vagrantfile**: Continues to work as before
- **Existing workflows**: Default behavior unchanged (JDK 17 on x64
architecture)
- **No breaking changes**: All existing configurations continue to work

---

 Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
airlock-confluentinc Bot pushed a commit that referenced this pull request Feb 19, 2026
…ker provisioning (apache#21394)

## Summary

Fixes bugs where `--jdk-version` and `--jdk-arch` parameters were
ignored during system test worker provisioning, and refactors
`vagrant/base.sh` to support flexible JDK versions without code changes.

---

## Problem

The Vagrant provisioning script (`vagrant/base.sh`) had two bugs that
caused JDK version parameters to be ignored:

| Bug | Problem |
|-----|---------|
| **#1: `--jdk-version` ignored** | `JDK_FULL` was hardcoded to
`17-linux-x64`, so passing `--jdk-version 25` still downloaded JDK 17 |
| **#2: `--jdk-arch` ignored** | Architecture parameter was passed but
never used in the S3 download URL |

---

## Solution

- Validate `JDK_MAJOR` and `JDK_ARCH` input parameters with regex
- Dynamically construct `JDK_FULL` from `JDK_MAJOR` and `JDK_ARCH`
- Update S3 path to use `/jdk/` subdirectory
- Add logging for debugging

---

## Changes

### `vagrant/base.sh`

| Change | Description |
|--------|-------------|
| **Input validation** | Added regex validation for `JDK_MAJOR` and
`JDK_ARCH` with sensible defaults |
| **Dynamic construction** | `JDK_FULL` is now constructed from
`JDK_MAJOR` and `JDK_ARCH` if not explicitly provided |
| **Updated S3 path** | Changed URL from
`/kafka-packages/jdk-{version}.tar.gz` to
`/kafka-packages/jdk/jdk-{version}.tar.gz` |
| **Logging** | Added debug output for JDK configuration |
| **Backward compatibility** | Vagrantfile can still pass `JDK_FULL`
directly; the script validates and uses it if valid |

---

## S3 Path Change

### Old Path
```
s3://kafka-packages/jdk-{version}.tar.gz
```

### New Path
```
s3://kafka-packages/jdk/jdk-{version}.tar.gz
```

### Available JDKs in `s3://kafka-packages/jdk/`

| File | Version | Architecture |
|------|---------|--------------|
| `jdk-7u80-linux-x64.tar.gz` | 7u80 | x64 |
| `jdk-8u144-linux-x64.tar.gz` | 8u144 | x64 |
| `jdk-8u161-linux-x64.tar.gz` | 8u161 | x64 |
| `jdk-8u171-linux-x64.tar.gz` | 8u171 | x64 |
| `jdk-8u191-linux-x64.tar.gz` | 8u191 | x64 |
| `jdk-8u202-linux-x64.tar.gz` | 8u202 | x64 |
| `jdk-11.0.2-linux-x64.tar.gz` | 11.0.2 | x64 |
| `jdk-17-linux-x64.tar.gz` | 17 | x64 |
| `jdk-18.0.2-linux-x64.tar.gz` | 18.0.2 | x64 |
| `jdk-21.0.1-linux-x64.tar.gz` | 21.0.1 | x64 |
| `jdk-21.0.1-linux-aarch64.tar.gz` | 21.0.1 | aarch64 |
| `jdk-25-linux-x64.tar.gz` | 25 | x64 |
| `jdk-25-linux-aarch64.tar.gz` | 25 | aarch64 |
| `jdk-25.0.1-linux-x64.tar.gz` | 25.0.1 | x64 |
| `jdk-25.0.1-linux-aarch64.tar.gz` | 25.0.1 | aarch64 |
| `jdk-25.0.2-linux-x64.tar.gz` | 25.0.2 | x64 |
| `jdk-25.0.2-linux-aarch64.tar.gz` | 25.0.2 | aarch64 |

---

## Future JDK Releases

> **IMPORTANT: No code changes required for future Java major/minor
releases!**

The validation regex supports all version formats:
- **Major versions**: `17`, `25`, `26`
- **Minor versions**: `25.0.1`, `25.0.2`, `26.0.1`
- **Legacy format**: `8u144`, `8u202`

### Adding New JDK Versions

To add support for a new JDK version (e.g., JDK 26, 25.0.3):

1. Download the JDK tarball from Oracle/Adoptium
2. Rename to follow naming convention:
`jdk-{VERSION}-linux-{ARCH}.tar.gz`
3. Upload to S3: `aws s3 cp jdk-{VERSION}-linux-{ARCH}.tar.gz
s3://kafka-packages/jdk/`
4. Use in tests: `--jdk-version {VERSION} --jdk-arch {ARCH}`

No modifications to `base.sh` or any other scripts are needed.

---

## Benefits

| Before | After |
|--------|-------|
| `--jdk-version` ignored | ✅ Correctly uses specified version |
| `--jdk-arch` ignored | ✅ Correctly uses specified architecture |
| Only major version support | ✅ Full version support (e.g., `25.0.2`) |
| Code change needed for new JDK | ✅ Just upload to S3 and pass version
|

---

## Testing

Tested with different JDK versions to confirm the fix works correctly:

| Test | JDK_MAJOR | Expected | Actual | Result | Test Report |
|------|-----------|----------|--------|--------|-------------|
| JDK 17 | `17` | javac 17.0.4 | javac 17.0.4 | ✅ |
| JDK 25 | `25` | javac 25.0.2 | javac 25.0.2 | ✅ |


---

## Backward Compatibility

- **Vagrantfile**: Continues to work as before
- **Existing workflows**: Default behavior unchanged (JDK 17 on x64
architecture)
- **No breaking changes**: All existing configurations continue to work

---

 Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants