Skip to content

Commit 25f9120

Browse files
committed
KAFKA-9555: Added default RLMM implementation based on internal topic storage.
Summary: - Introduced TopicBasedRemoteLogMetadataManagerHarness which takes care of bringing up a Kafka cluster and create remote log metadata topic and initializes TopicBasedRemoteLogMetadataManager. - Added TopicBasedRemoteLogMetadataManager to run the existing tests by making the existing RemoteLogMetadataManagerTest parameterized. - Added transformation of TopicIdPartition into bytes with custom hashing. - Skipping the events which are not currently assigned for the ConsumerTask. Added file based remote log metadata snapshots: - Added TopicBasedRemoteLogMetadataManagerRestartTest and fixed issues in reloading the snapshots. - TopicBasedRemoteLogMetadataManagerRestartTest is about verifying: * load the earlier saved snapshots after restart * check the entries are available * start the consumer and add more metadata entries * check the newly addded entries and loaded entries are available - Fixed a few issues in snapshot read/write/load and added tests. This is part of tiered storage KIP-405 efforts. Reviewers: #ldap_kafka_admins, kchandraprakash Reviewed By: #ldap_kafka_admins, kchandraprakash JIRA Issues: DKAFC-868 Differential Revision: https://code.uberinternal.com/D6303375
1 parent 078a2a4 commit 25f9120

36 files changed

Lines changed: 3844 additions & 464 deletions

File tree

build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1447,8 +1447,11 @@ project(':storage') {
14471447

14481448
testCompile project(':clients')
14491449
testCompile project(':clients').sourceSets.test.output
1450+
testCompile project(':core')
1451+
testCompile project(':core').sourceSets.test.output
14501452
testCompile libs.junitJupiter
14511453
testCompile libs.mockitoCore
1454+
testCompile libs.bcpkix
14521455

14531456
testRuntime libs.slf4jlog4j
14541457
}
@@ -1503,6 +1506,12 @@ project(':storage') {
15031506
}
15041507
}
15051508

1509+
test {
1510+
useJUnitPlatform {
1511+
includeEngines 'junit-jupiter'
1512+
}
1513+
}
1514+
15061515
clean.doFirst {
15071516
delete "$buildDir/kafka/"
15081517
}

checkstyle/import-control.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,9 +278,18 @@
278278

279279
<subpackage name="log">
280280
<allow pkg="com.fasterxml.jackson" />
281+
<allow pkg="kafka.api" />
282+
<allow pkg="kafka.utils" />
283+
<allow pkg="org.apache.kafka.clients" />
281284
<allow pkg="org.apache.kafka.server.common" />
282285
<allow pkg="org.apache.kafka.server.log" />
283286
<allow pkg="org.apache.kafka.test" />
287+
288+
<subpackage name="remote">
289+
<allow pkg="scala.collection" />
290+
<allow pkg="scala.jdk" />
291+
</subpackage>
292+
284293
</subpackage>
285294
</subpackage>
286295

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadata.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.server.log.remote.storage;
1818

19+
import org.apache.kafka.common.TopicIdPartition;
1920
import org.apache.kafka.common.annotation.InterfaceStability;
2021

2122
/**
@@ -53,4 +54,9 @@ public long eventTimestampMs() {
5354
public int brokerId() {
5455
return brokerId;
5556
}
57+
58+
/**
59+
* @return TopicIdPartition for which this event is generated.
60+
*/
61+
public abstract TopicIdPartition topicIdPartition();
5662
}

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentId.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.server.log.remote.storage;
1818

1919
import org.apache.kafka.common.TopicIdPartition;
20+
import org.apache.kafka.common.TopicPartition;
2021
import org.apache.kafka.common.Uuid;
2122
import org.apache.kafka.common.annotation.InterfaceStability;
2223

@@ -45,6 +46,10 @@ public TopicIdPartition topicIdPartition() {
4546
return topicIdPartition;
4647
}
4748

49+
public TopicPartition topicPartition() {
50+
return topicIdPartition.topicPartition();
51+
}
52+
4853
/**
4954
* @return Universally Unique Id of this remote log segment.
5055
*/

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.server.log.remote.storage;
1818

19+
import org.apache.kafka.common.TopicIdPartition;
1920
import org.apache.kafka.common.annotation.InterfaceStability;
2021

2122
import java.util.Collections;
@@ -217,6 +218,11 @@ public RemoteLogSegmentMetadata createWithUpdates(RemoteLogSegmentMetadataUpdate
217218
segmentSizeInBytes, rlsmUpdate.state(), segmentLeaderEpochs);
218219
}
219220

221+
@Override
222+
public TopicIdPartition topicIdPartition() {
223+
return remoteLogSegmentId.topicIdPartition();
224+
}
225+
220226
@Override
221227
public boolean equals(Object o) {
222228
if (this == o) {

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.server.log.remote.storage;
1818

19+
import org.apache.kafka.common.TopicIdPartition;
1920
import org.apache.kafka.common.annotation.InterfaceStability;
2021

2122
import java.util.Objects;
@@ -65,6 +66,11 @@ public RemoteLogSegmentState state() {
6566
return state;
6667
}
6768

69+
@Override
70+
public TopicIdPartition topicIdPartition() {
71+
return remoteLogSegmentId.topicIdPartition();
72+
}
73+
6874
@Override
6975
public boolean equals(Object o) {
7076
if (this == o) {
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.log.remote.metadata.storage;
18+
19+
import org.apache.kafka.common.TopicIdPartition;
20+
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
21+
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
22+
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
23+
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
24+
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
25+
26+
import java.io.IOException;
27+
import java.util.Iterator;
28+
import java.util.Map;
29+
import java.util.Optional;
30+
import java.util.Set;
31+
32+
public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetadataManager {
33+
private final RemoteLogMetadataManager delegate;
34+
private final ClassLoader loader;
35+
36+
public ClassLoaderAwareRemoteLogMetadataManager(RemoteLogMetadataManager delegate,
37+
ClassLoader loader) {
38+
this.delegate = delegate;
39+
this.loader = loader;
40+
}
41+
42+
@Override
43+
public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
44+
withClassLoader(() -> {
45+
delegate.addRemoteLogSegmentMetadata(remoteLogSegmentMetadata);
46+
return null;
47+
});
48+
}
49+
50+
@Override
51+
public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException {
52+
withClassLoader(() -> {
53+
delegate.updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate);
54+
return null;
55+
});
56+
}
57+
58+
@Override
59+
public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
60+
int epochForOffset,
61+
long offset) throws RemoteStorageException {
62+
return withClassLoader(() -> delegate.remoteLogSegmentMetadata(topicIdPartition, epochForOffset, offset));
63+
}
64+
65+
@Override
66+
public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition,
67+
int leaderEpoch) throws RemoteStorageException {
68+
return withClassLoader(() -> delegate.highestOffsetForEpoch(topicIdPartition, leaderEpoch));
69+
}
70+
71+
@Override
72+
public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException {
73+
withClassLoader(() -> {
74+
delegate.putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata);
75+
return null;
76+
});
77+
}
78+
79+
@Override
80+
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException {
81+
return withClassLoader(() -> delegate.listRemoteLogSegments(topicIdPartition));
82+
}
83+
84+
@Override
85+
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition,
86+
int leaderEpoch) throws RemoteStorageException {
87+
return withClassLoader(() -> delegate.listRemoteLogSegments(topicIdPartition, leaderEpoch));
88+
}
89+
90+
@Override
91+
public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
92+
Set<TopicIdPartition> followerPartitions) {
93+
withTryCatchClassLoader(() -> {
94+
delegate.onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
95+
return null;
96+
});
97+
}
98+
99+
@Override
100+
public void onStopPartitions(Set<TopicIdPartition> partitions) {
101+
withTryCatchClassLoader(() -> {
102+
delegate.onStopPartitions(partitions);
103+
return null;
104+
});
105+
}
106+
107+
@Override
108+
public void configure(Map<String, ?> configs) {
109+
withTryCatchClassLoader(() -> {
110+
delegate.configure(configs);
111+
return null;
112+
});
113+
}
114+
115+
@Override
116+
public void close() throws IOException {
117+
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
118+
Thread.currentThread().setContextClassLoader(loader);
119+
try {
120+
delegate.close();
121+
} finally {
122+
Thread.currentThread().setContextClassLoader(originalClassLoader);
123+
}
124+
}
125+
126+
@SuppressWarnings("UnusedReturnValue")
127+
private <T> T withTryCatchClassLoader(Worker<T> worker) {
128+
try {
129+
return withClassLoader(worker);
130+
} catch (final RemoteStorageException ex) {
131+
// ignore, this exception is not thrown by the method.
132+
}
133+
return null;
134+
}
135+
136+
private <T> T withClassLoader(Worker<T> worker) throws RemoteStorageException {
137+
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
138+
Thread.currentThread().setContextClassLoader(loader);
139+
try {
140+
return worker.doWork();
141+
} finally {
142+
Thread.currentThread().setContextClassLoader(originalClassLoader);
143+
}
144+
}
145+
146+
@FunctionalInterface
147+
public interface Worker<T> {
148+
T doWork() throws RemoteStorageException;
149+
}
150+
}

0 commit comments

Comments
 (0)