Skip to content

Commit 37148d6

Browse files
committed
Merge pull request #2 from isnotinvain/PR-98
Updates to PR-98
2 parents 1df4a71 + b9abab0 commit 37148d6

File tree

18 files changed

+242
-132
lines changed

18 files changed

+242
-132
lines changed

parquet-column/src/main/java/parquet/column/ParquetProperties.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,19 +202,20 @@ public boolean isEnableDictionary() {
202202

203203
public ColumnWriteStore newColumnWriteStore(
204204
MessageType schema,
205-
PageWriteStore pageStore, int pageSize,
206-
int initialPageBufferSize) {
205+
PageWriteStore pageStore,
206+
int pageSize) {
207207
switch (writerVersion) {
208208
case PARQUET_1_0:
209209
return new ColumnWriteStoreV1(
210210
pageStore,
211-
pageSize, initialPageBufferSize, dictionaryPageSizeThreshold,
211+
pageSize,
212+
dictionaryPageSizeThreshold,
212213
enableDictionary, writerVersion);
213214
case PARQUET_2_0:
214215
return new ColumnWriteStoreV2(
215216
schema,
216217
pageStore,
217-
pageSize, initialPageBufferSize,
218+
pageSize,
218219
new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
219220
default:
220221
throw new IllegalArgumentException("unknown version " + writerVersion);

parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,12 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
3636
private final int pageSizeThreshold;
3737
private final int dictionaryPageSizeThreshold;
3838
private final boolean enableDictionary;
39-
private final int initialSizePerCol;
4039
private final WriterVersion writerVersion;
4140

42-
public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
41+
public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
4342
super();
4443
this.pageWriteStore = pageWriteStore;
4544
this.pageSizeThreshold = pageSizeThreshold;
46-
this.initialSizePerCol = initialSizePerCol;
4745
this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
4846
this.enableDictionary = enableDictionary;
4947
this.writerVersion = writerVersion;
@@ -64,7 +62,7 @@ public Set<ColumnDescriptor> getColumnDescriptors() {
6462

6563
private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
6664
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
67-
return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
65+
return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
6866
}
6967

7068
@Override

parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,15 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
5353
public ColumnWriteStoreV2(
5454
MessageType schema,
5555
PageWriteStore pageWriteStore,
56-
int pageSizeThreshold, int initialSizePerCol,
56+
int pageSizeThreshold,
5757
ParquetProperties parquetProps) {
5858
super();
5959
this.pageSizeThreshold = pageSizeThreshold;
6060
this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
6161
Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
6262
for (ColumnDescriptor path : schema.getColumns()) {
6363
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
64-
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps, pageSizeThreshold));
64+
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold));
6565
}
6666
this.columns = unmodifiableMap(mcolumns);
6767
this.writers = this.columns.values();

parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121

2222
import parquet.Log;
23+
import parquet.bytes.CapacityByteArrayOutputStream;
2324
import parquet.column.ColumnDescriptor;
2425
import parquet.column.ColumnWriter;
2526
import parquet.column.ParquetProperties;
@@ -31,6 +32,9 @@
3132
import parquet.io.ParquetEncodingException;
3233
import parquet.io.api.Binary;
3334

35+
import static java.lang.Math.max;
36+
import static java.lang.Math.pow;
37+
3438
/**
3539
* Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
3640
*
@@ -41,6 +45,7 @@ final class ColumnWriterV1 implements ColumnWriter {
4145
private static final Log LOG = Log.getLog(ColumnWriterV1.class);
4246
private static final boolean DEBUG = Log.DEBUG;
4347
private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
48+
private static final int MIN_SLAB_SIZE = 64;
4449

4550
private final ColumnDescriptor path;
4651
private final PageWriter pageWriter;
@@ -57,7 +62,6 @@ public ColumnWriterV1(
5762
ColumnDescriptor path,
5863
PageWriter pageWriter,
5964
int pageSizeThreshold,
60-
int initialSizePerCol,
6165
int dictionaryPageSizeThreshold,
6266
boolean enableDictionary,
6367
WriterVersion writerVersion) {
@@ -69,9 +73,12 @@ public ColumnWriterV1(
6973
resetStatistics();
7074

7175
ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
72-
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol, pageSizeThreshold);
73-
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol, pageSizeThreshold);
74-
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSizeThreshold);
76+
77+
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
78+
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
79+
80+
int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
81+
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
7582
}
7683

7784
private void log(Object value, int r, int d) {

parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@
1515
*/
1616
package parquet.column.impl;
1717

18+
import static java.lang.Math.max;
19+
import static java.lang.Math.pow;
1820
import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
1921

2022
import java.io.IOException;
2123

2224
import parquet.Ints;
2325
import parquet.Log;
2426
import parquet.bytes.BytesInput;
27+
import parquet.bytes.CapacityByteArrayOutputStream;
2528
import parquet.column.ColumnDescriptor;
2629
import parquet.column.ColumnWriter;
2730
import parquet.column.Encoding;
@@ -43,6 +46,7 @@
4346
final class ColumnWriterV2 implements ColumnWriter {
4447
private static final Log LOG = Log.getLog(ColumnWriterV2.class);
4548
private static final boolean DEBUG = Log.DEBUG;
49+
private static final int MIN_SLAB_SIZE = 64;
4650

4751
private final ColumnDescriptor path;
4852
private final PageWriter pageWriter;
@@ -57,15 +61,17 @@ final class ColumnWriterV2 implements ColumnWriter {
5761
public ColumnWriterV2(
5862
ColumnDescriptor path,
5963
PageWriter pageWriter,
60-
int initialSizePerCol,
6164
ParquetProperties parquetProps,
6265
int pageSize) {
6366
this.path = path;
6467
this.pageWriter = pageWriter;
6568
resetStatistics();
66-
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol, pageSize);
67-
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol, pageSize);
68-
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSize);
69+
70+
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
71+
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);
72+
73+
int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
74+
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
6975
}
7076

7177
private void log(Object value, int r, int d) {

parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import parquet.Log;
4040
import parquet.bytes.BytesInput;
4141
import parquet.bytes.BytesUtils;
42+
import parquet.bytes.CapacityByteArrayOutputStream;
4243
import parquet.column.Encoding;
4344
import parquet.column.page.DictionaryPage;
4445
import parquet.column.values.RequiresFallback;
@@ -62,6 +63,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
6263

6364
/* max entries allowed for the dictionary will fail over to plain encoding if reached */
6465
private static final int MAX_DICTIONARY_ENTRIES = Integer.MAX_VALUE - 1;
66+
private static final int MIN_INITIAL_SLAB_SIZE = 64;
6567

6668
/* encoding to label the data page */
6769
private final Encoding encodingForDataPage;
@@ -142,8 +144,12 @@ public BytesInput getBytes() {
142144
int maxDicId = getDictionarySize() - 1;
143145
if (DEBUG) LOG.debug("max dic id " + maxDicId);
144146
int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
145-
// TODO: what is a good initialCapacity?
146-
RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024, maxDictionaryByteSize);
147+
148+
int initialSlabSize =
149+
CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10);
150+
151+
RunLengthBitPackingHybridEncoder encoder =
152+
new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize);
147153
IntIterator iterator = encodedValues.iterator();
148154
try {
149155
while (iterator.hasNext()) {

parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void test() {
3838
MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
3939
ColumnDescriptor col = schema.getColumns().get(0);
4040
MemPageWriter pageWriter = new MemPageWriter();
41-
ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
41+
ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
4242
for (int i = 0; i < rows; i++) {
4343
columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
4444
if ((i + 1) % 1000 == 0) {
@@ -73,7 +73,7 @@ public void testOptional() {
7373
MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
7474
ColumnDescriptor col = schema.getColumns().get(0);
7575
MemPageWriter pageWriter = new MemPageWriter();
76-
ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
76+
ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
7777
for (int i = 0; i < rows; i++) {
7878
columnWriterV2.writeNull(0, 0);
7979
if ((i + 1) % 1000 == 0) {

parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,6 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
156156
}
157157

158158
private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
159-
return new ColumnWriteStoreV1(memPageStore, 2048, 2048, 2048, false, WriterVersion.PARQUET_1_0);
159+
return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0);
160160
}
161161
}

parquet-column/src/test/java/parquet/io/PerfTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private static void read(MemPageStore memPageStore, MessageType myschema,
7474

7575

7676
private static void write(MemPageStore memPageStore) {
77-
ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
77+
ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
7878
MessageColumnIO columnIO = newColumnFactory(schema);
7979

8080
GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);

parquet-column/src/test/java/parquet/io/TestColumnIO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ public void testPushParser() {
514514
}
515515

516516
private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) {
517-
return new ColumnWriteStoreV1(memPageStore, 800, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
517+
return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
518518
}
519519

520520
@Test

0 commit comments

Comments
 (0)