Skip to content

Commit b9abab0

Browse files
committed
Address Julien's comment
1 parent 965af7f commit b9abab0

2 files changed

Lines changed: 15 additions & 14 deletions

File tree

parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,8 @@ public class ConcatenatingByteArrayCollector extends BytesInput {
1111
private final List<byte[]> slabs = new ArrayList<byte[]>();
1212
private long size = 0;
1313

14-
public void collect(BytesInput bytes) throws IOException {
15-
collect(bytes.toByteArray());
16-
}
17-
18-
public void collect(byte[] bytes) {
14+
public void collect(BytesInput bytesInput) throws IOException {
15+
byte[] bytes = bytesInput.toByteArray();
1916
slabs.add(bytes);
2017
size += bytes.length;
2118
}

parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828

2929
import parquet.Log;
3030
import parquet.bytes.BytesInput;
31-
import parquet.bytes.CapacityByteArrayOutputStream;
3231
import parquet.bytes.ConcatenatingByteArrayCollector;
3332
import parquet.column.ColumnDescriptor;
3433
import parquet.column.Encoding;
@@ -102,14 +101,14 @@ public void writePage(BytesInput bytes,
102101
dlEncoding,
103102
valuesEncoding,
104103
tempOutputStream);
105-
buf.collect(tempOutputStream.toByteArray());
106-
tempOutputStream.reset();
107104
this.uncompressedLength += uncompressedSize;
108105
this.compressedLength += compressedSize;
109106
this.totalValueCount += valueCount;
110107
this.pageCount += 1;
111108
this.totalStatistics.mergeStatistics(statistics);
112-
buf.collect(compressedBytes);
109+
// by concatenating before collecting instead of collecting twice,
110+
// we only allocate one buffer to copy into instead of multiple.
111+
buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
113112
encodings.add(rlEncoding);
114113
encodings.add(dlEncoding);
115114
encodings.add(valuesEncoding);
@@ -140,16 +139,21 @@ public void writePageV2(
140139
rlByteLength,
141140
dlByteLength,
142141
tempOutputStream);
143-
buf.collect(tempOutputStream.toByteArray());
144-
tempOutputStream.reset();
145142
this.uncompressedLength += uncompressedSize;
146143
this.compressedLength += compressedSize;
147144
this.totalValueCount += valueCount;
148145
this.pageCount += 1;
149146
this.totalStatistics.mergeStatistics(statistics);
150-
buf.collect(repetitionLevels);
151-
buf.collect(definitionLevels);
152-
buf.collect(compressedData);
147+
148+
// by concatenating before collecting instead of collecting twice,
149+
// we only allocate one buffer to copy into instead of multiple.
150+
buf.collect(
151+
BytesInput.concat(
152+
BytesInput.from(tempOutputStream),
153+
repetitionLevels,
154+
definitionLevels,
155+
compressedData)
156+
);
153157
encodings.add(dataEncoding);
154158
}
155159

0 commit comments

Comments
 (0)