Skip to content

Commit c243491

Browse files
authored
Merge pull request #32705: fix schema inference for parameterized types
1 parent 20d0f6e commit c243491

3 files changed

Lines changed: 54 additions & 12 deletions

File tree

sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ public PipelineResult run(PipelineOptions options) {
335335
/** Returns the {@link CoderRegistry} that this {@link Pipeline} uses. */
336336
public CoderRegistry getCoderRegistry() {
337337
if (coderRegistry == null) {
338-
coderRegistry = CoderRegistry.createDefault();
338+
coderRegistry = CoderRegistry.createDefault(getSchemaRegistry());
339339
}
340340
return coderRegistry;
341341
}

sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import org.apache.beam.sdk.io.fs.MetadataCoder;
4343
import org.apache.beam.sdk.io.fs.ResourceId;
4444
import org.apache.beam.sdk.io.fs.ResourceIdCoder;
45+
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
46+
import org.apache.beam.sdk.schemas.SchemaRegistry;
4547
import org.apache.beam.sdk.transforms.SerializableFunction;
4648
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
4749
import org.apache.beam.sdk.util.CoderUtils;
@@ -195,11 +197,17 @@ public <T> Coder<T> coderFor(
195197
* the lexicographically smallest {@link Class#getName() class name} being used.
196198
* </ul>
197199
*/
200+
public static CoderRegistry createDefault(@Nullable SchemaRegistry schemaRegistry) {
201+
return new CoderRegistry(schemaRegistry);
202+
}
203+
204+
/** Backwards-compatible overload that creates a registry without a {@link SchemaRegistry}. */
198205
public static CoderRegistry createDefault() {
199-
return new CoderRegistry();
206+
return new CoderRegistry(null);
200207
}
201208

202-
private CoderRegistry() {
209+
private CoderRegistry(@Nullable SchemaRegistry schemaRegistry) {
210+
this.schemaRegistry = schemaRegistry;
203211
coderProviders = new ArrayDeque<>(REGISTERED_CODER_FACTORIES);
204212
}
205213

@@ -590,6 +598,8 @@ private static boolean isNullOrEmpty(Collection<?> c) {
590598
/** The list of {@link CoderProvider coder providers} to use to provide Coders. */
591599
private ArrayDeque<CoderProvider> coderProviders;
592600

601+
private final @Nullable SchemaRegistry schemaRegistry;
602+
593603
/**
594604
* Returns a {@link Coder} to use for values of the given type, in a context where the given types
595605
* use the given coders.
@@ -650,16 +660,28 @@ private Coder<?> getCoderFromParameterizedType(
650660

651661
List<Coder<?>> typeArgumentCoders = new ArrayList<>();
652662
for (Type typeArgument : type.getActualTypeArguments()) {
653-
try {
654-
Coder<?> typeArgumentCoder =
655-
getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings);
656-
typeArgumentCoders.add(typeArgumentCoder);
657-
} catch (CannotProvideCoderException exc) {
658-
throw new CannotProvideCoderException(
659-
String.format(
660-
"Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()),
661-
exc);
663+
Coder<?> typeArgumentCoder = null;
664+
if (schemaRegistry != null) {
665+
TypeDescriptor<?> typeDescriptor = TypeDescriptor.of(typeArgument);
666+
try {
667+
typeArgumentCoder = schemaRegistry.getSchemaCoder(typeDescriptor);
668+
} catch (NoSuchSchemaException e) {
669+
// No schema registered for this type; fall back to regular coder inference below.
670+
}
671+
}
672+
673+
if (typeArgumentCoder == null) {
674+
try {
675+
typeArgumentCoder =
676+
getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings);
677+
} catch (CannotProvideCoderException exc) {
678+
throw new CannotProvideCoderException(
679+
String.format(
680+
"Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()),
681+
exc);
682+
}
662683
}
684+
typeArgumentCoders.add(typeArgumentCoder);
663685
}
664686
return getCoderFromFactories(TypeDescriptor.of(type), typeArgumentCoders);
665687
}

sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
import com.google.auto.service.AutoService;
2727
import com.google.auto.value.AutoValue;
2828
import java.util.List;
29+
import org.apache.beam.sdk.coders.CannotProvideCoderException;
30+
import org.apache.beam.sdk.coders.Coder;
31+
import org.apache.beam.sdk.coders.CoderRegistry;
32+
import org.apache.beam.sdk.coders.IterableCoder;
2933
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
3034
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
3135
import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO;
@@ -223,6 +227,22 @@ public void testRegisterPojo() throws NoSuchSchemaException {
223227
assertTrue(SIMPLE_POJO_SCHEMA.equivalent(schema));
224228
}
225229

230+
@Test
231+
public void testSchemaTypeParameterInsideCoder() throws CannotProvideCoderException {
232+
SchemaRegistry schemaRegistry = SchemaRegistry.createDefault();
233+
schemaRegistry.registerPOJO(SimplePOJO.class);
234+
235+
CoderRegistry coderRegistry = CoderRegistry.createDefault(schemaRegistry);
236+
Coder<Iterable<SimplePOJO>> coder =
237+
coderRegistry.getCoder(TypeDescriptors.iterables(TypeDescriptor.of(SimplePOJO.class)));
238+
assertTrue(coder instanceof IterableCoder);
239+
assertEquals(1, coder.getCoderArguments().size());
240+
assertTrue(coder.getCoderArguments().get(0) instanceof SchemaCoder);
241+
assertTrue(
242+
SIMPLE_POJO_SCHEMA.equivalent(
243+
((SchemaCoder<SimplePOJO>) coder.getCoderArguments().get(0)).getSchema()));
244+
}
245+
226246
@Test
227247
public void testRegisterJavaBean() throws NoSuchSchemaException {
228248
SchemaRegistry registry = SchemaRegistry.createDefault();

0 commit comments

Comments
 (0)