Skip to content

Commit 435eab3

Browse files
lostluckAbacn
andauthored
Add Default method for OutputReceiver.outputWindowedValue (#30240)
* Revert changes of override method that simply throw UnsupportedOperationException in subclasses Co-authored-by: Yi Hu <yathu@google.com>
1 parent 782a78b commit 435eab3

8 files changed

Lines changed: 5 additions & 93 deletions

File tree

runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import com.google.auto.service.AutoService;
2323
import java.io.IOException;
24-
import java.util.Collection;
2524
import java.util.List;
2625
import java.util.Map;
2726
import java.util.concurrent.ThreadLocalRandom;
@@ -670,15 +669,6 @@ public void output(RestrictionT part) {
670669
public void outputWithTimestamp(RestrictionT part, Instant timestamp) {
671670
throw new UnsupportedOperationException();
672671
}
673-
674-
@Override
675-
public void outputWindowedValue(
676-
RestrictionT output,
677-
Instant timestamp,
678-
Collection<? extends BoundedWindow> windows,
679-
PaneInfo paneInfo) {
680-
throw new UnsupportedOperationException();
681-
}
682672
};
683673
}
684674

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,11 +395,14 @@ public interface OutputReceiver<T> {
395395

396396
void outputWithTimestamp(T output, Instant timestamp);
397397

398-
void outputWindowedValue(
398+
default void outputWindowedValue(
399399
T output,
400400
Instant timestamp,
401401
Collection<? extends BoundedWindow> windows,
402-
PaneInfo paneInfo);
402+
PaneInfo paneInfo) {
403+
throw new UnsupportedOperationException(
404+
String.format("Not implemented: %s.outputWindowedValue", this.getClass().getName()));
405+
}
403406
}
404407

405408
/** Receives tagged output for a multi-output function. */

sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.google.zetasql.Value;
2727
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2828
import java.util.ArrayDeque;
29-
import java.util.Collection;
3029
import java.util.HashMap;
3130
import java.util.List;
3231
import java.util.Map;
@@ -49,7 +48,6 @@
4948
import org.apache.beam.sdk.transforms.PTransform;
5049
import org.apache.beam.sdk.transforms.ParDo;
5150
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
52-
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
5351
import org.apache.beam.sdk.values.PCollection;
5452
import org.apache.beam.sdk.values.PCollectionList;
5553
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -363,16 +361,6 @@ public void output(Row output) {
363361
public void outputWithTimestamp(Row output, Instant timestamp) {
364362
c.output(tag, output, timestamp, w);
365363
}
366-
367-
@Override
368-
public void outputWindowedValue(
369-
Row output,
370-
Instant timestamp,
371-
Collection<? extends BoundedWindow> windows,
372-
PaneInfo paneInfo) {
373-
throw new UnsupportedOperationException(
374-
"outputWindowedValue not supported in finish bundle here");
375-
}
376364
}
377365

378366
private static RuntimeException extractException(Throwable e) {

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.io.IOException;
3838
import java.time.Instant;
3939
import java.util.Arrays;
40-
import java.util.Collection;
4140
import java.util.List;
4241
import java.util.Map;
4342
import java.util.Objects;
@@ -70,7 +69,6 @@
7069
import org.apache.beam.sdk.transforms.Reshuffle;
7170
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
7271
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
73-
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
7472
import org.apache.beam.sdk.util.Preconditions;
7573
import org.apache.beam.sdk.values.KV;
7674
import org.apache.beam.sdk.values.PCollection;
@@ -1095,15 +1093,6 @@ public void outputWithTimestamp(
10951093
BigQueryStorageApiInsertError output, org.joda.time.Instant timestamp) {
10961094
context.output(failedRowsTag, output, timestamp, GlobalWindow.INSTANCE);
10971095
}
1098-
1099-
@Override
1100-
public void outputWindowedValue(
1101-
BigQueryStorageApiInsertError output,
1102-
org.joda.time.Instant timestamp,
1103-
Collection<? extends BoundedWindow> windows,
1104-
PaneInfo paneInfo) {
1105-
throw new UnsupportedOperationException("outputWindowedValue not supported");
1106-
}
11071096
};
11081097
@Nullable OutputReceiver<TableRow> successfulRowsReceiver = null;
11091098
if (successfulRowsTag != null) {
@@ -1118,15 +1107,6 @@ public void output(TableRow output) {
11181107
public void outputWithTimestamp(TableRow output, org.joda.time.Instant timestamp) {
11191108
context.output(successfulRowsTag, output, timestamp, GlobalWindow.INSTANCE);
11201109
}
1121-
1122-
@Override
1123-
public void outputWindowedValue(
1124-
TableRow output,
1125-
org.joda.time.Instant timestamp,
1126-
Collection<? extends BoundedWindow> windows,
1127-
PaneInfo paneInfo) {
1128-
throw new UnsupportedOperationException("outputWindowedValue not supported");
1129-
}
11301110
};
11311111
}
11321112

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import java.io.IOException;
5757
import java.util.ArrayList;
5858
import java.util.Arrays;
59-
import java.util.Collection;
6059
import java.util.Comparator;
6160
import java.util.HashMap;
6261
import java.util.List;
@@ -103,11 +102,9 @@
103102
import org.apache.beam.sdk.transforms.Wait;
104103
import org.apache.beam.sdk.transforms.WithTimestamps;
105104
import org.apache.beam.sdk.transforms.display.DisplayData;
106-
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
107105
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
108106
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
109107
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
110-
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
111108
import org.apache.beam.sdk.transforms.windowing.Window;
112109
import org.apache.beam.sdk.util.BackOff;
113110
import org.apache.beam.sdk.util.FluentBackoff;
@@ -2005,15 +2002,6 @@ public void output(Iterable<MutationGroup> output) {
20052002
public void outputWithTimestamp(Iterable<MutationGroup> output, Instant timestamp) {
20062003
c.output(output, timestamp, GlobalWindow.INSTANCE);
20072004
}
2008-
2009-
@Override
2010-
public void outputWindowedValue(
2011-
Iterable<MutationGroup> output,
2012-
Instant timestamp,
2013-
Collection<? extends BoundedWindow> windows,
2014-
PaneInfo paneInfo) {
2015-
throw new UnsupportedOperationException("outputWindowedValue not supported");
2016-
}
20172005
}
20182006
}
20192007

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@
5050
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
5151
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.DefaultErrorHandler;
5252
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
53-
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
54-
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
5553
import org.apache.beam.sdk.values.KV;
5654
import org.apache.beam.sdk.values.PCollection;
5755
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -342,15 +340,6 @@ public void outputWithTimestamp(
342340
records.add(output);
343341
}
344342

345-
@Override
346-
public void outputWindowedValue(
347-
T output,
348-
Instant timestamp,
349-
Collection<? extends BoundedWindow> windows,
350-
PaneInfo paneInfo) {
351-
throw new UnsupportedOperationException("Not expecting outputWindowedValue");
352-
}
353-
354343
public List<T> getOutputs() {
355344
return this.records;
356345
}

sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,11 @@
2121
import static org.junit.Assert.assertTrue;
2222

2323
import java.util.ArrayList;
24-
import java.util.Collection;
2524
import java.util.List;
2625
import org.apache.beam.sdk.io.range.OffsetRange;
2726
import org.apache.beam.sdk.transforms.DoFn;
2827
import org.apache.beam.sdk.transforms.SerializableFunction;
2928
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
30-
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
31-
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
3229
import org.apache.pulsar.client.api.MessageId;
3330
import org.apache.pulsar.client.api.PulsarClient;
3431
import org.apache.pulsar.client.internal.DefaultImplementation;
@@ -176,16 +173,6 @@ public void outputWithTimestamp(
176173
records.add(output);
177174
}
178175

179-
@Override
180-
public void outputWindowedValue(
181-
PulsarMessage output,
182-
Instant timestamp,
183-
Collection<? extends BoundedWindow> windows,
184-
PaneInfo paneInfo) {
185-
throw new UnsupportedOperationException(
186-
"unsupported outputWindowedValue in mock outputreceiver");
187-
}
188-
189176
public List<PulsarMessage> getOutputs() {
190177
return records;
191178
}

sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,12 @@
2222
import static org.junit.Assert.assertTrue;
2323

2424
import java.util.ArrayList;
25-
import java.util.Collection;
2625
import java.util.List;
2726
import org.apache.beam.sdk.io.range.OffsetRange;
2827
import org.apache.beam.sdk.transforms.DoFn;
2928
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
3029
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
3130
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
32-
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
33-
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
3431
import org.checkerframework.checker.initialization.qual.Initialized;
3532
import org.checkerframework.checker.nullness.qual.NonNull;
3633
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
@@ -67,16 +64,6 @@ public void outputWithTimestamp(
6764
records.add(output);
6865
}
6966

70-
@Override
71-
public void outputWindowedValue(
72-
String output,
73-
Instant timestamp,
74-
Collection<? extends BoundedWindow> windows,
75-
PaneInfo paneInfo) {
76-
throw new UnsupportedOperationException(
77-
"Not expecting to receive call to outputWindowedValue");
78-
}
79-
8067
public List<String> getOutputs() {
8168
return this.records;
8269
}

0 commit comments

Comments
 (0)