2525import org .apache .beam .sdk .coders .Coder .NonDeterministicException ;
2626import org .apache .beam .sdk .coders .IterableCoder ;
2727import org .apache .beam .sdk .coders .KvCoder ;
28- import org .apache .beam .sdk .options .PipelineOptions ;
2928import org .apache .beam .sdk .runners .AppliedPTransform ;
30- import org .apache .beam .sdk .transforms .GroupByEncryptedKey ;
3129import org .apache .beam .sdk .transforms .PTransform ;
3230import org .apache .beam .sdk .transforms .windowing .DefaultTrigger ;
3331import org .apache .beam .sdk .transforms .windowing .GlobalWindows ;
34- import org .apache .beam .sdk .util .Secret ;
3532import org .apache .beam .sdk .util .construction .PTransformTranslation ;
3633import org .apache .beam .sdk .util .construction .SdkComponents ;
3734import org .apache .beam .sdk .util .construction .TransformPayloadTranslatorRegistrar ;
3835import org .apache .beam .sdk .values .KV ;
3936import org .apache .beam .sdk .values .PCollection ;
4037import org .apache .beam .sdk .values .PCollection .IsBounded ;
4138import org .apache .beam .sdk .values .WindowingStrategy ;
42- import org .checkerframework .checker .nullness .qual .Nullable ;
4339
4440/**
4541 * Specialized implementation of {@code GroupByKey} for translating Redistribute transform into
@@ -50,13 +46,9 @@ public class DataflowGroupByKey<K, V>
5046
5147 // Plumbed from Redistribute transform.
5248 private final boolean allowDuplicates ;
53- private boolean insideGBEK ;
54- private boolean surroundsGBEK ;
5549
5650 private DataflowGroupByKey (boolean allowDuplicates ) {
5751 this .allowDuplicates = allowDuplicates ;
58- this .insideGBEK = false ;
59- this .surroundsGBEK = false ;
6052 }
6153
6254 /**
@@ -87,22 +79,6 @@ public boolean allowDuplicates() {
8779 return allowDuplicates ;
8880 }
8981
90- /**
91- * For Beam internal use only. Tells runner that this is an inner GBK inside of a
92- * GroupByEncryptedKey
93- */
94- public void setInsideGBEK () {
95- this .insideGBEK = true ;
96- }
97-
98- /**
99- * For Beam internal use only. Tells runner that this is a GBK wrapped around of a
100- * GroupByEncryptedKey
101- */
102- public boolean surroundsGBEK () {
103- return this .surroundsGBEK ;
104- }
105-
10682 /////////////////////////////////////////////////////////////////////////////
10783
10884 public static void applicableTo (PCollection <?> input ) {
@@ -141,20 +117,6 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
141117 "the keyCoder of a DataflowGroupByKey must be deterministic" , e );
142118 }
143119
144- PipelineOptions options = input .getPipeline ().getOptions ();
145- String gbekOveride = options .getGbek ();
146- if (!this .insideGBEK && gbekOveride != null && !gbekOveride .trim ().isEmpty ()) {
147- this .surroundsGBEK = true ;
148- Secret hmacSecret = Secret .parseSecretOption (gbekOveride );
149- DataflowGroupByKey <byte [], KV <byte [], byte []>> gbk = DataflowGroupByKey .create ();
150- if (this .allowDuplicates ) {
151- gbk = DataflowGroupByKey .createWithAllowDuplicates ();
152- }
153- gbk .setInsideGBEK ();
154- GroupByEncryptedKey <K , V > gbek = GroupByEncryptedKey .createWithCustomGbk (hmacSecret , gbk );
155- return input .apply (gbek );
156- }
157-
158120 // This primitive operation groups by the combination of key and window,
159121 // merging windows as needed, using the windows assigned to the
160122 // key/value input elements and the window merge operation of the
@@ -209,22 +171,10 @@ public String getUrn() {
209171 return PTransformTranslation .GROUP_BY_KEY_TRANSFORM_URN ;
210172 }
211173
212- @ Override
213- public String getUrn (DataflowGroupByKey <?, ?> transform ) {
214- if (transform .surroundsGBEK ()) {
215- return PTransformTranslation .GROUP_BY_KEY_WRAPPER_TRANSFORM_URN ;
216- }
217- return PTransformTranslation .GROUP_BY_KEY_TRANSFORM_URN ;
218- }
219-
220174 @ Override
221175 @ SuppressWarnings ("nullness" )
222- public RunnerApi .@ Nullable FunctionSpec translate (
176+ public RunnerApi .FunctionSpec translate (
223177 AppliedPTransform <?, ?, DataflowGroupByKey <?, ?>> transform , SdkComponents components ) {
224- if (transform .getTransform ().surroundsGBEK ()) {
225- // Can use null for spec for empty composite.
226- return null ;
227- }
228178 return RunnerApi .FunctionSpec .newBuilder ().setUrn (getUrn (transform .getTransform ())).build ();
229179 }
230180 }
0 commit comments