Skip to content

Commit b33a843

Browse files
authored
Revert global snake_case convention for SchemaTransforms (#31109)
* revert global snake_case convention and make it a special case for iceberg and managed
* remove docs and comments too
* cleanup
* revert python and yaml changes too
* fix test
1 parent 088c854 commit b33a843

17 files changed

Lines changed: 155 additions & 72 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 1
3+
"modification": 2
44
}

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,17 @@
1717
*/
1818
package org.apache.beam.sdk.schemas.transforms;
1919

20-
import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
2120
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2221
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
23-
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2422

2523
import java.lang.reflect.ParameterizedType;
2624
import java.util.List;
2725
import java.util.Optional;
2826
import javax.annotation.Nullable;
2927
import org.apache.beam.sdk.annotations.Internal;
3028
import org.apache.beam.sdk.options.PipelineOptions;
31-
import org.apache.beam.sdk.schemas.AutoValueSchema;
3229
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
3330
import org.apache.beam.sdk.schemas.Schema;
34-
import org.apache.beam.sdk.schemas.SchemaProvider;
3531
import org.apache.beam.sdk.schemas.SchemaRegistry;
3632
import org.apache.beam.sdk.values.Row;
3733

@@ -45,9 +41,6 @@
4541
* {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used produce a {@link
4642
* SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema.
4743
*
48-
* <p>NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the
49-
* {@code snake_case} naming convention.
50-
*
5144
* <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
5245
* we provide implementations for more standard Beam transforms. We provide no backwards
5346
* compatibility guarantees and it should not be implemented outside of the Beam repository.
@@ -85,11 +78,10 @@ Optional<List<String>> dependencies(ConfigT configuration, PipelineOptions optio
8578
}
8679

8780
@Override
88-
public final Schema configurationSchema() {
81+
public Schema configurationSchema() {
8982
try {
9083
// Sort the fields by name to ensure a consistent schema is produced
91-
// We also establish a `snake_case` convention for all SchemaTransform configurations
92-
return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase();
84+
return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
9385
} catch (NoSuchSchemaException e) {
9486
throw new RuntimeException(
9587
"Unable to find schema for "
@@ -98,12 +90,9 @@ public final Schema configurationSchema() {
9890
}
9991
}
10092

101-
/**
102-
* Produces a {@link SchemaTransform} from a Row configuration. Row fields are expected to have
103-
* `snake_case` naming convention.
104-
*/
93+
/** Produces a {@link SchemaTransform} from a Row configuration. */
10594
@Override
106-
public final SchemaTransform from(Row configuration) {
95+
public SchemaTransform from(Row configuration) {
10796
return from(configFromRow(configuration));
10897
}
10998

@@ -114,20 +103,9 @@ public final Optional<List<String>> dependencies(Row configuration, PipelineOpti
114103

115104
private ConfigT configFromRow(Row configuration) {
116105
try {
117-
SchemaRegistry registry = SchemaRegistry.createDefault();
118-
119-
// Configuration objects handled by the AutoValueSchema provider will expect Row fields with
120-
// camelCase naming convention
121-
SchemaProvider schemaProvider = registry.getSchemaProvider(configurationClass());
122-
if (schemaProvider.getClass().equals(DefaultSchemaProvider.class)
123-
&& checkNotNull(
124-
((DefaultSchemaProvider) schemaProvider)
125-
.getUnderlyingSchemaProvider(configurationClass()))
126-
.getClass()
127-
.equals(AutoValueSchema.class)) {
128-
configuration = configuration.toCamelCase();
129-
}
130-
return registry.getFromRowFunction(configurationClass()).apply(configuration);
106+
return SchemaRegistry.createDefault()
107+
.getFromRowFunction(configurationClass())
108+
.apply(configuration);
131109
} catch (NoSuchSchemaException e) {
132110
throw new RuntimeException(
133111
"Unable to find schema for " + identifier() + "SchemaTransformProvider's config");

sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,8 @@ public void testFrom() {
130130

131131
Row inputConfig =
132132
Row.withSchema(provider.configurationSchema())
133-
.withFieldValue("string_field", "field1")
134-
.withFieldValue("integer_field", Integer.valueOf(13))
133+
.withFieldValue("stringField", "field1")
134+
.withFieldValue("integerField", Integer.valueOf(13))
135135
.build();
136136

137137
Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config;
@@ -150,8 +150,8 @@ public void testDependencies() {
150150
SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
151151
Row inputConfig =
152152
Row.withSchema(provider.configurationSchema())
153-
.withFieldValue("string_field", "field1")
154-
.withFieldValue("integer_field", Integer.valueOf(13))
153+
.withFieldValue("stringField", "field1")
154+
.withFieldValue("integerField", Integer.valueOf(13))
155155
.build();
156156

157157
assertEquals(Arrays.asList("field1", "13"), provider.dependencies(inputConfig, null).get());

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.beam.sdk.managed.ManagedTransformConstants;
2626
import org.apache.beam.sdk.schemas.AutoValueSchema;
2727
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
28+
import org.apache.beam.sdk.schemas.Schema;
2829
import org.apache.beam.sdk.schemas.SchemaRegistry;
2930
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
3031
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -131,4 +132,15 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
131132
return PCollectionRowTuple.of(OUTPUT_TAG, output);
132133
}
133134
}
135+
136+
// TODO: set global snake_case naming convention and remove these special cases
137+
@Override
138+
public SchemaTransform from(Row rowConfig) {
139+
return super.from(rowConfig.toCamelCase());
140+
}
141+
142+
@Override
143+
public Schema configurationSchema() {
144+
return super.configurationSchema().toSnakeCase();
145+
}
134146
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,4 +176,15 @@ public Row apply(KV<String, SnapshotInfo> input) {
176176
}
177177
}
178178
}
179+
180+
// TODO: set global snake_case naming convention and remove these special cases
181+
@Override
182+
public SchemaTransform from(Row rowConfig) {
183+
return super.from(rowConfig.toCamelCase());
184+
}
185+
186+
@Override
187+
public Schema configurationSchema() {
188+
return super.configurationSchema().toSnakeCase();
189+
}
179190
}

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,17 +112,17 @@ public void testFindTransformAndMakeItWork() {
112112

113113
assertEquals(
114114
Sets.newHashSet(
115-
"bootstrap_servers",
115+
"bootstrapServers",
116116
"topic",
117117
"schema",
118-
"auto_offset_reset_config",
119-
"consumer_config_updates",
118+
"autoOffsetResetConfig",
119+
"consumerConfigUpdates",
120120
"format",
121-
"confluent_schema_registry_subject",
122-
"confluent_schema_registry_url",
123-
"error_handling",
124-
"file_descriptor_path",
125-
"message_name"),
121+
"confluentSchemaRegistrySubject",
122+
"confluentSchemaRegistryUrl",
123+
"errorHandling",
124+
"fileDescriptorPath",
125+
"messageName"),
126126
kafkaProvider.configurationSchema().getFields().stream()
127127
.map(field -> field.getName())
128128
.collect(Collectors.toSet()));

sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ Row getConfigurationRow() {
198198
}
199199
}
200200

201-
/** */
202201
@VisibleForTesting
203202
static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
204203
// May return an empty row (perhaps the underlying transform doesn't have any required
@@ -209,4 +208,15 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
209208
Map<String, SchemaTransformProvider> getAllProviders() {
210209
return schemaTransformProviders;
211210
}
211+
212+
// TODO: set global snake_case naming convention and remove these special cases
213+
@Override
214+
public SchemaTransform from(Row rowConfig) {
215+
return super.from(rowConfig.toCamelCase());
216+
}
217+
218+
@Override
219+
public Schema configurationSchema() {
220+
return super.configurationSchema().toSnakeCase();
221+
}
212222
}

sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void testFailWhenNoConfigSpecified() {
5151

5252
@Test
5353
public void testGetConfigRowFromYamlString() {
54-
String yamlString = "extra_string: abc\n" + "extra_integer: 123";
54+
String yamlString = "extraString: abc\n" + "extraInteger: 123";
5555
ManagedConfig config =
5656
ManagedConfig.builder()
5757
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
@@ -60,8 +60,8 @@ public void testGetConfigRowFromYamlString() {
6060

6161
Row expectedRow =
6262
Row.withSchema(TestSchemaTransformProvider.SCHEMA)
63-
.withFieldValue("extra_string", "abc")
64-
.withFieldValue("extra_integer", 123)
63+
.withFieldValue("extraString", "abc")
64+
.withFieldValue("extraInteger", 123)
6565
.build();
6666

6767
Row returnedRow =
@@ -84,8 +84,8 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
8484
Schema configSchema = new TestSchemaTransformProvider().configurationSchema();
8585
Row expectedRow =
8686
Row.withSchema(configSchema)
87-
.withFieldValue("extra_string", "abc")
88-
.withFieldValue("extra_integer", 123)
87+
.withFieldValue("extraString", "abc")
88+
.withFieldValue("extraInteger", 123)
8989
.build();
9090
Row configRow =
9191
ManagedSchemaTransformProvider.getRowConfig(
@@ -96,7 +96,7 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
9696

9797
@Test
9898
public void testBuildWithYamlString() {
99-
String yamlString = "extra_string: abc\n" + "extra_integer: 123";
99+
String yamlString = "extraString: abc\n" + "extraInteger: 123";
100100

101101
ManagedConfig config =
102102
ManagedConfig.builder()

sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void testReCreateTransformFromRowWithConfigUrl() throws URISyntaxExceptio
8484

8585
@Test
8686
public void testReCreateTransformFromRowWithConfig() {
87-
String yamlString = "extra_string: abc\n" + "extra_integer: 123";
87+
String yamlString = "extraString: abc\n" + "extraInteger: 123";
8888

8989
ManagedConfig originalConfig =
9090
ManagedConfig.builder()
@@ -123,8 +123,8 @@ public void testProtoTranslation() throws Exception {
123123
.setRowSchema(inputSchema);
124124
Map<String, Object> underlyingConfig =
125125
ImmutableMap.<String, Object>builder()
126-
.put("extra_string", "abc")
127-
.put("extra_integer", 123)
126+
.put("extraString", "abc")
127+
.put("extraInteger", 123)
128128
.build();
129129
String yamlStringConfig = YamlUtils.yamlStringFromMap(underlyingConfig);
130130
Managed.ManagedTransform transform =

sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void testManagedTestProviderWithConfigMap() {
9090
.setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
9191
.build()
9292
.withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
93-
.withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123));
93+
.withConfig(ImmutableMap.of("extraString", "abc", "extraInteger", 123));
9494

9595
runTestProviderTest(writeOp);
9696
}

0 commit comments

Comments (0)