Skip to content

Add extra test cases plus some cleanups#1

Closed
hachikuji wants to merge 171 commits into
huxihx:KAFKA-8503from
hachikuji:KAFKA-8503
Closed

Add extra test cases plus some cleanups#1
hachikuji wants to merge 171 commits into
huxihx:KAFKA-8503from
hachikuji:KAFKA-8503

Conversation

@hachikuji

Copy link
Copy Markdown

No description provided.

dongjinleekr and others added 30 commits November 29, 2019 17:17
This PR is a follow-up of apache#7087, fixing typos, styles, etc.

cc/ big-andy-coates ijuma

Author: Lee Dongjin <dongjin@apache.org>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>

Closes apache#7217 from dongjinleekr/feature/trivial-admin-javadoc
https://issues.apache.org/jira/browse/KAFKA-9069

Make `getTopicMetadata` in AdminClientIntegrationTest always read metadata from controller to get a consistent view.

*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*

*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*

Author: huxihx <huxi_2b@hotmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, José Armando García Sancio <jsancio@gmail.com>

Closes apache#7619 from huxihx/KAFKA-9069
Reviewers: Adam Bellemare <adam.bellemare@wishabi.com>, John Roesler <john@confluent.io>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewer: Matthias J. Sax <matthias@confluent.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>
)

Race condition in concurrent  `get` method invocation of lazy indexes might lead
to multiple `OffsetIndex`/`TimeIndex` objects being concurrently created. When
that happens position of `MappedByteBuffer` in `AbstractIndex` is advanced to
the last entry which in turn leads to a critical `BufferOverflowException` being
thrown whenever leader or replica tries to append a record to the segment.

Moreover, `file_=` setter is seemingly also vulnerable to the race, since multiple
threads can attempt to set a new file reference as well as create new
Time/OffsetIndex objects at the same time. This might lead to the discrepant
File references being held by e.g. LazyTimeIndex and its TimeIndex counterpart.

This patch attempts to fix the issue by making sure that index objects are
atomically constructed when loaded lazily via `get` method. Additionally, `file`
reference modifications are also made atomic and thread safe.

Note that the `Lazy*Index` mutation operations are executed with a lock held by
the callers, but `get` can be called without a lock (e.g. from `Log.read`).

Reviewers: Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>, Shilin Lu, Ismael Juma <ismael@juma.me.uk>
Changes the ProducerMetadata to longer record a sentinel TOPIC_EXPIRY_NEEDS_UPDATE upon topic map emplacement, and instead set the expiry time directly. Previously the expiry time was being updated for all touched topics after a metadata fetch was processed, which could be seconds/minutes in the future.

Additionally propagates the current time further in the Producer, which should reduce the total number of current-time calls.

Reviewers: Ismael Juma <ismael@juma.me.uk>,  Rajini Sivaram <rajinisivaram@googlemail.com>
…ing byte buffers (apache#6679)" (apache#7769)

This reverts commit 90043d5 as it caused a regression in some cases:

> Caused by: java.io.IOException: Stream frame descriptor corrupted
>         at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
>         at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
>         at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)

I will investigate why after, but I want to get the safe fix into 2.4.0.
The reporter of KAFKA-9203 has verified that reverting this change
makes the problem go away.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
…enabled (apache#7748)

Fix three independent causes of threads dying:

1. `ProducerFencedException` isn't properly handed while suspending a task, and leads to the thread dying.
2. `IllegalStateException`: an internal assertion is violated because a store can get orphaned when an exception is thrown during initialization, again leading to the thread dying.
3. `UnknownProducerIdException`: This exception isn't expected by the Streams code, so when we get it, the relevant thread dies. It's not clear whether we always need to catch this, and in the future, we won't expect it at all, but we are catching it now to be sure we're resilient if/when it happens. Important note: this might actually harm performance if the errors turn out to be ignorable, and we will now rebalance instead of ignoring them.

Also, add missing test coverage for the exception handling code.

Reviewers: Boyang Chen <boyang@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bill@confluent.io>
Remove nullcheck, and add integration tests for restarting a failed task.

Authors: Cyrus Vafadari <cyrus@confluent.io>, Chris Egerton <chrise@confluent.io>
Reviewers: Arjun Satish <arjun@confluent.io>, Randall Hauch <rhauch@gmail.com>
…or (apache#7744)

Such logic is very brittle. Take the chance to simplify the code a bit.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
)

KAFKA-8448 fixes problem with similar leak. The Log objects are being
held in ScheduledExecutor PeriodicProducerExpirationCheck callback. The
fix in KAFKA-8448 was to change the policy of ScheduledExecutor to
remove the scheduled task when it gets canceled (by calling
setRemoveOnCancelPolicy(true)).

This works when a log is closed using close() method. But when a log is
deleted either when the topic gets deleted or when the rebalancing
operation moves the replica away from broker, the delete() operation is
invoked. Log.delete() doesn't close the pending scheduled task and that
leaks Log instance.

Fix is to close the scheduled task in the Log.delete() method too.

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
…pache#7723)

This patch fixes a bug in `SocketServer` in the expiration of connections which have not re-authenticated quickly enough. Previously these connections were left hanging, but now they are properly closed and cleaned up. This was one cause of the flaky test failures in `EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl`.

Reviewers: Jason Gustafson<jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
…ilure (apache#7682)

This patch fixes an NPE in `DefaultMetadataUpdater` due to an inconsistency in event expectations. Whenever there is an authentication failure, we were treating it as a failed update even if was from a separate connection from an inflight metadata request. This patch fixes the problem by making the `MetadataUpdater` api clearer in terms of the events that are handled.

Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
…forever (apache#7763)

If a non-consumer group is specified in `describeConsumerGroup`, the future will hang indefinitely because the future callback is never completed. This patch fixes the problem by completing the future exceptionally with an `IllegalArgumentException`.

Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jason Gustafson <jason@confluent.io>
Reviewed-By: Jason Gustafson <jason@confluent.io>
…bie Connect worker rejoins the group (apache#7771)

Check connectivity with broker coordinator in intervals and stop tasks if coordinator is unreachable by setting `assignmentSnapshot` to null and resetting rebalance delay when there are no lost tasks. And, because we're now sometimes setting `assignmentSnapshot` to null and reading it from other methods and thread, made this member volatile and used local references to ensure consistent reads.

Adapted existing unit tests to verify additional debug calls, added more specific log messages to `DistributedHerder`, and added a new integration test that verifies the behavior when the brokers are stopped and restarted only after the workers lose their heartbeats with the broker coordinator.

Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
This patch eliminates some redundancy and general messiness around the usage of `BaseRequestTest` and specifically response deserialization.

Reviewers: Ismael Juma <ismael@juma.me.uk>
)

Remove in catch clause and move it to the callback.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
…gnment state (apache#7786)

This patch fixes a race condition on reassignment completion. The previous code fetched metadata first and then fetched the reassignment state. It is possible in between those times for the reassignment to complete, which leads to spurious URPs being reported. The fix here is to change the order of these checks and to explicitly check for reassignment completion. 

Note this patch fixes the flaky test `TopicCommandWithAdminClientTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
[KAFKA-9267](https://issues.apache.org/jira/browse/KAFKA-9267)

ZkSecurityMigrator might create a PERSISTENT /controller node with null data, it will lead to controller can't elect.

*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*

*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*

Author: NanerLee <nanerlee@qq.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>

Closes apache#7778 from NanerLee/fix-ZkSecurityMigrator
… a given SSL cipher type (apache#7588)

Reviewers: Tom Bentley <tbentley@redhat.com>, Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>
…sts (apache#7789)

Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Nigel Liang <nigel@nigelliang.com>, Roesler <john@confluent.io>
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Ismael Juma <ismael@confluent.io>
…on (apache#7793)

Updates the HTML docs and the javadoc.

Reviewers: John Roesler <vvcephei@apache.org>
…uring reassignment (apache#7795)

KIP-320 improved fetch semantics by adding leader epoch validation. This relies on
reliable propagation of leader epoch information from the controller. Unfortunately, we
have encountered a bug during partition reassignment in which the leader epoch in the
controller context does not get properly updated. This causes UpdateMetadata requests
to be sent with stale epoch information which results in the metadata caches on the
brokers falling out of sync.

This bug has existed for a long time, but it is only a problem due to the new epoch
validation done by the client. Because the client includes the stale leader epoch in its
requests, the leader rejects them, yet the stale metadata cache on the brokers prevents
the consumer from getting the latest epoch. Hence the consumer cannot make progress
while a reassignment is ongoing.

Although it is straightforward to fix this problem in the controller for the new releases
(which this patch does), it is not so easy to fix older brokers which means new clients
could still encounter brokers with this bug. To address this problem, this patch also
modifies the client to treat the leader epoch returned from the Metadata response as
"unreliable" if it comes from an older version of the protocol. The client in this case will
discard the returned epoch and it won't be included in any requests.

Also, note that the correct epoch is still forwarded to replicas correctly in the
LeaderAndIsr request, so this bug does not affect replication.

Reviewers: Jun Rao <junrao@gmail.com>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Ismael Juma <ismael@juma.me.uk>
soondenana and others added 26 commits January 17, 2020 14:34
…sage (apache#7865)

The `KafkaController::replicasAreValid` method currently returns a
boolean indicating if replicas are valid or not. But the failure
condition loses any context on why replicas are not valid. This change
updates the metod to return the error conition if validation fails. This
allows caller to report the error to the client.

The change also renames the `replicasAreValid` method to
`validateReplicas` to reflect updated semantics.

Reviewers: Sean Li <seanli-rallyhealth@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
)

Since the leader epoch was not maintained in the fetch session cache, no validation would be done except for the initial (full) fetch request. This patch adds the leader epoch to the session cache and addresses the testing gaps.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Colin Patrick McCabe <cmccabe@apache.org>
…sion method (apache#7954)

Reviewers: Jason Gustafson <jason@confluent.io>
Although APIs section in Kafka documentation lists 5 core APIs (https://kafka.apache.org/documentation/#api), introduction page in Kafka documentation lists 4 of them. I've added the missing list element to fix this incoherence.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
…#7990)

Reviewers: Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
…st.testGracefulClose (apache#7989)

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Author: Narek Karapetian <narek_karapetian@epam.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
…ache#7593)

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
…ockConsumer (apache#7505)

The previous version of MockConsumer does not allow the clients to test consecutive calls to poll while consuming only from a partial set of partitions due to the fact that it clears all the records after each call. This change makes MockConsumer clearing the records only for the partitions that are not paused (whose records are actually returned by the poll). The remaining paused partitions will retain the records.

Unit test added accordingly.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
…pache#7819)

Author: Nigel Liang <nigel@nigelliang.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
…lugin loader for connector (apache#7964)

Reviewers: Jason Gustafson <jason@confluent.io>
…che#7648)

This commit makes `DistributedHerder` log that some error has happened during task reconfiguration only when it actually has happened.

Author: Ivan Yurchenko <ivan0yurchenko@gmail.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
…unset (apache#7813)

Reviewers: Jason Gustafson <jason@confluent.io>
…#fromConnectData() (apache#7489)

Author: Gunnar Morling <gunnar.morling@googlemail.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
…ache#7952)

This patch adds a new API to the producer to implement transactional offset commit fencing through the group coordinator as proposed in KIP-447. This PR mainly changes on the Producer end for compatible paths to old `sendOffsetsToTxn(offsets, groupId)` vs new `sendOffsetsToTxn(offsets, groupMetadata)`.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
…Id (apache#7920)

Previously the idempotent producer and transactional producer use separate logic when initializing the producerId. This patch consolidates the two paths. We also do some cleanup in `TransactionManagerTest` to eliminate brittle expectations on `Sender`.

Reviewers: Bob Barrett <bob.barrett@confluent.io>, Viktor Somogyi <viktorsomogyi@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
…ptions in consumer's SubscriptionState (apache#7941)

Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
…metadata (apache#7969)

Also addresses KAFKA-8821

Note that we still have to fall back to using pattern subscription if the user has added any regex-based source nodes to the topology. Includes some minor cleanup on the side

Reviewers: Bill Bejeck <bbejeck@gmail.com>
Author: Ted Yu <yuzhihong@gmail.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
This ticket shall improve two aspects of the retrieval of sensors:
https://issues.apache.org/jira/browse/KAFKA-9152

Currently, when a sensor is retrieved with *Metrics.*Sensor() (e.g. ThreadMetrics.createTaskSensor()) after it was created with the same method *Metrics.*Sensor(), the sensor is added again to the corresponding queue in Sensors (e.g. threadLevelSensors) in StreamsMetricsImpl. Those queues are used to remove the sensors when removeAllLevelSensors() is called. Having multiple times the same sensors in this queue is not an issue from a correctness point of view. However, it would reduce the footprint to only store a sensor once in those queues.

When a sensor is retrieved, the current code attempts to create a new sensor and to add to it again the corresponding metrics. This could be avoided.

Both aspects could be improved by checking whether a sensor already exists by calling getSensor() on the Metrics object and checking the return value.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
Also made all shutdown hooks consistent and added tests

Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>
…change (apache#7870)

Currently, when a dynamic change is made to the broker-level default log configuration, existing log configs will be recreated with an empty overridden configs. In such case, when updating dynamic broker configs a second round, the topic-level configs are lost. This can cause unexpected data loss, for example, if the cleanup policy changes from "compact" to "delete."

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
* lz4: fixes identified by oss-fuzz
* jetty: fixes a few recent regressions
* powermock: better support for Java 12+
* zstd-jni: minor fixes
* httpclient: minor fixes
* spotless-plugin: minor fixes
* jmh: minor fixes

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
@hachikuji

Copy link
Copy Markdown
Author

Closing this. I will open a separate PR upstream.

@hachikuji hachikuji closed this Jan 28, 2020
huxihx pushed a commit that referenced this pull request Mar 22, 2021
…to get end offsets and create topics (apache#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.