[Bug]: IndexOutOfBoundsException in TextIO.Read with non-default delimiter #32249
Description
What happened?
A pipeline that reads a text file with a non-default delimiter [1] fails with an IndexOutOfBoundsException at TextBasedReader.readCustomLine [2].
The delimiter is "ABCDE" (5 bytes).
The input file is sample.csv. It is 16400 bytes long and has 'A' at index 8190, 'B' at index 8191, and 'C' at index 8192 (indices are 0-based). So the pipeline should not split the file content, and the whole content should be a single element.
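For anyone trying to reproduce this, a file matching the description above could be generated roughly as follows. This is only a sketch: the class name SampleFileSketch and the filler byte 'x' are assumptions, since the report only specifies the file size and the three bytes around the 8192-byte boundary.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;

public class SampleFileSketch {
    // Builds 16400 bytes with "AB" ending the first 8192-byte chunk and 'C' starting the next.
    static byte[] buildContent() {
        byte[] content = new byte[16400];
        Arrays.fill(content, (byte) 'x'); // assumption: the filler byte is not given in the report
        content[8190] = 'A';
        content[8191] = 'B';
        content[8192] = 'C';
        return content;
    }

    public static void main(String[] args) throws IOException {
        // Writes the file into the current working directory.
        Files.write(Paths.get("sample.csv"), buildContent());
    }
}
```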
My theory about the root cause is as follows.
TextBasedReader.readCustomLine writes the contents of buffer (8192 bytes) into a ByteArrayOutputStream, but the requested range is [0, 8194) when the exception is thrown. This is because appendLength is 8194, with readLength at 8192 (= the length of buffer), delPosn at 0, and prevDelPosn at 2.
For the first buffer read of [0, 8192), delPosn is 2 because the buffer ends with "AB". For the second buffer read of [8192, 16384), delPosn is reset to 0 (no delimiter character matched) while prevDelPosn is still 2 (= delPosn from the previous buffer read). I suspect the bug is that prevDelPosn is not reset to 0 when the delimiter match fails.
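The arithmetic above can be checked in isolation. The sketch below is not the Beam code: the class AppendLengthSketch and its methods are hypothetical, and the formula readLength - (delPosn - prevDelPosn) is an assumption inferred from the values quoted above (8192, 0, 2 giving 8194). It only shows that such a length, passed to ByteArrayOutputStream.write with an 8192-byte buffer, is rejected with an IndexOutOfBoundsException.

```java
import java.io.ByteArrayOutputStream;

public class AppendLengthSketch {
    // Assumed formula, inferred from the values in the report; not copied from TextSource.java.
    static int appendLength(int readLength, int delPosn, int prevDelPosn) {
        return readLength - (delPosn - prevDelPosn);
    }

    // Returns true if writing appendLength bytes from a buffer of bufferSize bytes is rejected.
    static boolean writeOverflows(int bufferSize, int appendLength) {
        byte[] buffer = new byte[bufferSize];
        ByteArrayOutputStream str = new ByteArrayOutputStream();
        try {
            str.write(buffer, 0, appendLength);
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Values from the second buffer read described in the report.
        int len = appendLength(8192, 0, 2);
        System.out.println("appendLength = " + len);                    // prints "appendLength = 8194"
        System.out.println("overflows = " + writeOverflows(8192, len)); // prints "overflows = true"
    }
}
```

With prevDelPosn equal to delPosn (for example both 0), the same formula yields 8192 and the write succeeds, which is consistent with the theory that the stale prevDelPosn is what pushes the length past the end of the buffer.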
[1]
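import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class TextReadJob {
    private static final String INPUT_PATH = "sample.csv";
    // Custom 5-byte record delimiter used instead of the default newline handling.
    private static final byte[] DELIMITER = "ABCDE".getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(TextIO.read().from(INPUT_PATH).withDelimiter(DELIMITER));
        pipeline.run();
    }
}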
[2]
Aug 19, 2024 9:24:32 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern sample.csv matched 1 files with total size 16400
Aug 19, 2024 9:24:32 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern sample.csv into bundles of size 1025 took 1 ms and produced 1 files and 16 bundles
Exception in thread "main" java.lang.IndexOutOfBoundsException: Range [0, 0 + 8194) out of bounds for length 8192
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckFromIndexSize(Preconditions.java:82)
at java.base/jdk.internal.util.Preconditions.checkFromIndexSize(Preconditions.java:343)
at java.base/java.util.Objects.checkFromIndexSize(Objects.java:426)
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:155)
at org.apache.beam.sdk.io.TextSource$TextBasedReader.readCustomLine(TextSource.java:466)
at org.apache.beam.sdk.io.TextSource$TextBasedReader.readNextRecord(TextSource.java:268)
at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:507)
at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:502)
at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:150)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner