@@ -77,21 +77,8 @@ public class ExportPipelineIT extends TemplateTestBase {
7777 + " \" fields\" : [\n "
7878 + " { \" name\" : \" Id\" , \" type\" : \" long\" , \" sqlType\" : \" INT64\" },\n "
7979 + " { \" name\" : \" FirstName\" , \" type\" : \" string\" },\n "
80- + " { \" name\" : \" LastName\" , \" type\" : \" string\" }\n "
81- + " ]\n "
82- + "}" );
83-
84- private static final Schema FLOAT32_SCHEMA =
85- new Schema .Parser ()
86- .parse (
87- "{\n "
88- + " \" type\" : \" record\" ,\n "
89- + " \" name\" : \" Float32\" ,\n "
90- + " \" namespace\" : \" com.google.cloud.teleport.spanner\" ,\n "
91- + " \" fields\" : [\n "
92- + " { \" name\" : \" Id\" , \" type\" : \" long\" , \" sqlType\" : \" INT64\" },\n "
93- + " { \" name\" : \" Rating\" , \" type\" : \" float\" },\n "
94- + " { \" name\" : \" PastRatings\" , \" type\" : [\" null\" , {\" type\" : \" array\" , \" items\" : [\" null\" , \" float\" ]}] }\n "
80+ + " { \" name\" : \" LastName\" , \" type\" : \" string\" },\n "
81+ + " { \" name\" : \" Rating\" , \" type\" : \" float\" }\n "
9582 + " ]\n "
9683 + "}" );
9784
@@ -186,6 +173,7 @@ public void testSpannerToGCSAvro() throws IOException {
186173 + " Id INT64 NOT NULL,\n "
187174 + " FirstName String(1024),\n "
188175 + " LastName String(1024),\n "
176+ + " Rating FLOAT32,\n "
189177 + ") PRIMARY KEY(Id)" ,
190178 testName );
191179 String createModelStructStatement =
@@ -254,6 +242,7 @@ public void testPostgresSpannerToGCSAvro() throws IOException {
254242 + " \" Id\" bigint,\n "
255243 + " \" FirstName\" character varying(256),\n "
256244 + " \" LastName\" character varying(256),\n "
245+ + " \" Rating\" real,\n "
257246 + "PRIMARY KEY(\" Id\" ))" ,
258247 testName );
259248
@@ -293,117 +282,14 @@ public void testPostgresSpannerToGCSAvro() throws IOException {
293282 assertThatGenericRecords (emptyRecords ).hasRows (0 );
294283 }
295284
296- @ Test
297- public void spannerToGCSAvroWithFloat32 () throws IOException {
298- // Arrange
299- String createFloat32TableStatement =
300- String .format (
301- "CREATE TABLE `%s_Float32` (\n "
302- + " Id INT64 NOT NULL,\n "
303- + " Rating FLOAT32,\n "
304- + " PastRatings ARRAY<FLOAT32>,\n "
305- + ") PRIMARY KEY(Id)" ,
306- testName );
307-
308- googleSqlResourceManager .executeDdlStatement (createFloat32TableStatement );
309- List <Mutation > expectedData = generateFloat32TableRows (String .format ("%s_Float32" , testName ));
310- googleSqlResourceManager .write (expectedData );
311- PipelineLauncher .LaunchConfig .Builder options =
312- PipelineLauncher .LaunchConfig .builder (testName , specPath )
313- .addParameter ("spannerProjectId" , PROJECT )
314- .addParameter ("instanceId" , googleSqlResourceManager .getInstanceId ())
315- .addParameter ("databaseId" , googleSqlResourceManager .getDatabaseId ())
316- .addParameter ("outputDir" , getGcsPath ("output/" ));
317-
318- // Act
319- PipelineLauncher .LaunchInfo info = launchTemplate (options );
320- assertThatPipeline (info ).isRunning ();
321- PipelineOperator .Result result = pipelineOperator ().waitUntilDone (createConfig (info ));
322-
323- // Assert
324- assertThatResult (result ).isLaunchFinished ();
325-
326- List <Artifact > float32TableArtifacts =
327- gcsClient .listArtifacts (
328- "output/" , Pattern .compile (String .format (".*/%s_%s.*\\ .avro.*" , testName , "Flaot32" )));
329-
330- assertThat (float32TableArtifacts ).isNotEmpty ();
331-
332- List <GenericRecord > float32TableRecords =
333- extractArtifacts (float32TableArtifacts , FLOAT32_SCHEMA );
334-
335- assertThatGenericRecords (float32TableRecords )
336- .hasRecordsUnorderedCaseInsensitiveColumns (mutationsToRecords (expectedData ));
337- }
338-
339- @ Test
340- public void postgresSpannerToGCSAvroWithFloat32 () throws IOException {
341- // Arrange
342- String createFloat32TableStatement =
343- String .format (
344- "CREATE TABLE \" %s_Float32\" (\n "
345- + " \" Id\" bigint,\n "
346- + " \" Ratings\" real,\n "
347- + " \" PastRatings\" real[],\n "
348- + "PRIMARY KEY(\" Id\" ))" ,
349- testName );
350-
351- postgresResourceManager .executeDdlStatement (createFloat32TableStatement );
352- List <Mutation > expectedData = generateFloat32TableRows (String .format ("%s_Float32" , testName ));
353- postgresResourceManager .write (expectedData );
354- PipelineLauncher .LaunchConfig .Builder options =
355- PipelineLauncher .LaunchConfig .builder (testName , specPath )
356- .addParameter ("spannerProjectId" , PROJECT )
357- .addParameter ("instanceId" , postgresResourceManager .getInstanceId ())
358- .addParameter ("databaseId" , postgresResourceManager .getDatabaseId ())
359- .addParameter ("outputDir" , getGcsPath ("output/" ));
360-
361- // Act
362- PipelineLauncher .LaunchInfo info = launchTemplate (options );
363- assertThatPipeline (info ).isRunning ();
364- PipelineOperator .Result result = pipelineOperator ().waitUntilDone (createConfig (info ));
365-
366- // Assert
367- assertThatResult (result ).isLaunchFinished ();
368-
369- List <Artifact > float32TableArtifacts =
370- gcsClient .listArtifacts (
371- "output/" , Pattern .compile (String .format (".*/%s_%s.*\\ .avro.*" , testName , "Float32" )));
372- assertThat (float32TableArtifacts ).isNotEmpty ();
373-
374- List <GenericRecord > float32TableRecords =
375- extractArtifacts (float32TableArtifacts , FLOAT32_SCHEMA );
376-
377- assertThatGenericRecords (float32TableRecords )
378- .hasRecordsUnorderedCaseInsensitiveColumns (mutationsToRecords (expectedData ));
379- }
380-
381285 private static List <Mutation > generateTableRows (String tableId ) {
382286 List <Mutation > mutations = new ArrayList <>();
383287 for (int i = 0 ; i < MESSAGES_COUNT ; i ++) {
384288 Mutation .WriteBuilder mutation = Mutation .newInsertBuilder (tableId );
385289 mutation .set ("Id" ).to (i );
386290 mutation .set ("FirstName" ).to (RandomStringUtils .randomAlphanumeric (1 , 20 ));
387291 mutation .set ("LastName" ).to (RandomStringUtils .randomAlphanumeric (1 , 20 ));
388- mutations .add (mutation .build ());
389- }
390-
391- return mutations ;
392- }
393-
394- private static List <Mutation > generateFloat32TableRows (String tableId ) {
395- List <Mutation > mutations = new ArrayList <>();
396- for (int i = 0 ; i < MESSAGES_COUNT ; i ++) {
397- Mutation .WriteBuilder mutation = Mutation .newInsertBuilder (tableId );
398- mutation .set ("Id" ).to (i );
399- mutation .set ("Rating" ).to (RandomUtils .nextFloat (-100 , Float .MAX_VALUE ));
400-
401- List <Float > floatArray = new ArrayList <>();
402- for (int j = 0 ; j < RandomUtils .nextInt (0 , 100 ); j ++) {
403- floatArray .add (RandomUtils .nextFloat (-100 , Float .MAX_VALUE ));
404- }
405- mutation .set ("PastRatings" ).toFloat32Array (floatArray );
406-
292+ mutation .set ("Rating" ).to (RandomUtils .nextFloat ());
407293 mutations .add (mutation .build ());
408294 }
409295
0 commit comments