Skip to content

Commit 9cbdda1

Browse files
author
Naireen
committed
Add redistribute option for Kafka Read
1 parent de4645d commit 9cbdda1

9 files changed

Lines changed: 302 additions & 19 deletions

File tree

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 159 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.apache.beam.sdk.transforms.MapElements;
7676
import org.apache.beam.sdk.transforms.PTransform;
7777
import org.apache.beam.sdk.transforms.ParDo;
78+
import org.apache.beam.sdk.transforms.Redistribute;
7879
import org.apache.beam.sdk.transforms.Reshuffle;
7980
import org.apache.beam.sdk.transforms.SerializableFunction;
8081
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -600,6 +601,9 @@ public static <K, V> Read<K, V> read() {
600601
.setDynamicRead(false)
601602
.setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
602603
.setConsumerPollingTimeout(2L)
604+
.setRedistributed(false)
605+
.setAllowDuplicates(false)
606+
.setRedistributeNumKeys(0)
603607
.build();
604608
}
605609

@@ -698,6 +702,15 @@ public abstract static class Read<K, V>
698702
@Pure
699703
public abstract boolean isDynamicRead();
700704

705+
@Pure
706+
public abstract boolean isRedistributed();
707+
708+
@Pure
709+
public abstract boolean isAllowDuplicates();
710+
711+
@Pure
712+
public abstract int getRedistributeNumKeys();
713+
701714
@Pure
702715
public abstract @Nullable Duration getWatchTopicPartitionDuration();
703716

@@ -757,6 +770,12 @@ abstract Builder<K, V> setConsumerFactoryFn(
757770

758771
abstract Builder<K, V> setWatchTopicPartitionDuration(Duration duration);
759772

773+
abstract Builder<K, V> setRedistributed(boolean withRedistribute);
774+
775+
abstract Builder<K, V> setAllowDuplicates(boolean allowDuplicates);
776+
777+
abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
778+
760779
abstract Builder<K, V> setTimestampPolicyFactory(
761780
TimestampPolicyFactory<K, V> timestampPolicyFactory);
762781

@@ -852,6 +871,22 @@ static <K, V> void setupExternalBuilder(
852871
} else {
853872
builder.setConsumerPollingTimeout(2L);
854873
}
874+
875+
if (config.redistribute != null) {
876+
builder.setRedistributed(config.redistribute);
877+
if (config.redistributeNumKeys != null) {
878+
builder.setRedistributeNumKeys((int) config.redistributeNumKeys);
879+
}
880+
if (config.allowDuplicates != null) {
881+
builder.setAllowDuplicates(config.allowDuplicates);
882+
}
883+
884+
} else {
885+
builder.setRedistributed(false);
886+
builder.setRedistributeNumKeys(0);
887+
builder.setAllowDuplicates(false);
888+
}
889+
System.out.println("xxx builder service" + builder.toString());
855890
}
856891

857892
private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -916,6 +951,9 @@ public static class Configuration {
916951
private Boolean commitOffsetInFinalize;
917952
private Long consumerPollingTimeout;
918953
private String timestampPolicy;
954+
private Integer redistributeNumKeys;
955+
private Boolean redistribute;
956+
private Boolean allowDuplicates;
919957

920958
public void setConsumerConfig(Map<String, String> consumerConfig) {
921959
this.consumerConfig = consumerConfig;
@@ -960,6 +998,18 @@ public void setTimestampPolicy(String timestampPolicy) {
960998
public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
961999
this.consumerPollingTimeout = consumerPollingTimeout;
9621000
}
1001+
1002+
public void setRedistributeNumKeys(Integer redistributeNumKeys) {
1003+
this.redistributeNumKeys = redistributeNumKeys;
1004+
}
1005+
1006+
public void setRedistribute(Boolean redistribute) {
1007+
this.redistribute = redistribute;
1008+
}
1009+
1010+
public void setAllowDuplicates(Boolean allowDuplicates) {
1011+
this.allowDuplicates = allowDuplicates;
1012+
}
9631013
}
9641014
}
9651015

@@ -1007,6 +1057,30 @@ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
10071057
return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
10081058
}
10091059

1060+
/**
1061+
* Sets redistribute transform that hints to the runner to try to redistribute the work evenly.
1062+
*/
1063+
public Read<K, V> withRedistribute() {
1064+
if (getRedistributeNumKeys() == 0 && isRedistributed()) {
1065+
LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
1066+
}
1067+
return toBuilder().setRedistributed(true).build();
1068+
}
1069+
1070+
public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
1071+
if (!isAllowDuplicates()) {
1072+
LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
1073+
}
1074+
return toBuilder().setAllowDuplicates(allowDuplicates).build();
1075+
}
1076+
1077+
public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
1078+
checkState(
1079+
isRedistributed(),
1080+
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
1081+
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
1082+
}
1083+
10101084
/**
10111085
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
10121086
* from each of the matching topics are read.
@@ -1618,6 +1692,25 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
16181692
.withMaxNumRecords(kafkaRead.getMaxNumRecords());
16191693
}
16201694

1695+
if (kafkaRead.isRedistributed()) {
1696+
// fail here instead.
1697+
checkArgument(
1698+
kafkaRead.isCommitOffsetsInFinalizeEnabled(),
1699+
"commitOffsetsInFinalize() can't be enabled with isRedistributed");
1700+
PCollection<KafkaRecord<K, V>> output = input.getPipeline().apply(transform);
1701+
if (kafkaRead.getRedistributeNumKeys() == 0) {
1702+
return output.apply(
1703+
"Insert Redistribute",
1704+
Redistribute.<KafkaRecord<K, V>>arbitrarily()
1705+
.withAllowDuplicates(kafkaRead.isAllowDuplicates()));
1706+
} else {
1707+
return output.apply(
1708+
"Insert Redistribute with Shards",
1709+
Redistribute.<KafkaRecord<K, V>>arbitrarily()
1710+
.withAllowDuplicates(kafkaRead.isAllowDuplicates())
1711+
.withNumBuckets((int) kafkaRead.getRedistributeNumKeys()));
1712+
}
1713+
}
16211714
return input.getPipeline().apply(transform);
16221715
}
16231716
}
@@ -1637,6 +1730,8 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
16371730
.withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
16381731
.withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
16391732
.withManualWatermarkEstimator()
1733+
.withRedistribute()
1734+
.withAllowDuplicates() // must be set with withRedistribute option.
16401735
.withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
16411736
.withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
16421737
.withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout());
@@ -1650,6 +1745,15 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
16501745
readTransform =
16511746
readTransform.withBadRecordErrorHandler(kafkaRead.getBadRecordErrorHandler());
16521747
}
1748+
if (kafkaRead.isRedistributed()) {
1749+
readTransform = readTransform.withRedistribute();
1750+
}
1751+
if (kafkaRead.isAllowDuplicates()) {
1752+
readTransform = readTransform.withAllowDuplicates();
1753+
}
1754+
if (kafkaRead.getRedistributeNumKeys() > 0) {
1755+
readTransform = readTransform.withRedistributeNumKeys(kafkaRead.getRedistributeNumKeys());
1756+
}
16531757
PCollection<KafkaSourceDescriptor> output;
16541758
if (kafkaRead.isDynamicRead()) {
16551759
Set<String> topics = new HashSet<>();
@@ -1679,6 +1783,22 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
16791783
.apply(Impulse.create())
16801784
.apply(ParDo.of(new GenerateKafkaSourceDescriptor(kafkaRead)));
16811785
}
1786+
if (kafkaRead.isRedistributed()) {
1787+
PCollection<KafkaRecord<K, V>> pcol =
1788+
output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
1789+
if (kafkaRead.getRedistributeNumKeys() == 0) {
1790+
return pcol.apply(
1791+
"Insert Redistribute",
1792+
Redistribute.<KafkaRecord<K, V>>arbitrarily()
1793+
.withAllowDuplicates(kafkaRead.isAllowDuplicates()));
1794+
} else {
1795+
return pcol.apply(
1796+
"Insert Redistribute with Shards",
1797+
Redistribute.<KafkaRecord<K, V>>arbitrarily()
1798+
.withAllowDuplicates(true)
1799+
.withNumBuckets((int) kafkaRead.getRedistributeNumKeys()));
1800+
}
1801+
}
16821802
return output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
16831803
}
16841804
}
@@ -2070,6 +2190,15 @@ public abstract static class ReadSourceDescriptors<K, V>
20702190
@Pure
20712191
abstract boolean isCommitOffsetEnabled();
20722192

2193+
@Pure
2194+
abstract boolean isRedistribute();
2195+
2196+
@Pure
2197+
abstract boolean isAllowDuplicates();
2198+
2199+
@Pure
2200+
abstract int getRedistributeNumKeys();
2201+
20732202
@Pure
20742203
abstract @Nullable TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
20752204

@@ -2136,6 +2265,12 @@ abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
21362265

21372266
abstract ReadSourceDescriptors.Builder<K, V> setBounded(boolean bounded);
21382267

2268+
abstract ReadSourceDescriptors.Builder<K, V> setRedistribute(boolean withRedistribute);
2269+
2270+
abstract ReadSourceDescriptors.Builder<K, V> setAllowDuplicates(boolean allowDuplicates);
2271+
2272+
abstract ReadSourceDescriptors.Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
2273+
21392274
abstract ReadSourceDescriptors<K, V> build();
21402275
}
21412276

@@ -2148,6 +2283,9 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
21482283
.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
21492284
.setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
21502285
.setConsumerPollingTimeout(2L)
2286+
.setRedistribute(false)
2287+
.setAllowDuplicates(false)
2288+
.setRedistributeNumKeys(0)
21512289
.build()
21522290
.withProcessingTime()
21532291
.withMonotonicallyIncreasingWatermarkEstimator();
@@ -2307,6 +2445,19 @@ public ReadSourceDescriptors<K, V> withProcessingTime() {
23072445
ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
23082446
}
23092447

2448+
/** Enable Redistribute. */
2449+
public ReadSourceDescriptors<K, V> withRedistribute() {
2450+
return toBuilder().setRedistribute(true).build();
2451+
}
2452+
2453+
public ReadSourceDescriptors<K, V> withAllowDuplicates() {
2454+
return toBuilder().setAllowDuplicates(true).build();
2455+
}
2456+
2457+
public ReadSourceDescriptors<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
2458+
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
2459+
}
2460+
23102461
/** Use the creation time of {@link KafkaRecord} as the output timestamp. */
23112462
public ReadSourceDescriptors<K, V> withCreateTime() {
23122463
return withExtractOutputTimestampFn(
@@ -2497,6 +2648,12 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
24972648
}
24982649
}
24992650

2651+
if (isRedistribute()) {
2652+
if (getRedistributeNumKeys() == 0) {
2653+
LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
2654+
}
2655+
}
2656+
25002657
if (getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
25012658
LOG.warn(
25022659
"The bootstrapServers is not set. It must be populated through the KafkaSourceDescriptor during runtime otherwise the pipeline will fail.");
@@ -2527,7 +2684,7 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
25272684
.getSchemaRegistry()
25282685
.getSchemaCoder(KafkaSourceDescriptor.class),
25292686
recordCoder));
2530-
if (isCommitOffsetEnabled() && !configuredKafkaCommit()) {
2687+
if (isCommitOffsetEnabled() && !configuredKafkaCommit() && !isRedistribute()) {
25312688
outputWithDescriptor =
25322689
outputWithDescriptor
25332690
.apply(Reshuffle.viaRandomKey())
@@ -2538,6 +2695,7 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
25382695
.getSchemaRegistry()
25392696
.getSchemaCoder(KafkaSourceDescriptor.class),
25402697
recordCoder));
2698+
25412699
PCollection<Void> unused = outputWithDescriptor.apply(new KafkaCommitOffset<K, V>(this));
25422700
unused.setCoder(VoidCoder.of());
25432701
}

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,24 @@ Object getDefaultValue() {
119119
return Long.valueOf(2);
120120
}
121121
},
122+
REDISTRIBUTE_NUM_KEYS {
123+
@Override
124+
Object getDefaultValue() {
125+
return Integer.valueOf(0);
126+
}
127+
},
128+
REDISTRIBUTED {
129+
@Override
130+
Object getDefaultValue() {
131+
return false;
132+
}
133+
},
134+
ALLOW_DUPLICATES {
135+
@Override
136+
Object getDefaultValue() {
137+
return false;
138+
}
139+
},
122140
;
123141

124142
private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,10 @@ public void testConstructKafkaRead() throws Exception {
108108
Field.of("start_read_time", FieldType.INT64),
109109
Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
110110
Field.of("timestamp_policy", FieldType.STRING),
111-
Field.of("consumer_polling_timeout", FieldType.INT64)))
111+
Field.of("consumer_polling_timeout", FieldType.INT64),
112+
Field.of("redistribute_num_keys", FieldType.INT32),
113+
Field.of("redistribute", FieldType.BOOLEAN),
114+
Field.of("allow_duplicates", FieldType.BOOLEAN)))
112115
.withFieldValue("topics", topics)
113116
.withFieldValue("consumer_config", consumerConfig)
114117
.withFieldValue("key_deserializer", keyDeserializer)
@@ -117,6 +120,9 @@ public void testConstructKafkaRead() throws Exception {
117120
.withFieldValue("commit_offset_in_finalize", false)
118121
.withFieldValue("timestamp_policy", "ProcessingTime")
119122
.withFieldValue("consumer_polling_timeout", 5L)
123+
.withFieldValue("redistribute_num_keys", 0)
124+
.withFieldValue("redistribute", false)
125+
.withFieldValue("allow_duplicates", false)
120126
.build());
121127

122128
RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -139,6 +145,7 @@ public void testConstructKafkaRead() throws Exception {
139145
expansionService.expand(request, observer);
140146
ExpansionApi.ExpansionResponse result = observer.result;
141147
RunnerApi.PTransform transform = result.getTransform();
148+
System.out.println("xxx : " + result.toString());
142149
assertThat(
143150
transform.getSubtransformsList(),
144151
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
@@ -237,14 +244,20 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
237244
Field.of("value_deserializer", FieldType.STRING),
238245
Field.of("start_read_time", FieldType.INT64),
239246
Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
240-
Field.of("timestamp_policy", FieldType.STRING)))
247+
Field.of("timestamp_policy", FieldType.STRING),
248+
Field.of("redistribute_num_keys", FieldType.INT32),
249+
Field.of("redistribute", FieldType.BOOLEAN),
250+
Field.of("allow_duplicates", FieldType.BOOLEAN)))
241251
.withFieldValue("topics", topics)
242252
.withFieldValue("consumer_config", consumerConfig)
243253
.withFieldValue("key_deserializer", keyDeserializer)
244254
.withFieldValue("value_deserializer", valueDeserializer)
245255
.withFieldValue("start_read_time", startReadTime)
246256
.withFieldValue("commit_offset_in_finalize", false)
247257
.withFieldValue("timestamp_policy", "ProcessingTime")
258+
.withFieldValue("redistribute_num_keys", 0)
259+
.withFieldValue("redistribute", false)
260+
.withFieldValue("allow_duplicates", false)
248261
.build());
249262

250263
RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ public void testPrimitiveKafkaIOReadPropertiesDefaultValueExistence() {
103103

104104
private void testReadTransformCreationWithImplementationBoundProperties(
105105
Function<KafkaIO.Read<Integer, Long>, KafkaIO.Read<Integer, Long>> kafkaReadDecorator) {
106-
p.apply(kafkaReadDecorator.apply(mkKafkaReadTransform(1000, null, new ValueAsTimestampFn())));
106+
p.apply(
107+
kafkaReadDecorator.apply(
108+
mkKafkaReadTransform(1000, null, new ValueAsTimestampFn(), false, 0)));
107109
p.run();
108110
}
109111

0 commit comments

Comments
 (0)