
KIP-382: MirrorMaker 2.0 #6295

Closed

ryannedolan wants to merge 94 commits into apache:trunk from ryannedolan:KIP-382

Conversation

@ryannedolan (Contributor) commented Feb 20, 2019

Implementation of KIP-382 "MirrorMaker 2.0" (approved) and KIP-416 "Notify SourceTask of ACK'd offsets, metadata" (not yet approved).

Depends on #6171

Quick start:

  1. Create an MM2 configuration file.
  2. Launch with ./bin/connect-mirror-maker.sh mm2.properties

Sample configuration file:

    clusters = one, two, three, four
    one.bootstrap.servers = ...
    two.bootstrap.servers = ...
    ...
    one->two.enabled = true # false by default
    two->one.enabled = true
    ...
    three->four.topics = topic1, topic2 # .* by default
    ...
    sync.topic.acls.enabled = false # disable for non-secure clusters

The following features of the KIP are deferred for now:
 - MirrorSinkConnector/Task -- not used by the MirrorMaker driver, but may be useful to run on a Connect cluster
 - "legacy mode" script -- per KIP, not part of first release

@enothereska (Contributor)

  • Checked new topic creation in the remote cluster
  • Checked ACL syncing

The code looks good; ideally we get some tests for the above as well. As part of that check, I'm wondering if it makes sense to add an integration test plan to the KIP (apologies if I missed it), since some of this functionality needs full e2e testing.

- only run herders for enabled flows
- don't proactively create topics -- wait for topic to be created by producer
- pluggable filters for topics, groups, config properties
- topic-level metrics and other improvements to metrics
- drop temporary "monitor" connector
    .filter(x -> x != -1)
    .mapToInt(x -> x)
    .min()
    .orElse(-1);

I'm wondering: what is the advantage of using -1 over say null to represent no value?

ryannedolan (Contributor, Author):

That would work too. Whatever is more conventional in Kafka is fine with me.


There seems to be null in some places and -1 in others. If possible, can we standardize, or leave a comment?
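For illustration, a minimal sketch (with made-up names) contrasting the two conventions under discussion: a -1 sentinel, as in the stream pipeline quoted above, versus an explicit OptionalInt that makes absence part of the return type:

```java
import java.util.OptionalInt;
import java.util.stream.IntStream;

public class SentinelVsOptional {
    // Sentinel convention: -1 encodes "no value", as in the pipeline above.
    static int minWithSentinel(int[] values) {
        return IntStream.of(values)
                .filter(x -> x != -1)
                .min()
                .orElse(-1);
    }

    // Optional convention: absence is explicit in the type, so callers
    // cannot silently confuse "no value" with a legitimate -1.
    static OptionalInt minWithOptional(int[] values) {
        return IntStream.of(values)
                .filter(x -> x != -1)
                .min();
    }
}
```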

@junrao (Contributor) commented Oct 3, 2019

@harshach : There are still a couple of comments that haven't been addressed. The biggest one is on dealing with compacted topics for offset translation.


    assertTrue("always emit offset sync on first update",
        partitionState.update(0, 100));
    assertTrue("upstream offset skipped -> resync",
ryannedolan (Contributor, Author):

@junrao holes in a compacted topic are handled, as shown here.

@ryannedolan (Contributor, Author):

@junrao sorry, I didn't see a few comments that were folded/hidden for some reason. Hopefully I've addressed everything that would otherwise delay the merge.

@junrao (Contributor) left a comment:

@ryannedolan : Thanks for the explanation. It all makes sense to me now. So, LGTM.

A few limitations of MM2 that I saw: (1) if MM2 starts with an existing compacted topic with lots of holes, there could be more overhead for writing the offsetSync data; in the worst case, every record requires a checkpoint of offsetSync data. (2) If a consumer starts consuming from the beginning, the offsets won't be translated to the target cluster until the consumer catches up. (3) Prefix ACLs are not propagated. These may not be common issues, but it would be useful to document them in the docs.

Also, we will need a few new message formatters to read the new internal topics. Do you plan to add that before code freeze?

@omkreddy and @harshach : Once the minor issues are addressed, perhaps you could take another look and merge the PR?

     * @param metadata {@link RecordMetadata} record metadata returned from the broker
     * @throws InterruptedException
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata)

Could we add some comments to make it clear that one only needs to implement one of the commitRecord()?

ryannedolan (Contributor, Author):

Added a couple lines locally. Will hold on to the commit for now -- I don't want to trigger another build at the moment.

    testRuntime libs.slf4jlog4j
    }

    javadoc {
omkreddy (Contributor):

@ryannedolan
We need to include javadocs section for newly added public interfaces/classes.
Example: https://github.com/apache/kafka/blob/trunk/build.gradle#L1539

I assume we will be adding Kafka website documentation as part of KAFKA-8930:
http://kafka.apache.org/documentation/#basic_ops_mirror_maker

Also, it looks like the test failures are related.

ryannedolan (Contributor, Author):

Ah, thanks @omkreddy. I'll fix the javadocs this morning. For the website documentation, we'll need to keep the existing mirror-maker section for now, but I'll add a section on MM2, probably in a separate PR.

The failing tests seem to be related to flakiness in the Connect integration test framework. I'll see what I can do.

@omkreddy (Contributor) commented Oct 4, 2019

retest this please

@omkreddy (Contributor) commented Oct 5, 2019

@ryannedolan Any update on test failures?

@harshach Please merge the PR once Jun's comments are addressed and we have green builds.

@ryannedolan (Contributor, Author):

@omkreddy I traced the build failures to an NPE from KIP-507, committed yesterday. It is breaking MM2's and other Connect integration tests. I'll fix it here, I guess.

@ryannedolan (Contributor, Author):

I fixed the NPE from KIP-507 -- let's see if we can get a green build now.

@ryannedolan (Contributor, Author):

@harshach good to go!

@rhauch (Contributor) left a comment:

Thanks for updating KIP-416 so that the framework only calls the newer SourceTask.commitRecord(SourceRecord, RecordMetadata).

I have one request to improve the JavaDoc for the new method to make it easier for developers writing their own SourceTask to know whether to override those methods, without having to dive into the Connect runtime to figure that out.

     * </p>
     *
     * @param record {@link SourceRecord} that was successfully sent via the producer
     * @param metadata {@link RecordMetadata} record metadata returned from the broker
rhauch (Contributor):

@ryannedolan, we need to clearly specify that the metadata parameter can be null, and then in the JavaDoc above specify what this means for the source task, namely that a transform dropped/skipped the record and it was not written to Kafka.

IMO this is necessary so that developers of connector implementations know what the behavior is so they can properly implement their task. (The JavaDoc was not in KIP-416.)

I also think it's worth mentioning here that SourceTask implementations need only implement this method, or the older commitRecord(SourceRecord), or neither, but generally they do not need to implement both, since Connect will only call this method. Again, this will help developers that are implementing their own SourceTask know what they need to do.
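To make the requested contract concrete, here is a sketch from a connector developer's point of view. The classes below are hypothetical stand-ins, not the real Connect API types:

```java
// Hypothetical stand-ins for the Connect API types, for illustration only.
class SourceRecord { }

class RecordMetadata {
    private final long offset;
    RecordMetadata(long offset) { this.offset = offset; }
    long offset() { return offset; }
}

class SketchSourceTask {
    long lastAckedOffset = -1;
    int droppedRecords = 0;

    // Per KIP-416, the framework calls only this overload, so a task generally
    // implements this method, or the older commitRecord(SourceRecord), or
    // neither -- but not both. metadata is null when a transform dropped the
    // record, meaning it was never written to Kafka.
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        if (metadata == null) {
            droppedRecords++;                 // skipped by an SMT; nothing to ack
            return;
        }
        lastAckedOffset = metadata.offset();  // ack'd by the broker
    }
}
```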

ryannedolan (Contributor, Author):

@rhauch I improved the javadocs further. Should be clear now.

@rhauch (Contributor) commented Oct 5, 2019

@ryannedolan, @junrao, @harshach: As mentioned above in one of the comments on WorkerSourceTask, this PR also implements KIP-416. I've edited the description of the PR to mention this.

According to the vote thread, KIP-416 has not yet been approved, and per the AK 2.4.0 Release Plan the KIP deadline was Sept 25.

@junrao, what's required here?

@junrao (Contributor) commented Oct 5, 2019

@rhauch : We can fold the changes in WorkerSourceTask into this KIP. Ryan, could you update your KIP wiki and send an email to the voting thread about the additional changes, to make sure that no one objects? If KIP-416 is completely subsumed by this, we can just cancel it; otherwise, KIP-416 can cover the remaining part.

@rhauch (Contributor) commented Oct 5, 2019

Thanks, @junrao. KIP-416 is straightforward, limited to the one new method change we discussed (and agreed to), and required for KIP-382, so pulling those changes into KIP-382 makes sense to me.

     */
    public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId,
            String remoteClusterAlias, Duration timeout) {
        long deadline = System.currentTimeMillis() + timeout.toMillis();

Is there a reason why this is not using the Time interface? We generally never use System.currentTimeMillis() in Kafka.

ryannedolan (Contributor, Author):

Good idea, we should replace these in a subsequent PR.
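A rough sketch of that refactor, using a plain LongSupplier as a stand-in for Kafka's Time interface (all names here are hypothetical). Injecting the clock makes the deadline logic testable without real waiting:

```java
import java.time.Duration;
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

public class DeadlineSketch {
    // clockMs stands in for an injectable clock (e.g. Time.milliseconds());
    // attempt returns -1 until a result is available. The loop computes a
    // deadline once up front and gives up when the clock passes it.
    static int pollUntil(Duration timeout, LongSupplier clockMs, IntSupplier attempt) {
        long deadline = clockMs.getAsLong() + timeout.toMillis();
        int result = attempt.getAsInt();
        while (result == -1 && clockMs.getAsLong() < deadline) {
            result = attempt.getAsInt();
        }
        return result;
    }
}
```

With a fake clock that advances a fixed amount per call, both the success and the timeout paths can be exercised deterministically in a unit test.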

@harshach commented Oct 7, 2019

@junrao some of the comments mentioned here can be handled in a follow-up patch and can be part of the minor release that will follow. Given the number of users that are interested and the number of reviews we've had on the PR, maintaining a patch this big as trunk continues to evolve will be challenging. If you don't have any major concerns, let's merge this in for the 2.4 release and address any new comments in a follow-up patch. cc @omkreddy

@omkreddy (Contributor) commented Oct 7, 2019

@ryannedolan Thanks for the PR. Merging it now. Let's address any issues in follow-up PRs. Please raise JIRAs for any pending work (MirrorSinkConnector, legacy mode, etc.) for the next releases.

@omkreddy omkreddy closed this in 4ac892c Oct 7, 2019
@ryannedolan ryannedolan deleted the KIP-382 branch December 24, 2019 00:14
blacklisted ones), across all enabled replication flows. Each
replication flow must be explicitly enabled to begin replication:

    A->B.enabled = true

This directive (A->B.enabled) is missing from the KIP's "Running MirrorMaker in production" section -- I got the cluster up but no replication was happening, until I saw this and fixed it. I am new to MirrorMaker and the Connect framework; forgive me if this is a well-known thing.
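For anyone else hitting this, a minimal two-cluster mm2.properties sketch; the hostnames are placeholders, and without the last line no replication happens:

```properties
# Minimal MM2 configuration -- broker hostnames below are placeholders.
clusters = A, B
A.bootstrap.servers = a-broker:9092
B.bootstrap.servers = b-broker:9092

# Replication flows are disabled by default; this line is required:
A->B.enabled = true
```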

@bpux commented Mar 25, 2020

@ryannedolan I tried to find the JIRA for MirrorSinkConnector, but the search returned nothing. Can you please point me to where I can find the status of this work? Thanks!

@ewencp (Contributor) commented Mar 25, 2020

@bpux https://issues.apache.org/jira/browse/KAFKA-7500; the primary JIRA for a KIP is linked from the KIP document itself: https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0

@bpux commented Mar 25, 2020

@ewencp thanks! I'll comment there.

@michael-trelinski commented Apr 28, 2020


Hi @ryannedolan -

I just got done setting up Mirror Maker V2 where I work. It was successful, but the documentation doesn't match the code in various places.

A couple of things that "got me" -

  • The JMX Metrics in the documentation:

The mbean name for these metrics will be: kafka.mirror.connect:type=MirrorSourceConnect,target=([.\w]+),topic=([.\w]+),partition=([.\d]+) and kafka.mirror.connect:type=MirrorCheckpointConnector,target=([.\w]+),source=([.\w]+),group=([.\w]+)

It should actually be something like (pardon me, I'm using the JMX-Prometheus exporter): kafka.connect.mirror<type=MirrorSourceConnector, target=([\-\w]+), topic=([\-\w]+), partition=(\d+)>. Note that it's actually kafka.connect.mirror, not kafka.mirror.connect, and MirrorSourceConnector, not MirrorSourceConnect.

  • In the documentation, there is a part about blacklisting groups:

groups.blacklist | empty string | groups to exclude from replication

In the code, it is public static final String GROUPS_BLACKLIST_DEFAULT = "console-consumer-.*, connect-.*, __.*"; here: https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java#L36 - this got me because we're actually using some Kafka Connect stuff for other projects, but we're running MirrorMaker as its own cluster. Maybe you could blacklist only MirrorMaker's own connect- consumer group?

  • This one is kind of a nit-pick, but in RemoteClusterUtils.translateOffsets, the timeout parameter isn't really a timeout in the conventional sense. Stepping into the call made with timeout, return client.remoteConsumerOffsets(consumerGroupId, remoteClusterAlias, timeout);, we can see that it's used as a deadline, not a timeout. In MirrorClient's remoteConsumerOffsets method, it's used both as a timeout and as a deadline. Setting this too low caused me to miss some offsets because I didn't understand how it was being used; setting it absurdly high got me the results I wanted.
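Given the kafka.connect.mirror<type=MirrorSourceConnector, ...> name noted above, a jmx_exporter rule would look roughly like this. The attribute name record-count is an assumption here; check what your MM2 version actually exposes (e.g. with jmxterm) before using it:

```yaml
# Sketch of a jmx_exporter rule for the actual MM2 domain and type
# (kafka.connect.mirror / MirrorSourceConnector). The "record-count"
# attribute is an assumption -- verify against your own mbeans.
rules:
  - pattern: 'kafka.connect.mirror<type=MirrorSourceConnector, target=([-.\w]+), topic=([-.\w]+), partition=(\d+)><>record-count'
    name: kafka_connect_mirror_record_count
    labels:
      target: "$1"
      topic: "$2"
      partition: "$3"
```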

All in all a great product and vastly superior to Mirror Maker V1. Thanks!

@Migueljfs
I can't seem to find the new metrics mentioned in the KIP.
Like @michael-trelinski said above, I've tried both kafka.connect.mirror / kafka.mirror.connect and MirrorSourceConnector / MirrorSourceConnect, and all combinations of them, but I still don't see the mirror domain or the respective mbeans.

I installed JMXTERM on my mirror-maker pod and this is what I got:

Domains:

    $>domains
    #following domains are available
    JMImplementation
    com.sun.management
    java.lang
    java.nio
    java.util.logging
    jdk.management.jfr
    kafka.connect
    kafka.consumer
    kafka.producer

Beans for connect:

    #domain = kafka.connect:
    kafka.connect:client-id=connect-1,node-id=node--1,type=connect-node-metrics
    kafka.connect:client-id=connect-1,node-id=node-7,type=connect-node-metrics
    kafka.connect:client-id=connect-1,type=app-info
    kafka.connect:client-id=connect-1,type=connect-coordinator-metrics
    kafka.connect:client-id=connect-1,type=connect-metrics
    kafka.connect:client-id=connect-1,type=kafka-metrics-count
    kafka.connect:id="west->central",type=app-info
    kafka.connect:id=connect-1,type=app-info
    kafka.connect:type=app-info
    kafka.connect:type=connect-worker-metrics
    kafka.connect:type=connect-worker-rebalance-metrics
    kafka.connect:type=kafka-metrics-count
    #domain = kafka.consumer:

As you can see, no mentions of "mirror" of any kind anywhere.

Anyone able to help me understand what's going on?
