Skip to content

Commit 6a20e8b

Browse files
committed
Remove initialSlabSize decision from InternalParquetRecordWriter, use a simpler heuristic in the column writers instead
1 parent 3a0f8e4 commit 6a20e8b

6 files changed

Lines changed: 35 additions & 31 deletions

File tree

parquet-column/src/main/java/parquet/column/ParquetProperties.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,19 +202,20 @@ public boolean isEnableDictionary() {
202202

203203
public ColumnWriteStore newColumnWriteStore(
204204
MessageType schema,
205-
PageWriteStore pageStore, int pageSize,
206-
int initialPageBufferSize) {
205+
PageWriteStore pageStore,
206+
int pageSize) {
207207
switch (writerVersion) {
208208
case PARQUET_1_0:
209209
return new ColumnWriteStoreV1(
210210
pageStore,
211-
pageSize, initialPageBufferSize, dictionaryPageSizeThreshold,
211+
pageSize,
212+
dictionaryPageSizeThreshold,
212213
enableDictionary, writerVersion);
213214
case PARQUET_2_0:
214215
return new ColumnWriteStoreV2(
215216
schema,
216217
pageStore,
217-
pageSize, initialPageBufferSize,
218+
pageSize,
218219
new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
219220
default:
220221
throw new IllegalArgumentException("unknown version " + writerVersion);

parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,12 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
3636
private final int pageSizeThreshold;
3737
private final int dictionaryPageSizeThreshold;
3838
private final boolean enableDictionary;
39-
private final int initialSizePerCol;
4039
private final WriterVersion writerVersion;
4140

42-
public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
41+
public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
4342
super();
4443
this.pageWriteStore = pageWriteStore;
4544
this.pageSizeThreshold = pageSizeThreshold;
46-
this.initialSizePerCol = initialSizePerCol;
4745
this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
4846
this.enableDictionary = enableDictionary;
4947
this.writerVersion = writerVersion;
@@ -64,7 +62,7 @@ public Set<ColumnDescriptor> getColumnDescriptors() {
6462

6563
private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
6664
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
67-
return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
65+
return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
6866
}
6967

7068
@Override

parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,15 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
5353
public ColumnWriteStoreV2(
5454
MessageType schema,
5555
PageWriteStore pageWriteStore,
56-
int pageSizeThreshold, int initialSizePerCol,
56+
int pageSizeThreshold,
5757
ParquetProperties parquetProps) {
5858
super();
5959
this.pageSizeThreshold = pageSizeThreshold;
6060
this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
6161
Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
6262
for (ColumnDescriptor path : schema.getColumns()) {
6363
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
64-
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps, pageSizeThreshold));
64+
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold));
6565
}
6666
this.columns = unmodifiableMap(mcolumns);
6767
this.writers = this.columns.values();

parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import parquet.io.ParquetEncodingException;
3232
import parquet.io.api.Binary;
3333

34+
import static java.lang.Math.max;
35+
import static java.lang.Math.pow;
36+
3437
/**
3538
* Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
3639
*
@@ -41,6 +44,7 @@ final class ColumnWriterV1 implements ColumnWriter {
4144
private static final Log LOG = Log.getLog(ColumnWriterV1.class);
4245
private static final boolean DEBUG = Log.DEBUG;
4346
private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
47+
private static final int MIN_SLAB_SIZE = 64;
4448

4549
private final ColumnDescriptor path;
4650
private final PageWriter pageWriter;
@@ -57,7 +61,6 @@ public ColumnWriterV1(
5761
ColumnDescriptor path,
5862
PageWriter pageWriter,
5963
int pageSizeThreshold,
60-
int initialSizePerCol,
6164
int dictionaryPageSizeThreshold,
6265
boolean enableDictionary,
6366
WriterVersion writerVersion) {
@@ -69,9 +72,15 @@ public ColumnWriterV1(
6972
resetStatistics();
7073

7174
ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
72-
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol, pageSizeThreshold);
73-
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol, pageSizeThreshold);
74-
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSizeThreshold);
75+
76+
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
77+
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
78+
79+
// initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize
80+
// eg for page size of 1MB we start at 1024 bytes.
81+
// we also don't want to start too small, so we also apply a minimum.
82+
int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSizeThreshold / pow(2, 10))));
83+
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
7584
}
7685

7786
private void log(Object value, int r, int d) {

parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package parquet.column.impl;
1717

18+
import static java.lang.Math.max;
19+
import static java.lang.Math.pow;
1820
import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
1921

2022
import java.io.IOException;
@@ -43,6 +45,7 @@
4345
final class ColumnWriterV2 implements ColumnWriter {
4446
private static final Log LOG = Log.getLog(ColumnWriterV2.class);
4547
private static final boolean DEBUG = Log.DEBUG;
48+
private static final int MIN_SLAB_SIZE = 64;
4649

4750
private final ColumnDescriptor path;
4851
private final PageWriter pageWriter;
@@ -57,15 +60,20 @@ final class ColumnWriterV2 implements ColumnWriter {
5760
public ColumnWriterV2(
5861
ColumnDescriptor path,
5962
PageWriter pageWriter,
60-
int initialSizePerCol,
6163
ParquetProperties parquetProps,
6264
int pageSize) {
6365
this.path = path;
6466
this.pageWriter = pageWriter;
6567
resetStatistics();
66-
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol, pageSize);
67-
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol, pageSize);
68-
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSize);
68+
69+
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
70+
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);
71+
72+
// initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize
73+
// eg for page size of 1MB we start at 1024 bytes.
74+
// we also don't want to start too small, so we also apply a minimum.
75+
int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSize / pow(2, 10))));
76+
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
6977
}
7078

7179
private void log(Object value, int r, int d) {

parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
class InternalParquetRecordWriter<T> {
4343
private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
4444

45-
private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
4645
private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
4746
private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
4847

@@ -98,22 +97,11 @@ public InternalParquetRecordWriter(
9897
}
9998

10099
private void initStore() {
101-
// we don't want this number to be too small
102-
// ideally we divide the block equally across the columns
103-
// it is unlikely all columns are going to be the same size.
104-
// its value is likely below Integer.MAX_VALUE (2GB), although rowGroupSize is a long type.
105-
// therefore this size is cast to int, since allocating byte array in under layer needs to
106-
// limit the array size in an int scope.
107-
int initialBlockBufferSize = Ints.checkedCast(max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5));
108100
pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
109-
// we don't want this number to be too small either
110-
// ideally, slightly bigger than the page size, but not bigger than the block buffer
111-
int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
112101
columnStore = parquetProperties.newColumnWriteStore(
113102
schema,
114103
pageStore,
115-
pageSize,
116-
initialPageBufferSize);
104+
pageSize);
117105
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
118106
writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
119107
}

0 commit comments

Comments (0)