|
28 | 28 |
|
29 | 29 | import parquet.Log; |
30 | 30 | import parquet.bytes.BytesInput; |
31 | | -import parquet.bytes.CapacityByteArrayOutputStream; |
32 | 31 | import parquet.bytes.ConcatenatingByteArrayCollector; |
33 | 32 | import parquet.column.ColumnDescriptor; |
34 | 33 | import parquet.column.Encoding; |
@@ -102,14 +101,14 @@ public void writePage(BytesInput bytes, |
102 | 101 | dlEncoding, |
103 | 102 | valuesEncoding, |
104 | 103 | tempOutputStream); |
105 | | - buf.collect(tempOutputStream.toByteArray()); |
106 | | - tempOutputStream.reset(); |
107 | 104 | this.uncompressedLength += uncompressedSize; |
108 | 105 | this.compressedLength += compressedSize; |
109 | 106 | this.totalValueCount += valueCount; |
110 | 107 | this.pageCount += 1; |
111 | 108 | this.totalStatistics.mergeStatistics(statistics); |
112 | | - buf.collect(compressedBytes); |
| 109 | + // by concatenating before collecting instead of collecting twice, |
| 110 | + // we only allocate one buffer to copy into instead of multiple. |
| 111 | + buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes)); |
113 | 112 | encodings.add(rlEncoding); |
114 | 113 | encodings.add(dlEncoding); |
115 | 114 | encodings.add(valuesEncoding); |
@@ -140,16 +139,21 @@ public void writePageV2( |
140 | 139 | rlByteLength, |
141 | 140 | dlByteLength, |
142 | 141 | tempOutputStream); |
143 | | - buf.collect(tempOutputStream.toByteArray()); |
144 | | - tempOutputStream.reset(); |
145 | 142 | this.uncompressedLength += uncompressedSize; |
146 | 143 | this.compressedLength += compressedSize; |
147 | 144 | this.totalValueCount += valueCount; |
148 | 145 | this.pageCount += 1; |
149 | 146 | this.totalStatistics.mergeStatistics(statistics); |
150 | | - buf.collect(repetitionLevels); |
151 | | - buf.collect(definitionLevels); |
152 | | - buf.collect(compressedData); |
| 147 | + |
| 148 | + // by concatenating before collecting instead of collecting twice, |
| 149 | + // we only allocate one buffer to copy into instead of multiple. |
| 150 | + buf.collect( |
| 151 | + BytesInput.concat( |
| 152 | + BytesInput.from(tempOutputStream), |
| 153 | + repetitionLevels, |
| 154 | + definitionLevels, |
| 155 | + compressedData) |
| 156 | + ); |
153 | 157 | encodings.add(dataEncoding); |
154 | 158 | } |
155 | 159 |
|
|
0 commit comments