Skip to content

Commit 9903b2f

Browse files
authored
Modify batch IT to use count instead of hash (#26327)
Modify batch IT to use count instead of hash (#26327)
* Modify batch IT to use count instead of hash * remove unused variable * run spotless, update pipeline state checking * Update timeout to 45 minutes * revert timeout, add additional counter to try and pinpoint missing records * add a log to notify ranges used when workers restart * change counts from metrics to combiners * add a window to streaming test * move the passert to the correct place * Remove extra counter, apply spotless * add additional metric to KafkaWriter * Remove debugging metrics * verify pipeline is not failed * remove extra newline
1 parent de7c50b commit 9903b2f

1 file changed

Lines changed: 19 additions & 68 deletions

File tree

  • sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java

Lines changed: 19 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@
2121
import static org.junit.Assert.assertEquals;
2222
import static org.junit.Assert.assertNotEquals;
2323
import static org.junit.Assert.assertNull;
24-
import static org.junit.Assert.assertTrue;
2524

2625
import com.google.cloud.Timestamp;
2726
import java.io.IOException;
28-
import java.util.Arrays;
2927
import java.util.Collection;
3028
import java.util.Collections;
3129
import java.util.HashMap;
@@ -43,13 +41,10 @@
4341
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
4442
import org.apache.beam.sdk.io.GenerateSequence;
4543
import org.apache.beam.sdk.io.Read;
46-
import org.apache.beam.sdk.io.common.HashingFn;
4744
import org.apache.beam.sdk.io.common.IOITHelper;
4845
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
4946
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
5047
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
51-
import org.apache.beam.sdk.metrics.Counter;
52-
import org.apache.beam.sdk.metrics.Metrics;
5348
import org.apache.beam.sdk.options.Default;
5449
import org.apache.beam.sdk.options.Description;
5550
import org.apache.beam.sdk.options.ExperimentalOptions;
@@ -68,6 +63,7 @@
6863
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
6964
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
7065
import org.apache.beam.sdk.transforms.Combine;
66+
import org.apache.beam.sdk.transforms.Count;
7167
import org.apache.beam.sdk.transforms.Create;
7268
import org.apache.beam.sdk.transforms.DoFn;
7369
import org.apache.beam.sdk.transforms.Flatten;
@@ -76,8 +72,8 @@
7672
import org.apache.beam.sdk.transforms.MapElements;
7773
import org.apache.beam.sdk.transforms.ParDo;
7874
import org.apache.beam.sdk.transforms.SerializableFunction;
79-
import org.apache.beam.sdk.transforms.SimpleFunction;
8075
import org.apache.beam.sdk.transforms.Values;
76+
import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
8177
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
8278
import org.apache.beam.sdk.transforms.windowing.Window;
8379
import org.apache.beam.sdk.values.KV;
@@ -139,8 +135,6 @@ public class KafkaIOIT {
139135

140136
private static final Logger LOG = LoggerFactory.getLogger(KafkaIOIT.class);
141137

142-
private static String expectedHashcode;
143-
144138
private static SyntheticSourceOptions sourceOptions;
145139

146140
private static Options options;
@@ -202,17 +196,23 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
202196

203197
// Use streaming pipeline to read Kafka records.
204198
readPipeline.getOptions().as(Options.class).setStreaming(true);
205-
readPipeline
206-
.apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic()))
207-
.apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
208-
.apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
209-
.apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
199+
PCollection<Long> count =
200+
readPipeline
201+
.apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic()))
202+
.apply(
203+
"Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
204+
.apply("Window", Window.into(CalendarWindows.years(1)))
205+
.apply(
206+
"Counting element",
207+
Combine.globally(Count.<KafkaRecord<byte[], byte[]>>combineFn()).withoutDefaults());
210208

211209
PipelineResult writeResult = writePipeline.run();
212210
PipelineResult.State writeState = writeResult.waitUntilFinish();
213211
// Fail the test if pipeline failed.
214212
assertNotEquals(PipelineResult.State.FAILED, writeState);
215213

214+
PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords);
215+
216216
PipelineResult readResult = readPipeline.run();
217217
PipelineResult.State readState =
218218
readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
@@ -221,13 +221,6 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
221221
tearDownTopic(options.getKafkaTopic());
222222
cancelIfTimeouted(readResult, readState);
223223

224-
long actualRecords = readElementMetric(readResult, NAMESPACE, READ_ELEMENT_METRIC_NAME);
225-
assertTrue(
226-
String.format(
227-
"actual number of records %d smaller than expected: %d.",
228-
actualRecords, sourceOptions.numRecords),
229-
sourceOptions.numRecords <= actualRecords);
230-
231224
if (!options.isWithTestcontainers()) {
232225
Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
233226
IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
@@ -237,32 +230,25 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
237230

238231
@Test
239232
public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException {
240-
// Map of hashes of set size collections with 100b records - 10b key, 90b values.
241-
Map<Long, String> expectedHashes =
242-
ImmutableMap.of(
243-
1000L, "4507649971ee7c51abbb446e65a5c660",
244-
100_000_000L, "0f12c27c9a7672e14775594be66cad9a");
245-
expectedHashcode = getHashForRecordCount(sourceOptions.numRecords, expectedHashes);
246233
writePipeline
247234
.apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
248235
.apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
249236
.apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic()));
250237

251-
PCollection<String> hashcode =
238+
PCollection<Long> count =
252239
readPipeline
253240
.apply(
254241
"Read from bounded Kafka",
255242
readFromBoundedKafka().withTopic(options.getKafkaTopic()))
256243
.apply(
257244
"Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
258-
.apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
259-
.apply("Calculate hashcode", Combine.globally(new HashingFn()).withoutDefaults());
260-
261-
PAssert.thatSingleton(hashcode).isEqualTo(expectedHashcode);
245+
.apply("Counting element", Count.globally());
262246

263247
PipelineResult writeResult = writePipeline.run();
264248
writeResult.waitUntilFinish();
265249

250+
PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords);
251+
266252
PipelineResult readResult = readPipeline.run();
267253
PipelineResult.State readState =
268254
readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
@@ -271,8 +257,7 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException {
271257
tearDownTopic(options.getKafkaTopic());
272258
cancelIfTimeouted(readResult, readState);
273259

274-
// Fail the test if pipeline failed.
275-
assertEquals(PipelineResult.State.DONE, readState);
260+
assertNotEquals(PipelineResult.State.FAILED, readState);
276261

277262
if (!options.isWithTestcontainers()) {
278263
Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
@@ -687,9 +672,7 @@ private PipelineResult runWithStopReadingFn(
687672
readFromKafka()
688673
.withTopic(options.getKafkaTopic() + "-" + topicSuffix)
689674
.withCheckStopReadingFn(function))
690-
.apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
691-
.apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
692-
.apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
675+
.apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)));
693676

694677
PipelineResult writeResult = writePipeline.run();
695678
writeResult.waitUntilFinish();
@@ -834,19 +817,6 @@ private KafkaIO.Read<byte[], byte[]> readFromKafka() {
834817
.withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"));
835818
}
836819

837-
private static class CountingFn extends DoFn<String, Void> {
838-
839-
private final Counter elementCounter;
840-
841-
CountingFn(String namespace, String name) {
842-
elementCounter = Metrics.counter(namespace, name);
843-
}
844-
845-
@ProcessElement
846-
public void processElement() {
847-
elementCounter.inc(1L);
848-
}
849-
}
850820
/** Pipeline options specific for this test. */
851821
public interface Options extends IOTestPipelineOptions, StreamingOptions {
852822

@@ -887,25 +857,6 @@ public interface Options extends IOTestPipelineOptions, StreamingOptions {
887857
void setKafkaContainerVersion(String kafkaContainerVersion);
888858
}
889859

890-
private static class MapKafkaRecordsToStrings
891-
extends SimpleFunction<KafkaRecord<byte[], byte[]>, String> {
892-
@Override
893-
public String apply(KafkaRecord<byte[], byte[]> input) {
894-
String key = Arrays.toString(input.getKV().getKey());
895-
String value = Arrays.toString(input.getKV().getValue());
896-
return String.format("%s %s", key, value);
897-
}
898-
}
899-
900-
public static String getHashForRecordCount(long recordCount, Map<Long, String> hashes) {
901-
String hash = hashes.get(recordCount);
902-
if (hash == null) {
903-
throw new UnsupportedOperationException(
904-
String.format("No hash for that record count: %s", recordCount));
905-
}
906-
return hash;
907-
}
908-
909860
private static void setupKafkaContainer() {
910861
kafkaContainer =
911862
new KafkaContainer(

0 commit comments

Comments
 (0)