Skip to content

Commit e1d765b

Browse files
committed
Fix flaky unit test, in which incorrect assumptions of the test framework(that input elements are processed in the same order as in Create.of) is made.
1 parent ce8291f commit e1d765b

1 file changed

Lines changed: 60 additions & 15 deletions

File tree

  • sdks/java/core/src/test/java/org/apache/beam/sdk/transforms

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2908,21 +2908,6 @@ public void processElement(
29082908
@StateId(stateId) MultimapState<String, Integer> state,
29092909
@StateId(countStateId) CombiningState<Integer, int[], Integer> count,
29102910
OutputReceiver<KV<String, Integer>> r) {
2911-
if (count.read() == 1) {
2912-
ReadableState<Boolean> isEmptyView = state.isEmpty();
2913-
boolean isEmpty = state.isEmpty().read();
2914-
2915-
state.remove("a");
2916-
2917-
// isEmpty before and after remove:
2918-
assertFalse(isEmpty);
2919-
assertTrue(isEmptyView.read());
2920-
assertTrue(state.isEmpty().read());
2921-
2922-
// put the removed key-value pair back
2923-
state.put("a", 97);
2924-
}
2925-
29262911
KV<String, Integer> value = element.getValue();
29272912
state.put(value.getKey(), value.getValue());
29282913
count.add(1);
@@ -2979,6 +2964,66 @@ public void processElement(
29792964
pipeline.run();
29802965
}
29812966

2967+
@Test
2968+
@Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMultimapState.class})
2969+
public void testMultimapStateIsEmptyAfterRemove() {
2970+
final String stateId = "foo:";
2971+
final String countStateId = "count";
2972+
DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>> fn =
2973+
new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>() {
2974+
2975+
@StateId(stateId)
2976+
private final StateSpec<MultimapState<String, Integer>> multimapState =
2977+
StateSpecs.multimap(StringUtf8Coder.of(), VarIntCoder.of());
2978+
2979+
@StateId(countStateId)
2980+
private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
2981+
StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());
2982+
2983+
@ProcessElement
2984+
public void processElement(
2985+
ProcessContext c,
2986+
@Element KV<String, KV<String, Integer>> element,
2987+
@StateId(stateId) MultimapState<String, Integer> state,
2988+
@StateId(countStateId) CombiningState<Integer, int[], Integer> count,
2989+
OutputReceiver<KV<String, Integer>> r) {
2990+
KV<String, Integer> value = element.getValue();
2991+
state.put(value.getKey(), value.getValue());
2992+
count.add(1);
2993+
2994+
if (count.read() >= 4) {
2995+
ReadableState<Boolean> isEmptyView = state.isEmpty();
2996+
boolean isEmpty = state.isEmpty().read();
2997+
2998+
state.remove("a");
2999+
3000+
assertFalse(isEmpty);
3001+
assertFalse(isEmptyView.read());
3002+
assertFalse(state.isEmpty().read());
3003+
3004+
isEmptyView = state.isEmpty();
3005+
isEmpty = state.isEmpty().read();
3006+
3007+
// isEmpty after removing the only multimap key.
3008+
state.remove("b");
3009+
3010+
assertFalse(isEmpty);
3011+
assertTrue(isEmptyView.read());
3012+
assertTrue(state.isEmpty().read());
3013+
}
3014+
}
3015+
};
3016+
PCollection<KV<String, Integer>> output =
3017+
pipeline
3018+
.apply(
3019+
Create.of(
3020+
KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("a", 97)),
3021+
KV.of("hello", KV.of("a", 98)), KV.of("hello", KV.of("b", 33))))
3022+
.apply(ParDo.of(fn));
3023+
PAssert.that(output).empty();
3024+
pipeline.run();
3025+
}
3026+
29823027
@Test
29833028
@Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMultimapState.class})
29843029
public void testMultimapStateClear() {

0 commit comments

Comments
 (0)