7575import org .apache .beam .sdk .transforms .MapElements ;
7676import org .apache .beam .sdk .transforms .PTransform ;
7777import org .apache .beam .sdk .transforms .ParDo ;
78+ import org .apache .beam .sdk .transforms .Redistribute ;
7879import org .apache .beam .sdk .transforms .Reshuffle ;
7980import org .apache .beam .sdk .transforms .SerializableFunction ;
8081import org .apache .beam .sdk .transforms .SimpleFunction ;
@@ -600,6 +601,9 @@ public static <K, V> Read<K, V> read() {
600601 .setDynamicRead (false )
601602 .setTimestampPolicyFactory (TimestampPolicyFactory .withProcessingTime ())
602603 .setConsumerPollingTimeout (2L )
604+ .setRedistributed (false )
605+ .setAllowDuplicates (false )
606+ .setRedistributeNumKeys (0 )
603607 .build ();
604608 }
605609
@@ -698,6 +702,15 @@ public abstract static class Read<K, V>
698702 @ Pure
699703 public abstract boolean isDynamicRead ();
700704
    /** Returns whether a {@code Redistribute} transform is applied after the read. */
    @Pure
    public abstract boolean isRedistributed();

    /**
     * Returns whether the redistribute may emit duplicate records.
     *
     * <p>Only meaningful when {@link #isRedistributed()} is {@code true}.
     */
    @Pure
    public abstract boolean isAllowDuplicates();

    /** Returns the number of redistribute keys; 0 means a key per record. */
    @Pure
    public abstract int getRedistributeNumKeys();
713+
701714 @ Pure
702715 public abstract @ Nullable Duration getWatchTopicPartitionDuration ();
703716
@@ -757,6 +770,12 @@ abstract Builder<K, V> setConsumerFactoryFn(
757770
758771 abstract Builder <K , V > setWatchTopicPartitionDuration (Duration duration );
759772
773+ abstract Builder <K , V > setRedistributed (boolean withRedistribute );
774+
775+ abstract Builder <K , V > setAllowDuplicates (boolean allowDuplicates );
776+
777+ abstract Builder <K , V > setRedistributeNumKeys (int redistributeNumKeys );
778+
760779 abstract Builder <K , V > setTimestampPolicyFactory (
761780 TimestampPolicyFactory <K , V > timestampPolicyFactory );
762781
@@ -852,6 +871,22 @@ static <K, V> void setupExternalBuilder(
852871 } else {
853872 builder .setConsumerPollingTimeout (2L );
854873 }
874+
875+ if (config .redistribute != null ) {
876+ builder .setRedistributed (config .redistribute );
877+ if (config .redistributeNumKeys != null ) {
878+ builder .setRedistributeNumKeys ((int ) config .redistributeNumKeys );
879+ }
880+ if (config .allowDuplicates != null ) {
881+ builder .setAllowDuplicates (config .allowDuplicates );
882+ }
883+
884+ } else {
885+ builder .setRedistributed (false );
886+ builder .setRedistributeNumKeys (0 );
887+ builder .setAllowDuplicates (false );
888+ }
889+ System .out .println ("xxx builder service" + builder .toString ());
855890 }
856891
857892 private static <T > Coder <T > resolveCoder (Class <Deserializer <T >> deserializer ) {
@@ -916,6 +951,9 @@ public static class Configuration {
916951 private Boolean commitOffsetInFinalize ;
917952 private Long consumerPollingTimeout ;
918953 private String timestampPolicy ;
954+ private Integer redistributeNumKeys ;
955+ private Boolean redistribute ;
956+ private Boolean allowDuplicates ;
919957
920958 public void setConsumerConfig (Map <String , String > consumerConfig ) {
921959 this .consumerConfig = consumerConfig ;
@@ -960,6 +998,18 @@ public void setTimestampPolicy(String timestampPolicy) {
960998 public void setConsumerPollingTimeout (Long consumerPollingTimeout ) {
961999 this .consumerPollingTimeout = consumerPollingTimeout ;
9621000 }
1001+
      // Number of keys to redistribute onto; null/0 means a key per record.
      public void setRedistributeNumKeys(Integer redistributeNumKeys) {
        this.redistributeNumKeys = redistributeNumKeys;
      }

      // Enables the Redistribute transform after the read.
      public void setRedistribute(Boolean redistribute) {
        this.redistribute = redistribute;
      }

      // Allows the redistribute to emit duplicates; only used when redistribute is set.
      public void setAllowDuplicates(Boolean allowDuplicates) {
        this.allowDuplicates = allowDuplicates;
      }
9631013 }
9641014 }
9651015
@@ -1007,6 +1057,30 @@ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
10071057 return toBuilder ().setTopicPartitions (ImmutableList .copyOf (topicPartitions )).build ();
10081058 }
10091059
    /**
     * Sets a redistribute transform that hints to the runner to try to redistribute the work
     * evenly.
     */
    public Read<K, V> withRedistribute() {
      // NOTE(review): isRedistributed() here is the PRE-call value, so this warning only
      // fires when withRedistribute() was already enabled (e.g. called twice). If the intent
      // is to warn whenever redistribute is enabled without explicit keys, the check should
      // not include isRedistributed() — confirm intended semantics.
      if (getRedistributeNumKeys() == 0 && isRedistributed()) {
        LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
      }
      return toBuilder().setRedistributed(true).build();
    }
1069+
1070+ public Read <K , V > withAllowDuplicates (Boolean allowDuplicates ) {
1071+ if (!isAllowDuplicates ()) {
1072+ LOG .warn ("Setting this value without setting withRedistribute() will have no effect." );
1073+ }
1074+ return toBuilder ().setAllowDuplicates (allowDuplicates ).build ();
1075+ }
1076+
1077+ public Read <K , V > withRedistributeNumKeys (int redistributeNumKeys ) {
1078+ checkState (
1079+ isRedistributed (),
1080+ "withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform." );
1081+ return toBuilder ().setRedistributeNumKeys (redistributeNumKeys ).build ();
1082+ }
1083+
10101084 /**
10111085 * Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
10121086 * from each of the matching topics are read.
@@ -1618,6 +1692,25 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
16181692 .withMaxNumRecords (kafkaRead .getMaxNumRecords ());
16191693 }
16201694
1695+ if (kafkaRead .isRedistributed ()) {
1696+ // fail here instead.
1697+ checkArgument (
1698+ kafkaRead .isCommitOffsetsInFinalizeEnabled (),
1699+ "commitOffsetsInFinalize() can't be enabled with isRedistributed" );
1700+ PCollection <KafkaRecord <K , V >> output = input .getPipeline ().apply (transform );
1701+ if (kafkaRead .getRedistributeNumKeys () == 0 ) {
1702+ return output .apply (
1703+ "Insert Redistribute" ,
1704+ Redistribute .<KafkaRecord <K , V >>arbitrarily ()
1705+ .withAllowDuplicates (kafkaRead .isAllowDuplicates ()));
1706+ } else {
1707+ return output .apply (
1708+ "Insert Redistribute with Shards" ,
1709+ Redistribute .<KafkaRecord <K , V >>arbitrarily ()
1710+ .withAllowDuplicates (kafkaRead .isAllowDuplicates ())
1711+ .withNumBuckets ((int ) kafkaRead .getRedistributeNumKeys ()));
1712+ }
1713+ }
16211714 return input .getPipeline ().apply (transform );
16221715 }
16231716 }
@@ -1637,6 +1730,8 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
16371730 .withKeyDeserializerProvider (kafkaRead .getKeyDeserializerProvider ())
16381731 .withValueDeserializerProvider (kafkaRead .getValueDeserializerProvider ())
16391732 .withManualWatermarkEstimator ()
            .withRedistribute()
            .withAllowDuplicates() // must be set with withRedistribute option.
            // NOTE(review): these two calls enable redistribute UNCONDITIONALLY for every
            // pipeline, while the same settings are re-applied conditionally from kafkaRead
            // a few lines below — confirm whether the unconditional calls should be removed.
16401735 .withTimestampPolicyFactory (kafkaRead .getTimestampPolicyFactory ())
16411736 .withCheckStopReadingFn (kafkaRead .getCheckStopReadingFn ())
16421737 .withConsumerPollingTimeout (kafkaRead .getConsumerPollingTimeout ());
@@ -1650,6 +1745,15 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
16501745 readTransform =
16511746 readTransform .withBadRecordErrorHandler (kafkaRead .getBadRecordErrorHandler ());
16521747 }
      // Propagate the user-facing redistribute settings from Read onto the SDF-based
      // ReadSourceDescriptors transform.
      if (kafkaRead.isRedistributed()) {
        readTransform = readTransform.withRedistribute();
      }
      if (kafkaRead.isAllowDuplicates()) {
        readTransform = readTransform.withAllowDuplicates();
      }
      if (kafkaRead.getRedistributeNumKeys() > 0) {
        readTransform = readTransform.withRedistributeNumKeys(kafkaRead.getRedistributeNumKeys());
      }
16531757 PCollection <KafkaSourceDescriptor > output ;
16541758 if (kafkaRead .isDynamicRead ()) {
16551759 Set <String > topics = new HashSet <>();
@@ -1679,6 +1783,22 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
16791783 .apply (Impulse .create ())
16801784 .apply (ParDo .of (new GenerateKafkaSourceDescriptor (kafkaRead )));
16811785 }
1786+ if (kafkaRead .isRedistributed ()) {
1787+ PCollection <KafkaRecord <K , V >> pcol =
1788+ output .apply (readTransform ).setCoder (KafkaRecordCoder .of (keyCoder , valueCoder ));
1789+ if (kafkaRead .getRedistributeNumKeys () == 0 ) {
1790+ return pcol .apply (
1791+ "Insert Redistribute" ,
1792+ Redistribute .<KafkaRecord <K , V >>arbitrarily ()
1793+ .withAllowDuplicates (kafkaRead .isAllowDuplicates ()));
1794+ } else {
1795+ return pcol .apply (
1796+ "Insert Redistribute with Shards" ,
1797+ Redistribute .<KafkaRecord <K , V >>arbitrarily ()
1798+ .withAllowDuplicates (true )
1799+ .withNumBuckets ((int ) kafkaRead .getRedistributeNumKeys ()));
1800+ }
1801+ }
16821802 return output .apply (readTransform ).setCoder (KafkaRecordCoder .of (keyCoder , valueCoder ));
16831803 }
16841804 }
@@ -2070,6 +2190,15 @@ public abstract static class ReadSourceDescriptors<K, V>
20702190 @ Pure
20712191 abstract boolean isCommitOffsetEnabled ();
20722192
    /** Returns whether a {@code Redistribute} transform is applied to the output. */
    @Pure
    abstract boolean isRedistribute();

    /** Returns whether the redistribute may emit duplicate records. */
    @Pure
    abstract boolean isAllowDuplicates();

    /** Returns the number of redistribute keys; 0 means a key per record. */
    @Pure
    abstract int getRedistributeNumKeys();
2201+
20732202 @ Pure
20742203 abstract @ Nullable TimestampPolicyFactory <K , V > getTimestampPolicyFactory ();
20752204
@@ -2136,6 +2265,12 @@ abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
21362265
21372266 abstract ReadSourceDescriptors .Builder <K , V > setBounded (boolean bounded );
21382267
2268+ abstract ReadSourceDescriptors .Builder <K , V > setRedistribute (boolean withRedistribute );
2269+
2270+ abstract ReadSourceDescriptors .Builder <K , V > setAllowDuplicates (boolean allowDuplicates );
2271+
2272+ abstract ReadSourceDescriptors .Builder <K , V > setRedistributeNumKeys (int redistributeNumKeys );
2273+
21392274 abstract ReadSourceDescriptors <K , V > build ();
21402275 }
21412276
@@ -2148,6 +2283,9 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
21482283 .setBadRecordRouter (BadRecordRouter .THROWING_ROUTER )
21492284 .setBadRecordErrorHandler (new ErrorHandler .DefaultErrorHandler <>())
21502285 .setConsumerPollingTimeout (2L )
2286+ .setRedistribute (false )
2287+ .setAllowDuplicates (false )
2288+ .setRedistributeNumKeys (0 )
21512289 .build ()
21522290 .withProcessingTime ()
21532291 .withMonotonicallyIncreasingWatermarkEstimator ();
@@ -2307,6 +2445,19 @@ public ReadSourceDescriptors<K, V> withProcessingTime() {
23072445 ReadSourceDescriptors .ExtractOutputTimestampFns .useProcessingTime ());
23082446 }
23092447
    /** Enables a {@code Redistribute} transform on the output of this read. */
    public ReadSourceDescriptors<K, V> withRedistribute() {
      return toBuilder().setRedistribute(true).build();
    }
2452+
    /** Allows the redistribute to emit duplicates; only effective with {@link #withRedistribute()}. */
    public ReadSourceDescriptors<K, V> withAllowDuplicates() {
      return toBuilder().setAllowDuplicates(true).build();
    }
2456+
    /** Sets the number of keys for the redistribute; only effective with {@link #withRedistribute()}. */
    public ReadSourceDescriptors<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
      return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
    }
2460+
23102461 /** Use the creation time of {@link KafkaRecord} as the output timestamp. */
23112462 public ReadSourceDescriptors <K , V > withCreateTime () {
23122463 return withExtractOutputTimestampFn (
@@ -2497,6 +2648,12 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
24972648 }
24982649 }
24992650
2651+ if (isRedistribute ()) {
2652+ if (getRedistributeNumKeys () == 0 ) {
2653+ LOG .warn ("This will create a key per record, which is sub-optimal for most use cases." );
2654+ }
2655+ }
2656+
25002657 if (getConsumerConfig ().get (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG ) == null ) {
25012658 LOG .warn (
25022659 "The bootstrapServers is not set. It must be populated through the KafkaSourceDescriptor during runtime otherwise the pipeline will fail." );
@@ -2527,7 +2684,7 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
25272684 .getSchemaRegistry ()
25282685 .getSchemaCoder (KafkaSourceDescriptor .class ),
25292686 recordCoder ));
2530- if (isCommitOffsetEnabled () && !configuredKafkaCommit ()) {
2687+ if (isCommitOffsetEnabled () && !configuredKafkaCommit () && ! isRedistribute () ) {
25312688 outputWithDescriptor =
25322689 outputWithDescriptor
25332690 .apply (Reshuffle .viaRandomKey ())
@@ -2538,6 +2695,7 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
25382695 .getSchemaRegistry ()
25392696 .getSchemaCoder (KafkaSourceDescriptor .class ),
25402697 recordCoder ));
2698+
25412699 PCollection <Void > unused = outputWithDescriptor .apply (new KafkaCommitOffset <K , V >(this ));
25422700 unused .setCoder (VoidCoder .of ());
25432701 }
0 commit comments