@@ -2908,21 +2908,6 @@ public void processElement(
29082908 @ StateId (stateId ) MultimapState <String , Integer > state ,
29092909 @ StateId (countStateId ) CombiningState <Integer , int [], Integer > count ,
29102910 OutputReceiver <KV <String , Integer >> r ) {
2911- if (count .read () == 1 ) {
2912- ReadableState <Boolean > isEmptyView = state .isEmpty ();
2913- boolean isEmpty = state .isEmpty ().read ();
2914-
2915- state .remove ("a" );
2916-
2917- // isEmpty before and after remove:
2918- assertFalse (isEmpty );
2919- assertTrue (isEmptyView .read ());
2920- assertTrue (state .isEmpty ().read ());
2921-
2922- // put the removed key-value pair back
2923- state .put ("a" , 97 );
2924- }
2925-
29262911 KV <String , Integer > value = element .getValue ();
29272912 state .put (value .getKey (), value .getValue ());
29282913 count .add (1 );
@@ -2979,6 +2964,66 @@ public void processElement(
29792964 pipeline .run ();
29802965 }
29812966
2967+ @ Test
2968+ @ Category ({ValidatesRunner .class , UsesStatefulParDo .class , UsesMultimapState .class })
2969+ public void testMultimapStateIsEmptyAfterRemove () {
2970+ final String stateId = "foo:" ;
2971+ final String countStateId = "count" ;
2972+ DoFn <KV <String , KV <String , Integer >>, KV <String , Integer >> fn =
2973+ new DoFn <KV <String , KV <String , Integer >>, KV <String , Integer >>() {
2974+
2975+ @ StateId (stateId )
2976+ private final StateSpec <MultimapState <String , Integer >> multimapState =
2977+ StateSpecs .multimap (StringUtf8Coder .of (), VarIntCoder .of ());
2978+
2979+ @ StateId (countStateId )
2980+ private final StateSpec <CombiningState <Integer , int [], Integer >> countState =
2981+ StateSpecs .combiningFromInputInternal (VarIntCoder .of (), Sum .ofIntegers ());
2982+
2983+ @ ProcessElement
2984+ public void processElement (
2985+ ProcessContext c ,
2986+ @ Element KV <String , KV <String , Integer >> element ,
2987+ @ StateId (stateId ) MultimapState <String , Integer > state ,
2988+ @ StateId (countStateId ) CombiningState <Integer , int [], Integer > count ,
2989+ OutputReceiver <KV <String , Integer >> r ) {
2990+ KV <String , Integer > value = element .getValue ();
2991+ state .put (value .getKey (), value .getValue ());
2992+ count .add (1 );
2993+
2994+ if (count .read () >= 4 ) {
2995+ ReadableState <Boolean > isEmptyView = state .isEmpty ();
2996+ boolean isEmpty = state .isEmpty ().read ();
2997+
2998+ state .remove ("a" );
2999+
3000+ assertFalse (isEmpty );
3001+ assertFalse (isEmptyView .read ());
3002+ assertFalse (state .isEmpty ().read ());
3003+
3004+ isEmptyView = state .isEmpty ();
3005+ isEmpty = state .isEmpty ().read ();
3006+
3007+ // isEmpty after removing the only multimap key.
3008+ state .remove ("b" );
3009+
3010+ assertFalse (isEmpty );
3011+ assertTrue (isEmptyView .read ());
3012+ assertTrue (state .isEmpty ().read ());
3013+ }
3014+ }
3015+ };
3016+ PCollection <KV <String , Integer >> output =
3017+ pipeline
3018+ .apply (
3019+ Create .of (
3020+ KV .of ("hello" , KV .of ("a" , 97 )), KV .of ("hello" , KV .of ("a" , 97 )),
3021+ KV .of ("hello" , KV .of ("a" , 98 )), KV .of ("hello" , KV .of ("b" , 33 ))))
3022+ .apply (ParDo .of (fn ));
3023+ PAssert .that (output ).empty ();
3024+ pipeline .run ();
3025+ }
3026+
29823027 @ Test
29833028 @ Category ({ValidatesRunner .class , UsesStatefulParDo .class , UsesMultimapState .class })
29843029 public void testMultimapStateClear () {
0 commit comments