2121import static org .junit .Assert .assertEquals ;
2222import static org .junit .Assert .assertNotEquals ;
2323import static org .junit .Assert .assertNull ;
24- import static org .junit .Assert .assertTrue ;
2524
2625import com .google .cloud .Timestamp ;
2726import java .io .IOException ;
28- import java .util .Arrays ;
2927import java .util .Collection ;
3028import java .util .Collections ;
3129import java .util .HashMap ;
4341import org .apache .beam .sdk .extensions .avro .schemas .utils .AvroUtils ;
4442import org .apache .beam .sdk .io .GenerateSequence ;
4543import org .apache .beam .sdk .io .Read ;
46- import org .apache .beam .sdk .io .common .HashingFn ;
4744import org .apache .beam .sdk .io .common .IOITHelper ;
4845import org .apache .beam .sdk .io .common .IOTestPipelineOptions ;
4946import org .apache .beam .sdk .io .synthetic .SyntheticBoundedSource ;
5047import org .apache .beam .sdk .io .synthetic .SyntheticSourceOptions ;
51- import org .apache .beam .sdk .metrics .Counter ;
52- import org .apache .beam .sdk .metrics .Metrics ;
5348import org .apache .beam .sdk .options .Default ;
5449import org .apache .beam .sdk .options .Description ;
5550import org .apache .beam .sdk .options .ExperimentalOptions ;
6863import org .apache .beam .sdk .testutils .metrics .TimeMonitor ;
6964import org .apache .beam .sdk .testutils .publishing .InfluxDBSettings ;
7065import org .apache .beam .sdk .transforms .Combine ;
66+ import org .apache .beam .sdk .transforms .Count ;
7167import org .apache .beam .sdk .transforms .Create ;
7268import org .apache .beam .sdk .transforms .DoFn ;
7369import org .apache .beam .sdk .transforms .Flatten ;
7672import org .apache .beam .sdk .transforms .MapElements ;
7773import org .apache .beam .sdk .transforms .ParDo ;
7874import org .apache .beam .sdk .transforms .SerializableFunction ;
79- import org .apache .beam .sdk .transforms .SimpleFunction ;
8075import org .apache .beam .sdk .transforms .Values ;
76+ import org .apache .beam .sdk .transforms .windowing .CalendarWindows ;
8177import org .apache .beam .sdk .transforms .windowing .FixedWindows ;
8278import org .apache .beam .sdk .transforms .windowing .Window ;
8379import org .apache .beam .sdk .values .KV ;
@@ -139,8 +135,6 @@ public class KafkaIOIT {
139135
140136 private static final Logger LOG = LoggerFactory .getLogger (KafkaIOIT .class );
141137
142- private static String expectedHashcode ;
143-
144138 private static SyntheticSourceOptions sourceOptions ;
145139
146140 private static Options options ;
@@ -202,17 +196,23 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
202196
203197 // Use streaming pipeline to read Kafka records.
204198 readPipeline .getOptions ().as (Options .class ).setStreaming (true );
205- readPipeline
206- .apply ("Read from unbounded Kafka" , readFromKafka ().withTopic (options .getKafkaTopic ()))
207- .apply ("Measure read time" , ParDo .of (new TimeMonitor <>(NAMESPACE , READ_TIME_METRIC_NAME )))
208- .apply ("Map records to strings" , MapElements .via (new MapKafkaRecordsToStrings ()))
209- .apply ("Counting element" , ParDo .of (new CountingFn (NAMESPACE , READ_ELEMENT_METRIC_NAME )));
199+ PCollection <Long > count =
200+ readPipeline
201+ .apply ("Read from unbounded Kafka" , readFromKafka ().withTopic (options .getKafkaTopic ()))
202+ .apply (
203+ "Measure read time" , ParDo .of (new TimeMonitor <>(NAMESPACE , READ_TIME_METRIC_NAME )))
204+ .apply ("Window" , Window .into (CalendarWindows .years (1 )))
205+ .apply (
206+ "Counting element" ,
207+ Combine .globally (Count .<KafkaRecord <byte [], byte []>>combineFn ()).withoutDefaults ());
210208
211209 PipelineResult writeResult = writePipeline .run ();
212210 PipelineResult .State writeState = writeResult .waitUntilFinish ();
213211 // Fail the test if pipeline failed.
214212 assertNotEquals (PipelineResult .State .FAILED , writeState );
215213
214+ PAssert .thatSingleton (count ).isEqualTo (sourceOptions .numRecords );
215+
216216 PipelineResult readResult = readPipeline .run ();
217217 PipelineResult .State readState =
218218 readResult .waitUntilFinish (Duration .standardSeconds (options .getReadTimeout ()));
@@ -221,13 +221,6 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
221221 tearDownTopic (options .getKafkaTopic ());
222222 cancelIfTimeouted (readResult , readState );
223223
224- long actualRecords = readElementMetric (readResult , NAMESPACE , READ_ELEMENT_METRIC_NAME );
225- assertTrue (
226- String .format (
227- "actual number of records %d smaller than expected: %d." ,
228- actualRecords , sourceOptions .numRecords ),
229- sourceOptions .numRecords <= actualRecords );
230-
231224 if (!options .isWithTestcontainers ()) {
232225 Set <NamedTestResult > metrics = readMetrics (writeResult , readResult );
233226 IOITMetrics .publishToInflux (TEST_ID , TIMESTAMP , metrics , settings );
@@ -237,32 +230,25 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
237230
238231 @ Test
239232 public void testKafkaIOReadsAndWritesCorrectlyInBatch () throws IOException {
240- // Map of hashes of set size collections with 100b records - 10b key, 90b values.
241- Map <Long , String > expectedHashes =
242- ImmutableMap .of (
243- 1000L , "4507649971ee7c51abbb446e65a5c660" ,
244- 100_000_000L , "0f12c27c9a7672e14775594be66cad9a" );
245- expectedHashcode = getHashForRecordCount (sourceOptions .numRecords , expectedHashes );
246233 writePipeline
247234 .apply ("Generate records" , Read .from (new SyntheticBoundedSource (sourceOptions )))
248235 .apply ("Measure write time" , ParDo .of (new TimeMonitor <>(NAMESPACE , WRITE_TIME_METRIC_NAME )))
249236 .apply ("Write to Kafka" , writeToKafka ().withTopic (options .getKafkaTopic ()));
250237
251- PCollection <String > hashcode =
238+ PCollection <Long > count =
252239 readPipeline
253240 .apply (
254241 "Read from bounded Kafka" ,
255242 readFromBoundedKafka ().withTopic (options .getKafkaTopic ()))
256243 .apply (
257244 "Measure read time" , ParDo .of (new TimeMonitor <>(NAMESPACE , READ_TIME_METRIC_NAME )))
258- .apply ("Map records to strings" , MapElements .via (new MapKafkaRecordsToStrings ()))
259- .apply ("Calculate hashcode" , Combine .globally (new HashingFn ()).withoutDefaults ());
260-
261- PAssert .thatSingleton (hashcode ).isEqualTo (expectedHashcode );
245+ .apply ("Counting element" , Count .globally ());
262246
263247 PipelineResult writeResult = writePipeline .run ();
264248 writeResult .waitUntilFinish ();
265249
250+ PAssert .thatSingleton (count ).isEqualTo (sourceOptions .numRecords );
251+
266252 PipelineResult readResult = readPipeline .run ();
267253 PipelineResult .State readState =
268254 readResult .waitUntilFinish (Duration .standardSeconds (options .getReadTimeout ()));
@@ -271,8 +257,7 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException {
271257 tearDownTopic (options .getKafkaTopic ());
272258 cancelIfTimeouted (readResult , readState );
273259
274- // Fail the test if pipeline failed.
275- assertEquals (PipelineResult .State .DONE , readState );
260+ assertNotEquals (PipelineResult .State .FAILED , readState );
276261
277262 if (!options .isWithTestcontainers ()) {
278263 Set <NamedTestResult > metrics = readMetrics (writeResult , readResult );
@@ -687,9 +672,7 @@ private PipelineResult runWithStopReadingFn(
687672 readFromKafka ()
688673 .withTopic (options .getKafkaTopic () + "-" + topicSuffix )
689674 .withCheckStopReadingFn (function ))
690- .apply ("Measure read time" , ParDo .of (new TimeMonitor <>(NAMESPACE , READ_TIME_METRIC_NAME )))
691- .apply ("Map records to strings" , MapElements .via (new MapKafkaRecordsToStrings ()))
692- .apply ("Counting element" , ParDo .of (new CountingFn (NAMESPACE , READ_ELEMENT_METRIC_NAME )));
675+ .apply ("Measure read time" , ParDo .of (new TimeMonitor <>(NAMESPACE , READ_TIME_METRIC_NAME )));
693676
694677 PipelineResult writeResult = writePipeline .run ();
695678 writeResult .waitUntilFinish ();
@@ -834,19 +817,6 @@ private KafkaIO.Read<byte[], byte[]> readFromKafka() {
834817 .withConsumerConfigUpdates (ImmutableMap .of ("auto.offset.reset" , "earliest" ));
835818 }
836819
837- private static class CountingFn extends DoFn <String , Void > {
838-
839- private final Counter elementCounter ;
840-
841- CountingFn (String namespace , String name ) {
842- elementCounter = Metrics .counter (namespace , name );
843- }
844-
845- @ ProcessElement
846- public void processElement () {
847- elementCounter .inc (1L );
848- }
849- }
850820 /** Pipeline options specific for this test. */
851821 public interface Options extends IOTestPipelineOptions , StreamingOptions {
852822
@@ -887,25 +857,6 @@ public interface Options extends IOTestPipelineOptions, StreamingOptions {
887857 void setKafkaContainerVersion (String kafkaContainerVersion );
888858 }
889859
890- private static class MapKafkaRecordsToStrings
891- extends SimpleFunction <KafkaRecord <byte [], byte []>, String > {
892- @ Override
893- public String apply (KafkaRecord <byte [], byte []> input ) {
894- String key = Arrays .toString (input .getKV ().getKey ());
895- String value = Arrays .toString (input .getKV ().getValue ());
896- return String .format ("%s %s" , key , value );
897- }
898- }
899-
900- public static String getHashForRecordCount (long recordCount , Map <Long , String > hashes ) {
901- String hash = hashes .get (recordCount );
902- if (hash == null ) {
903- throw new UnsupportedOperationException (
904- String .format ("No hash for that record count: %s" , recordCount ));
905- }
906- return hash ;
907- }
908-
909860 private static void setupKafkaContainer () {
910861 kafkaContainer =
911862 new KafkaContainer (
0 commit comments