Skip to content

Commit c6364b7

Browse files
committed
Add avro integration tests for FLOAT32 type in Spanner.
This fixes the ExportPipelineIT and adds support for the ImportPipelineIT.
1 parent eca6acf commit c6364b7

8 files changed

Lines changed: 76 additions & 120 deletions

File tree

v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java

Lines changed: 5 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -77,21 +77,8 @@ public class ExportPipelineIT extends TemplateTestBase {
7777
+ " \"fields\": [\n"
7878
+ " { \"name\": \"Id\", \"type\": \"long\", \"sqlType\": \"INT64\" },\n"
7979
+ " { \"name\": \"FirstName\", \"type\": \"string\" },\n"
80-
+ " { \"name\": \"LastName\", \"type\": \"string\" }\n"
81-
+ " ]\n"
82-
+ "}");
83-
84-
private static final Schema FLOAT32_SCHEMA =
85-
new Schema.Parser()
86-
.parse(
87-
"{\n"
88-
+ " \"type\": \"record\",\n"
89-
+ " \"name\": \"Float32\",\n"
90-
+ " \"namespace\": \"com.google.cloud.teleport.spanner\",\n"
91-
+ " \"fields\": [\n"
92-
+ " { \"name\": \"Id\", \"type\": \"long\", \"sqlType\": \"INT64\" },\n"
93-
+ " { \"name\": \"Rating\", \"type\": \"float\" },\n"
94-
+ " { \"name\": \"PastRatings\", \"type\": [\"null\" , {\"type\": \"array\", \"items\": [\"null\", \"float\"]}] }\n"
80+
+ " { \"name\": \"LastName\", \"type\": \"string\" },\n"
81+
+ " { \"name\": \"Rating\", \"type\": \"float\" }\n"
9582
+ " ]\n"
9683
+ "}");
9784

@@ -186,6 +173,7 @@ public void testSpannerToGCSAvro() throws IOException {
186173
+ " Id INT64 NOT NULL,\n"
187174
+ " FirstName String(1024),\n"
188175
+ " LastName String(1024),\n"
176+
+ " Rating FLOAT32,\n"
189177
+ ") PRIMARY KEY(Id)",
190178
testName);
191179
String createModelStructStatement =
@@ -254,6 +242,7 @@ public void testPostgresSpannerToGCSAvro() throws IOException {
254242
+ " \"Id\" bigint,\n"
255243
+ " \"FirstName\" character varying(256),\n"
256244
+ " \"LastName\" character varying(256),\n"
245+
+ " \"Rating\" real,\n"
257246
+ "PRIMARY KEY(\"Id\"))",
258247
testName);
259248

@@ -293,117 +282,14 @@ public void testPostgresSpannerToGCSAvro() throws IOException {
293282
assertThatGenericRecords(emptyRecords).hasRows(0);
294283
}
295284

296-
@Test
297-
public void spannerToGCSAvroWithFloat32() throws IOException {
298-
// Arrange
299-
String createFloat32TableStatement =
300-
String.format(
301-
"CREATE TABLE `%s_Float32` (\n"
302-
+ " Id INT64 NOT NULL,\n"
303-
+ " Rating FLOAT32,\n"
304-
+ " PastRatings ARRAY<FLOAT32>,\n"
305-
+ ") PRIMARY KEY(Id)",
306-
testName);
307-
308-
googleSqlResourceManager.executeDdlStatement(createFloat32TableStatement);
309-
List<Mutation> expectedData = generateFloat32TableRows(String.format("%s_Float32", testName));
310-
googleSqlResourceManager.write(expectedData);
311-
PipelineLauncher.LaunchConfig.Builder options =
312-
PipelineLauncher.LaunchConfig.builder(testName, specPath)
313-
.addParameter("spannerProjectId", PROJECT)
314-
.addParameter("instanceId", googleSqlResourceManager.getInstanceId())
315-
.addParameter("databaseId", googleSqlResourceManager.getDatabaseId())
316-
.addParameter("outputDir", getGcsPath("output/"));
317-
318-
// Act
319-
PipelineLauncher.LaunchInfo info = launchTemplate(options);
320-
assertThatPipeline(info).isRunning();
321-
PipelineOperator.Result result = pipelineOperator().waitUntilDone(createConfig(info));
322-
323-
// Assert
324-
assertThatResult(result).isLaunchFinished();
325-
326-
List<Artifact> float32TableArtifacts =
327-
gcsClient.listArtifacts(
328-
"output/", Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Flaot32")));
329-
330-
assertThat(float32TableArtifacts).isNotEmpty();
331-
332-
List<GenericRecord> float32TableRecords =
333-
extractArtifacts(float32TableArtifacts, FLOAT32_SCHEMA);
334-
335-
assertThatGenericRecords(float32TableRecords)
336-
.hasRecordsUnorderedCaseInsensitiveColumns(mutationsToRecords(expectedData));
337-
}
338-
339-
@Test
340-
public void postgresSpannerToGCSAvroWithFloat32() throws IOException {
341-
// Arrange
342-
String createFloat32TableStatement =
343-
String.format(
344-
"CREATE TABLE \"%s_Float32\" (\n"
345-
+ " \"Id\" bigint,\n"
346-
+ " \"Ratings\" real,\n"
347-
+ " \"PastRatings\" real[],\n"
348-
+ "PRIMARY KEY(\"Id\"))",
349-
testName);
350-
351-
postgresResourceManager.executeDdlStatement(createFloat32TableStatement);
352-
List<Mutation> expectedData = generateFloat32TableRows(String.format("%s_Float32", testName));
353-
postgresResourceManager.write(expectedData);
354-
PipelineLauncher.LaunchConfig.Builder options =
355-
PipelineLauncher.LaunchConfig.builder(testName, specPath)
356-
.addParameter("spannerProjectId", PROJECT)
357-
.addParameter("instanceId", postgresResourceManager.getInstanceId())
358-
.addParameter("databaseId", postgresResourceManager.getDatabaseId())
359-
.addParameter("outputDir", getGcsPath("output/"));
360-
361-
// Act
362-
PipelineLauncher.LaunchInfo info = launchTemplate(options);
363-
assertThatPipeline(info).isRunning();
364-
PipelineOperator.Result result = pipelineOperator().waitUntilDone(createConfig(info));
365-
366-
// Assert
367-
assertThatResult(result).isLaunchFinished();
368-
369-
List<Artifact> float32TableArtifacts =
370-
gcsClient.listArtifacts(
371-
"output/", Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Float32")));
372-
assertThat(float32TableArtifacts).isNotEmpty();
373-
374-
List<GenericRecord> float32TableRecords =
375-
extractArtifacts(float32TableArtifacts, FLOAT32_SCHEMA);
376-
377-
assertThatGenericRecords(float32TableRecords)
378-
.hasRecordsUnorderedCaseInsensitiveColumns(mutationsToRecords(expectedData));
379-
}
380-
381285
private static List<Mutation> generateTableRows(String tableId) {
382286
List<Mutation> mutations = new ArrayList<>();
383287
for (int i = 0; i < MESSAGES_COUNT; i++) {
384288
Mutation.WriteBuilder mutation = Mutation.newInsertBuilder(tableId);
385289
mutation.set("Id").to(i);
386290
mutation.set("FirstName").to(RandomStringUtils.randomAlphanumeric(1, 20));
387291
mutation.set("LastName").to(RandomStringUtils.randomAlphanumeric(1, 20));
388-
mutations.add(mutation.build());
389-
}
390-
391-
return mutations;
392-
}
393-
394-
private static List<Mutation> generateFloat32TableRows(String tableId) {
395-
List<Mutation> mutations = new ArrayList<>();
396-
for (int i = 0; i < MESSAGES_COUNT; i++) {
397-
Mutation.WriteBuilder mutation = Mutation.newInsertBuilder(tableId);
398-
mutation.set("Id").to(i);
399-
mutation.set("Rating").to(RandomUtils.nextFloat(-100, Float.MAX_VALUE));
400-
401-
List<Float> floatArray = new ArrayList<>();
402-
for (int j = 0; j < RandomUtils.nextInt(0, 100); j++) {
403-
floatArray.add(RandomUtils.nextFloat(-100, Float.MAX_VALUE));
404-
}
405-
mutation.set("PastRatings").toFloat32Array(floatArray);
406-
292+
mutation.set("Rating").to(RandomUtils.nextFloat());
407293
mutations.add(mutation.build());
408294
}
409295

v1/src/test/java/com/google/cloud/teleport/spanner/ImportPipelineIT.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ private void uploadImportPipelineArtifacts(String subdirectory) throws IOExcepti
7878
"input/Singers-manifest.json",
7979
Resources.getResource("ImportPipelineIT/" + subdirectory + "/Singers-manifest.json")
8080
.getPath());
81+
gcsClient.uploadArtifact(
82+
"input/Float32Table.avro-00000-of-00001",
83+
Resources.getResource("ImportPipelineIT/" + subdirectory + "/Float32Table.avro").getPath());
84+
gcsClient.uploadArtifact(
85+
"input/Float32Table-manifest.json",
86+
Resources.getResource("ImportPipelineIT/" + subdirectory + "/Float32Table-manifest.json")
87+
.getPath());
8188

8289
if (Objects.equals(subdirectory, "googlesql")) {
8390
gcsClient.uploadArtifact(
@@ -105,6 +112,23 @@ private List<Map<String, Object>> getExpectedRows() {
105112
return expectedRows;
106113
}
107114

115+
private List<Map<String, Object>> getFloat32TableExpectedRows() {
116+
List<Map<String, Object>> expectedRows = new ArrayList<>();
117+
expectedRows.add(ImmutableMap.of("Key", "1", "Float32Value", 3.14f));
118+
expectedRows.add(ImmutableMap.of("Key", "2", "Float32Value", 1.1f));
119+
expectedRows.add(ImmutableMap.of("Key", "3", "Float32Value", 3.402823E38f));
120+
expectedRows.add(ImmutableMap.of("Key", "4", "Float32Value", Float.NaN));
121+
expectedRows.add(ImmutableMap.of("Key", "5", "Float32Value", Float.POSITIVE_INFINITY));
122+
expectedRows.add(ImmutableMap.of("Key", "6", "Float32Value", Float.NEGATIVE_INFINITY));
123+
expectedRows.add(ImmutableMap.of("Key", "7", "Float32Value", 1.175493E-38));
124+
expectedRows.add(ImmutableMap.of("Key", "8", "Float32Value", -3.402823E38f));
125+
// The custom assertions in Beam do not seem to support null values.
126+
// Using the string NULL to match the string representation created in
127+
// assertThatStructs. The actual value in avro is a plain `null`.
128+
expectedRows.add(ImmutableMap.of("Key", "9", "Float32Value", "NULL"));
129+
return expectedRows;
130+
}
131+
108132
@After
109133
public void tearDown() {
110134
ResourceManagerUtils.cleanResources(googleSqlResourceManager, postgresResourceManager);
@@ -126,6 +150,13 @@ public void testGoogleSqlImportPipeline() throws IOException {
126150
+ ") PRIMARY KEY(Id)";
127151
googleSqlResourceManager.executeDdlStatement(createSingersTableStatement);
128152

153+
String createFloat32TableStatement =
154+
"CREATE TABLE Float32Table (\n"
155+
+ " Key STRING(MAX) NOT NULL,\n"
156+
+ " Float32Value FLOAT32,\n"
157+
+ ") PRIMARY KEY(Key)";
158+
googleSqlResourceManager.executeDdlStatement(createFloat32TableStatement);
159+
129160
PipelineLauncher.LaunchConfig.Builder options =
130161
PipelineLauncher.LaunchConfig.builder(testName, specPath)
131162
.addParameter("spannerProjectId", PROJECT)
@@ -150,6 +181,13 @@ public void testGoogleSqlImportPipeline() throws IOException {
150181
"Singers", ImmutableList.of("Id", "FirstName", "LastName"));
151182
assertThat(singersRecords).hasSize(4);
152183
assertThatStructs(singersRecords).hasRecordsUnordered(getExpectedRows());
184+
185+
List<Struct> float32Records =
186+
googleSqlResourceManager.readTableRecords(
187+
"Float32Table", ImmutableList.of("Key", "Float32Value"));
188+
189+
assertThat(float32Records).hasSize(9);
190+
assertThatStructs(float32Records).hasRecordsUnordered(getFloat32TableExpectedRows());
153191
}
154192

155193
@Test
@@ -168,6 +206,13 @@ public void testPostgresImportPipeline() throws IOException {
168206
+ "PRIMARY KEY(\"Id\"))";
169207
postgresResourceManager.executeDdlStatement(createSingersTableStatement);
170208

209+
String createFloat32TableStatement =
210+
"CREATE TABLE \"Float32Table\" (\n"
211+
+ " \"Key\" character varying NOT NULL,\n"
212+
+ " \"Float32Value\" real,\n"
213+
+ "PRIMARY KEY(\"Key\"))";
214+
postgresResourceManager.executeDdlStatement(createFloat32TableStatement);
215+
171216
PipelineLauncher.LaunchConfig.Builder options =
172217
PipelineLauncher.LaunchConfig.builder(testName, specPath)
173218
.addParameter("spannerProjectId", PROJECT)
@@ -192,5 +237,12 @@ public void testPostgresImportPipeline() throws IOException {
192237
"Singers", ImmutableList.of("Id", "FirstName", "LastName"));
193238
assertThat(singersRecords).hasSize(4);
194239
assertThatStructs(singersRecords).hasRecordsUnordered(getExpectedRows());
240+
241+
List<Struct> float32Records =
242+
postgresResourceManager.readTableRecords(
243+
"Float32Table", ImmutableList.of("Key", "Float32Value"));
244+
245+
assertThat(float32Records).hasSize(9);
246+
assertThatStructs(float32Records).hasRecordsUnordered(getFloat32TableExpectedRows());
195247
}
196248
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"files": [{
3+
"name": "Float32Table.avro-00000-of-00001",
4+
"md5": "0OmM9RMlt3GKwpA71B0iRg\u003d\u003d"
5+
}]
6+
}
465 Bytes
Binary file not shown.

v1/src/test/resources/ImportPipelineIT/googlesql/spanner-export.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
}, {
66
"name": "EmptyTable",
77
"manifestFile": "EmptyTable-manifest.json"
8+
}, {
9+
"name": "Float32Table",
10+
"manifestFile": "Float32Table-manifest.json"
811
}, {
912
"name": "ModelStruct",
1013
"manifestFile": "ModelStruct-manifest.json"
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"files": [{
3+
"name": "Float32Table.avro-00000-of-00001",
4+
"md5": "0OmM9RMlt3GKwpA71B0iRg\u003d\u003d"
5+
}]
6+
}
465 Bytes
Binary file not shown.

v1/src/test/resources/ImportPipelineIT/postgres/spanner-export.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22
"tables": [{
33
"name": "Singers",
44
"manifestFile": "Singers-manifest.json"
5+
}, {
6+
"name": "Float32Table",
7+
"manifestFile": "Float32Table-manifest.json"
58
}, {
69
"name": "EmptyTable",
710
"manifestFile": "EmptyTable-manifest.json"
811
}],
912
"dialect": "POSTGRESQL"
10-
}
13+
}

0 commit comments

Comments
 (0)