|
42 | 42 | import org.apache.beam.sdk.io.fs.MetadataCoder; |
43 | 43 | import org.apache.beam.sdk.io.fs.ResourceId; |
44 | 44 | import org.apache.beam.sdk.io.fs.ResourceIdCoder; |
| 45 | +import org.apache.beam.sdk.schemas.NoSuchSchemaException; |
| 46 | +import org.apache.beam.sdk.schemas.SchemaRegistry; |
45 | 47 | import org.apache.beam.sdk.transforms.SerializableFunction; |
46 | 48 | import org.apache.beam.sdk.transforms.windowing.IntervalWindow; |
47 | 49 | import org.apache.beam.sdk.util.CoderUtils; |
@@ -195,11 +197,17 @@ public <T> Coder<T> coderFor( |
195 | 197 | * the lexicographically smallest {@link Class#getName() class name} being used. |
196 | 198 | * </ul> |
197 | 199 | */ |
| 200 | + public static CoderRegistry createDefault(@Nullable SchemaRegistry schemaRegistry) { |
| 201 | + return new CoderRegistry(schemaRegistry); |
| 202 | + } |
| 203 | + |
| 204 | + /** Backwards compatible version of createDefault. */ |
198 | 205 | public static CoderRegistry createDefault() { |
199 | | - return new CoderRegistry(); |
| 206 | + return new CoderRegistry(null); |
200 | 207 | } |
201 | 208 |
|
202 | | - private CoderRegistry() { |
| 209 | + private CoderRegistry(@Nullable SchemaRegistry schemaRegistry) { |
| 210 | + this.schemaRegistry = schemaRegistry; |
203 | 211 | coderProviders = new ArrayDeque<>(REGISTERED_CODER_FACTORIES); |
204 | 212 | } |
205 | 213 |
|
@@ -590,6 +598,8 @@ private static boolean isNullOrEmpty(Collection<?> c) { |
590 | 598 | /** The list of {@link CoderProvider coder providers} to use to provide Coders. */ |
591 | 599 | private ArrayDeque<CoderProvider> coderProviders; |
592 | 600 |
|
| 601 | + private final @Nullable SchemaRegistry schemaRegistry; |
| 602 | + |
593 | 603 | /** |
594 | 604 | * Returns a {@link Coder} to use for values of the given type, in a context where the given types |
595 | 605 | * use the given coders. |
@@ -650,16 +660,28 @@ private Coder<?> getCoderFromParameterizedType( |
650 | 660 |
|
651 | 661 | List<Coder<?>> typeArgumentCoders = new ArrayList<>(); |
652 | 662 | for (Type typeArgument : type.getActualTypeArguments()) { |
653 | | - try { |
654 | | - Coder<?> typeArgumentCoder = |
655 | | - getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings); |
656 | | - typeArgumentCoders.add(typeArgumentCoder); |
657 | | - } catch (CannotProvideCoderException exc) { |
658 | | - throw new CannotProvideCoderException( |
659 | | - String.format( |
660 | | - "Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()), |
661 | | - exc); |
| 663 | + Coder<?> typeArgumentCoder = null; |
| 664 | + if (schemaRegistry != null) { |
| 665 | + TypeDescriptor<?> typeDescriptor = TypeDescriptor.of(typeArgument); |
| 666 | + try { |
| 667 | + typeArgumentCoder = schemaRegistry.getSchemaCoder(typeDescriptor); |
| 668 | + } catch (NoSuchSchemaException e) { |
| 669 | + // No schema. |
| 670 | + } |
| 671 | + } |
| 672 | + |
| 673 | + if (typeArgumentCoder == null) { |
| 674 | + try { |
| 675 | + typeArgumentCoder = |
| 676 | + getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings); |
| 677 | + } catch (CannotProvideCoderException exc) { |
| 678 | + throw new CannotProvideCoderException( |
| 679 | + String.format( |
| 680 | + "Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()), |
| 681 | + exc); |
| 682 | + } |
662 | 683 | } |
| 684 | + typeArgumentCoders.add(typeArgumentCoder); |
663 | 685 | } |
664 | 686 | return getCoderFromFactories(TypeDescriptor.of(type), typeArgumentCoders); |
665 | 687 | } |
|
0 commit comments