Skip to content

Commit abf1904

Browse files
authored
Merge pull request #36523: revert outputWindowedValue changes from KafkaIO as there is outputBuilder
2 parents d687f4f + f0c92c7 commit abf1904

8 files changed

Lines changed: 5 additions & 509 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -424,24 +424,6 @@ public void outputWindowedValue(
424424
outputReceiver.output(mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo));
425425
}
426426

427-
@Override
428-
public void outputWindowedValue(
429-
OutputT value,
430-
Instant timestamp,
431-
Collection<? extends BoundedWindow> windows,
432-
PaneInfo paneInfo,
433-
@Nullable String currentRecordId,
434-
@Nullable Long currentRecordOffset) {
435-
noteOutput();
436-
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
437-
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
438-
}
439-
outputReceiver.output(
440-
mainOutputTag,
441-
WindowedValues.of(
442-
value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
443-
}
444-
445427
@Override
446428
public <T> void output(TupleTag<T> tag, T value) {
447429
outputWithTimestamp(tag, value, element.getTimestamp());
@@ -460,26 +442,11 @@ public <T> void outputWindowedValue(
460442
Instant timestamp,
461443
Collection<? extends BoundedWindow> windows,
462444
PaneInfo paneInfo) {
463-
outputWindowedValue(tag, value, timestamp, windows, paneInfo, null, null);
464-
}
465-
466-
@Override
467-
public <T> void outputWindowedValue(
468-
TupleTag<T> tag,
469-
T value,
470-
Instant timestamp,
471-
Collection<? extends BoundedWindow> windows,
472-
PaneInfo paneInfo,
473-
@Nullable String currentRecordId,
474-
@Nullable Long currentRecordOffset) {
475445
noteOutput();
476446
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
477447
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
478448
}
479-
outputReceiver.output(
480-
tag,
481-
WindowedValues.of(
482-
value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
449+
outputReceiver.output(tag, WindowedValues.of(value, timestamp, windows, paneInfo));
483450
}
484451

485452
private void noteOutput() {

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 0 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -336,35 +336,6 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) {
336336
public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
337337
outputWindowedValue(tag, WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING));
338338
}
339-
340-
@Override
341-
public void output(
342-
OutputT output,
343-
Instant timestamp,
344-
BoundedWindow window,
345-
@Nullable String currentRecordId,
346-
@Nullable Long currentRecordOffset) {
347-
output(mainOutputTag, output, timestamp, window, currentRecordId, currentRecordOffset);
348-
}
349-
350-
@Override
351-
public <T> void output(
352-
TupleTag<T> tag,
353-
T output,
354-
Instant timestamp,
355-
BoundedWindow window,
356-
@Nullable String currentRecordId,
357-
@Nullable Long currentRecordOffset) {
358-
outputWindowedValue(
359-
tag,
360-
WindowedValues.of(
361-
output,
362-
timestamp,
363-
Collections.singletonList(window),
364-
PaneInfo.NO_FIRING,
365-
currentRecordId,
366-
currentRecordOffset));
367-
}
368339
}
369340

370341
private final DoFnFinishBundleArgumentProvider.Context context =
@@ -461,24 +432,6 @@ public void outputWindowedValue(
461432
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
462433
}
463434

464-
@Override
465-
public void outputWindowedValue(
466-
OutputT output,
467-
Instant timestamp,
468-
Collection<? extends BoundedWindow> windows,
469-
PaneInfo paneInfo,
470-
@Nullable String currentRecordId,
471-
@Nullable Long currentRecordOffset) {
472-
outputWindowedValue(
473-
mainOutputTag,
474-
output,
475-
timestamp,
476-
windows,
477-
paneInfo,
478-
currentRecordId,
479-
currentRecordOffset);
480-
}
481-
482435
@Override
483436
public <T> void output(TupleTag<T> tag, T output) {
484437
checkNotNull(tag, "Tag passed to output cannot be null");
@@ -512,21 +465,6 @@ public <T> void outputWindowedValue(
512465
.output();
513466
}
514467

515-
@Override
516-
public <T> void outputWindowedValue(
517-
TupleTag<T> tag,
518-
T output,
519-
Instant timestamp,
520-
Collection<? extends BoundedWindow> windows,
521-
PaneInfo paneInfo,
522-
@Nullable String currentRecordId,
523-
@Nullable Long currentRecordOffset) {
524-
SimpleDoFnRunner.this.outputWindowedValue(
525-
tag,
526-
WindowedValues.of(
527-
output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
528-
}
529-
530468
@Override
531469
public Instant timestamp() {
532470
return elem.getTimestamp();
@@ -964,24 +902,6 @@ public void outputWindowedValue(
964902
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
965903
}
966904

967-
@Override
968-
public void outputWindowedValue(
969-
OutputT output,
970-
Instant timestamp,
971-
Collection<? extends BoundedWindow> windows,
972-
PaneInfo paneInfo,
973-
@Nullable String currentRecordId,
974-
@Nullable Long currentRecordOffset) {
975-
outputWindowedValue(
976-
mainOutputTag,
977-
output,
978-
timestamp,
979-
windows,
980-
paneInfo,
981-
currentRecordId,
982-
currentRecordOffset);
983-
}
984-
985905
@Override
986906
public <T> void output(TupleTag<T> tag, T output) {
987907
checkTimestamp(timestamp(), timestamp);
@@ -1013,22 +933,6 @@ public <T> void outputWindowedValue(
1013933
.output();
1014934
}
1015935

1016-
@Override
1017-
public <T> void outputWindowedValue(
1018-
TupleTag<T> tag,
1019-
T output,
1020-
Instant timestamp,
1021-
Collection<? extends BoundedWindow> windows,
1022-
PaneInfo paneInfo,
1023-
@Nullable String currentRecordId,
1024-
@Nullable Long currentRecordOffset) {
1025-
checkTimestamp(timestamp(), timestamp);
1026-
SimpleDoFnRunner.this.outputWindowedValue(
1027-
tag,
1028-
WindowedValues.of(
1029-
output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
1030-
}
1031-
1032936
@Override
1033937
public BundleFinalizer bundleFinalizer() {
1034938
throw new UnsupportedOperationException(
@@ -1243,24 +1147,6 @@ public void outputWindowedValue(
12431147
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
12441148
}
12451149

1246-
@Override
1247-
public void outputWindowedValue(
1248-
OutputT output,
1249-
Instant timestamp,
1250-
Collection<? extends BoundedWindow> windows,
1251-
PaneInfo paneInfo,
1252-
@Nullable String currentRecordId,
1253-
@Nullable Long currentRecordOffset) {
1254-
outputWindowedValue(
1255-
mainOutputTag,
1256-
output,
1257-
timestamp,
1258-
windows,
1259-
paneInfo,
1260-
currentRecordId,
1261-
currentRecordOffset);
1262-
}
1263-
12641150
@Override
12651151
public <T> void output(TupleTag<T> tag, T output) {
12661152
checkTimestamp(this.timestamp, timestamp);
@@ -1291,22 +1177,6 @@ public <T> void outputWindowedValue(
12911177
.output();
12921178
}
12931179

1294-
@Override
1295-
public <T> void outputWindowedValue(
1296-
TupleTag<T> tag,
1297-
T output,
1298-
Instant timestamp,
1299-
Collection<? extends BoundedWindow> windows,
1300-
PaneInfo paneInfo,
1301-
@Nullable String currentRecordId,
1302-
@Nullable Long currentRecordOffset) {
1303-
checkTimestamp(this.timestamp, timestamp);
1304-
SimpleDoFnRunner.this.outputWindowedValue(
1305-
tag,
1306-
WindowedValues.of(
1307-
output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
1308-
}
1309-
13101180
@Override
13111181
public BundleFinalizer bundleFinalizer() {
13121182
throw new UnsupportedOperationException(

runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -662,27 +662,6 @@ public <T> void output(
662662
throwUnsupportedOutput();
663663
}
664664

665-
@Override
666-
public void output(
667-
OutputT output,
668-
Instant timestamp,
669-
BoundedWindow window,
670-
@Nullable String currentRecordId,
671-
@Nullable Long currentRecordOffset) {
672-
throwUnsupportedOutput();
673-
}
674-
675-
@Override
676-
public <T> void output(
677-
TupleTag<T> tag,
678-
T output,
679-
Instant timestamp,
680-
BoundedWindow window,
681-
@Nullable String currentRecordId,
682-
@Nullable Long currentRecordOffset) {
683-
throwUnsupportedOutput();
684-
}
685-
686665
@Override
687666
public PipelineOptions getPipelineOptions() {
688667
return baseContext.getPipelineOptions();

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,6 @@ public abstract class FinishBundleContext {
123123
*/
124124
public abstract void output(OutputT output, Instant timestamp, BoundedWindow window);
125125

126-
public abstract void output(
127-
OutputT output,
128-
Instant timestamp,
129-
BoundedWindow window,
130-
@Nullable String currentRecordId,
131-
@Nullable Long currentRecordOffset);
132126
/**
133127
* Adds the given element to the output {@code PCollection} with the given tag at the given
134128
* timestamp in the given window.
@@ -140,14 +134,6 @@ public abstract void output(
140134
*/
141135
public abstract <T> void output(
142136
TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window);
143-
144-
public abstract <T> void output(
145-
TupleTag<T> tag,
146-
T output,
147-
Instant timestamp,
148-
BoundedWindow window,
149-
@Nullable String currentRecordId,
150-
@Nullable Long currentRecordOffset);
151137
}
152138

153139
/**
@@ -226,14 +212,6 @@ public abstract void outputWindowedValue(
226212
Collection<? extends BoundedWindow> windows,
227213
PaneInfo paneInfo);
228214

229-
public abstract void outputWindowedValue(
230-
OutputT output,
231-
Instant timestamp,
232-
Collection<? extends BoundedWindow> windows,
233-
PaneInfo paneInfo,
234-
@Nullable String currentRecordId,
235-
@Nullable Long currentRecordOffset);
236-
237215
/**
238216
* Adds the given element to the output {@code PCollection} with the given tag.
239217
*
@@ -306,15 +284,6 @@ public abstract <T> void outputWindowedValue(
306284
Instant timestamp,
307285
Collection<? extends BoundedWindow> windows,
308286
PaneInfo paneInfo);
309-
310-
public abstract <T> void outputWindowedValue(
311-
TupleTag<T> tag,
312-
T output,
313-
Instant timestamp,
314-
Collection<? extends BoundedWindow> windows,
315-
PaneInfo paneInfo,
316-
@Nullable String currentRecordId,
317-
@Nullable Long currentRecordOffset);
318287
}
319288

320289
/** Information accessible when running a {@link DoFn.ProcessElement} method. */

0 commit comments

Comments
 (0)