Skip to content

Commit 61c0100

Browse files
committed
Make initial slab size heuristic into a helper method, apply in DictionaryValuesWriter as well
1 parent a257ee4 commit 61c0100

4 files changed

Lines changed: 46 additions & 10 deletions

File tree

parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121

2222
import parquet.Log;
23+
import parquet.bytes.CapacityByteArrayOutputStream;
2324
import parquet.column.ColumnDescriptor;
2425
import parquet.column.ColumnWriter;
2526
import parquet.column.ParquetProperties;
@@ -76,10 +77,7 @@ public ColumnWriterV1(
7677
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
7778
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
7879

79-
// initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize
80-
// eg for page size of 1MB we start at 1024 bytes.
81-
// we also don't want to start too small, so we also apply a minimum.
82-
int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSizeThreshold / pow(2, 10))));
80+
int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
8381
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
8482
}
8583

parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import parquet.Ints;
2525
import parquet.Log;
2626
import parquet.bytes.BytesInput;
27+
import parquet.bytes.CapacityByteArrayOutputStream;
2728
import parquet.column.ColumnDescriptor;
2829
import parquet.column.ColumnWriter;
2930
import parquet.column.Encoding;
@@ -69,10 +70,7 @@ public ColumnWriterV2(
6970
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
7071
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);
7172

72-
// initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize
73-
// eg for page size of 1MB we start at 1024 bytes.
74-
// we also don't want to start too small, so we also apply a minimum.
75-
int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSize / pow(2, 10))));
73+
int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
7674
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
7775
}
7876

parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import parquet.Log;
4040
import parquet.bytes.BytesInput;
4141
import parquet.bytes.BytesUtils;
42+
import parquet.bytes.CapacityByteArrayOutputStream;
4243
import parquet.column.Encoding;
4344
import parquet.column.page.DictionaryPage;
4445
import parquet.column.values.RequiresFallback;
@@ -62,6 +63,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
6263

6364
/* max entries allowed for the dictionary will fail over to plain encoding if reached */
6465
private static final int MAX_DICTIONARY_ENTRIES = Integer.MAX_VALUE - 1;
66+
private static final int MIN_INITIAL_SLAB_SIZE = 64;
6567

6668
/* encoding to label the data page */
6769
private final Encoding encodingForDataPage;
@@ -142,8 +144,12 @@ public BytesInput getBytes() {
142144
int maxDicId = getDictionarySize() - 1;
143145
if (DEBUG) LOG.debug("max dic id " + maxDicId);
144146
int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
145-
// TODO: what is a good initialCapacity?
146-
RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64, maxDictionaryByteSize);
147+
148+
int initialSlabSize =
149+
CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10);
150+
151+
RunLengthBitPackingHybridEncoder encoder =
152+
new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize);
147153
IntIterator iterator = encodedValues.iterator();
148154
try {
149155
while (iterator.hasNext()) {

parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package parquet.bytes;
1717

1818
import static java.lang.Math.max;
19+
import static java.lang.Math.pow;
1920
import static java.lang.String.format;
2021
import static java.lang.System.arraycopy;
2122
import static parquet.Preconditions.checkArgument;
@@ -61,6 +62,39 @@ public class CapacityByteArrayOutputStream extends OutputStream {
6162
private int bytesAllocated = 0;
6263
private int bytesUsed = 0;
6364

65+
/**
66+
* Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it
67+
* will end up allocating targetNumSlabs in order to reach targetCapacity. This aims to be
68+
* a balance between the overhead of creating new slabs and wasting memory by eagerly making
69+
* initial slabs too big.
70+
*
71+
* Note that targetCapacity here need not match maxCapacityHint in the constructor of
72+
* CapacityByteArrayOutputStream, though often that would make sense.
73+
*
74+
* @param minSlabSize no matter what we shouldn't make slabs any smaller than this
75+
* @param targetCapacity after we've allocated targetNumSlabs how much capacity should we have?
76+
* @param targetNumSlabs how many slabs should it take to reach targetCapacity?
77+
*/
78+
public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
79+
// initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double targetNumSlabs times
80+
// before reaching the targetCapacity
81+
// e.g. for a targetCapacity of 1MB and targetNumSlabs of 10 we start at 1024 bytes.
82+
// we also don't want to start too small, so we also apply a minimum.
83+
return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs))));
84+
}
85+
86+
/**
87+
* Construct a CapacityByteArrayOutputStream configured such that its initial slab size is
88+
* determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint
89+
*/
90+
public static CapacityByteArrayOutputStream withTargetNumSlabs(
91+
int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
92+
93+
return new CapacityByteArrayOutputStream(
94+
initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs),
95+
maxCapacityHint);
96+
}
97+
6498
/**
6599
* Defaults maxCapacityHint to 1MB
66100
* @param initialSlabSize

0 commit comments

Comments
 (0)