
Commit 66db2d8

Merge pull request #24522: [Spark Dataset runner] Add @experimental and reduce visibility where possible
2 parents a9dcf95 + c197579 commit 66db2d8

8 files changed: 16 additions & 11 deletions


runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java

Lines changed: 2 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.spark.structuredstreaming;

 import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -26,6 +27,7 @@
  * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
  * master address, and other user-related knobs.
  */
+@Experimental
 public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipelineOptions {

   /** Set to true to run the job in test mode. */

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,7 @@
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.metrics.MetricsOptions;
 import org.apache.beam.sdk.options.ExperimentalOptions;
@@ -83,6 +84,7 @@
  * PipelineResult result = p.run();
  * }</pre>
  */
+@Experimental
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
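
The class-level javadoc touched above ends with PipelineResult result = p.run(). For orientation, a minimal usage sketch of the now-@Experimental runner and options could look like the following; the master URL and the waitUntilFinish call are illustrative assumptions, not part of this diff.

import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerUsageSketch {
  public static void main(String[] args) {
    // Both the options interface and the runner are marked @Experimental by this commit.
    SparkStructuredStreamingPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkStructuredStreamingPipelineOptions.class);
    options.setRunner(SparkStructuredStreamingRunner.class);
    options.setSparkMaster("local[2]"); // assumed local master, purely illustrative

    Pipeline p = Pipeline.create(options);
    // ... apply transforms to p here ...
    PipelineResult result = p.run();
    result.waitUntilFinish();
  }
}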

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/Aggregators.java

Lines changed: 5 additions & 3 deletions
@@ -37,6 +37,7 @@
 import java.util.function.Function;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.Fun1;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -58,7 +59,8 @@
 import org.checkerframework.checker.nullness.qual.PolyNull;
 import org.joda.time.Instant;

-public class Aggregators {
+@Internal
+class Aggregators {

   /**
    * Creates simple value {@link Aggregator} that is not window aware.
@@ -68,7 +70,7 @@ public class Aggregators {
    * @param <ResT> {@link CombineFn} / {@link Aggregator} result type
    * @param <InT> {@link Aggregator} input type
    */
-  public static <ValT, AccT, ResT, InT> Aggregator<InT, ?, ResT> value(
+  static <ValT, AccT, ResT, InT> Aggregator<InT, ?, ResT> value(
       CombineFn<ValT, AccT, ResT> fn,
       Fun1<InT, ValT> valueFn,
       Encoder<AccT> accEnc,
@@ -89,7 +91,7 @@ public class Aggregators {
    * @param <ResT> {@link CombineFn} / {@link Aggregator} result type
    * @param <InT> {@link Aggregator} input type
    */
-  public static <ValT, AccT, ResT, InT>
+  static <ValT, AccT, ResT, InT>
       Aggregator<WindowedValue<InT>, ?, Collection<WindowedValue<ResT>>> windowedValue(
           CombineFn<ValT, AccT, ResT> fn,
           Fun1<WindowedValue<InT>, ValT> valueFn,
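
Both factory methods above build Spark SQL Aggregators from Beam CombineFns and are now package-private, with the class marked @Internal. As background on the Spark type involved, a minimal stand-alone Aggregator looks like the sketch below; it is illustrative only and not code from this commit.

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

// Sums Long inputs; input, buffer, and output types are all Long in this simple case.
class LongSum extends Aggregator<Long, Long, Long> {
  @Override public Long zero() { return 0L; }                          // empty accumulator
  @Override public Long reduce(Long acc, Long in) { return acc + in; } // fold one input
  @Override public Long merge(Long a, Long b) { return a + b; }        // combine partial sums
  @Override public Long finish(Long acc) { return acc; }               // extract result
  @Override public Encoder<Long> bufferEncoder() { return Encoders.LONG(); }
  @Override public Encoder<Long> outputEncoder() { return Encoders.LONG(); }
}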

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ class CombineGloballyTranslatorBatch<InT, AccT, OutT>
     extends TransformTranslator<PCollection<InT>, PCollection<OutT>, Combine.Globally<InT, OutT>> {

   @Override
-  public void translate(Combine.Globally<InT, OutT> transform, Context cxt) {
+  protected void translate(Combine.Globally<InT, OutT> transform, Context cxt) {
     WindowingStrategy<?, ?> windowing = cxt.getInput().getWindowingStrategy();
     CombineFn<InT, AccT, OutT> combineFn = (CombineFn<InT, AccT, OutT>) transform.getFn();
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTranslatorBatch.java

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@
  * <p>This doesn't require a Spark {@link Aggregator}. Instead it can directly use the respective
  * {@link CombineFn} to reduce each iterable of values into an aggregated output value.
  */
-public class CombineGroupedValuesTranslatorBatch<K, InT, AccT, OutT>
+class CombineGroupedValuesTranslatorBatch<K, InT, AccT, OutT>
     extends TransformTranslator<
         PCollection<? extends KV<K, ? extends Iterable<InT>>>,
         PCollection<KV<K, OutT>>,
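
The javadoc retained above explains that this translator applies the CombineFn directly to each grouped iterable rather than going through a Spark Aggregator. A minimal sketch of that reduction pattern, illustrative only and not the translator's actual implementation, is:

import org.apache.beam.sdk.transforms.Combine.CombineFn;

// Reduce one grouped iterable of values to a single output using a Beam CombineFn.
static <InT, AccT, OutT> OutT reduceGroup(CombineFn<InT, AccT, OutT> fn, Iterable<InT> values) {
  AccT acc = fn.createAccumulator();
  for (InT value : values) {
    acc = fn.addInput(acc, value); // fold each value into the accumulator
  }
  return fn.extractOutput(acc);    // single group, so no mergeAccumulators step is needed
}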

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java

Lines changed: 2 additions & 2 deletions
@@ -110,9 +110,9 @@ class GroupByKeyTranslatorBatch<K, V>

   private boolean useCollectList = true;

-  public GroupByKeyTranslatorBatch() {}
+  GroupByKeyTranslatorBatch() {}

-  public GroupByKeyTranslatorBatch(boolean useCollectList) {
+  GroupByKeyTranslatorBatch(boolean useCollectList) {
     this.useCollectList = useCollectList;
   }

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ImpulseTranslatorBatch.java

Lines changed: 1 addition & 2 deletions
@@ -29,8 +29,7 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.spark.sql.Dataset;

-public class ImpulseTranslatorBatch
-    extends TransformTranslator<PBegin, PCollection<byte[]>, Impulse> {
+class ImpulseTranslatorBatch extends TransformTranslator<PBegin, PCollection<byte[]>, Impulse> {

   @Override
   public void translate(Impulse transform, Context cxt) {

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReshuffleTranslatorBatch.java

Lines changed: 2 additions & 2 deletions
@@ -28,7 +28,7 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.internal.SQLConf;

-public class ReshuffleTranslatorBatch<K, V>
+class ReshuffleTranslatorBatch<K, V>
     extends TransformTranslator<PCollection<KV<K, V>>, PCollection<KV<K, V>>, Reshuffle<K, V>> {

   @Override
@@ -37,7 +37,7 @@ protected void translate(Reshuffle<K, V> transform, Context cxt) throws IOExcept
     cxt.putDataset(cxt.getOutput(), input.repartition(col("value.key")));
   }

-  public static class ViaRandomKey<V>
+  static class ViaRandomKey<V>
       extends TransformTranslator<PCollection<V>, PCollection<V>, Reshuffle.ViaRandomKey<V>> {

     @Override
