GH-3040: DictionaryFilter.canDrop may return false positive result when dict size exceeds 8k #3041
Fokko merged 4 commits into apache:master
Conversation
```java
super(buf);
}

// In practice, some implementations always return 0 even if they have more data
```
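To illustrate the pitfall this comment describes, here is a standalone sketch (pure JDK; the class and method names are my own, not from the PR) showing that a copy loop keyed on `available()` silently drops data when the stream always reports 0, while reading until `-1` recovers everything:

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class AvailablePitfall {
    // Hypothetical wrapper: available() always reports 0, which the
    // InputStream contract permits even when more data can be read.
    static InputStream zeroAvailable(byte[] data) {
        return new FilterInputStream(new ByteArrayInputStream(data)) {
            @Override
            public int available() {
                return 0;
            }
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[9 * 1024];

        // Naive loop keyed on available(): stops immediately, copies nothing.
        InputStream in = zeroAvailable(data);
        int naive = 0;
        while (in.available() > 0) {
            if (in.read() < 0) break;
            naive++;
        }

        // Correct loop: read until the stream signals EOF with -1.
        in = zeroAvailable(data);
        int correct = 0;
        while (in.read() >= 0) {
            correct++;
        }

        System.out.println(naive + " " + correct); // prints "0 9216"
    }
}
```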
In my case, the underlying InputStream is H1SeekableInputStream.
Out of curiosity. Why are you using H1SeekableInputStream? This one is related to Hadoop 1.
My test code is as follows (sorry, the file contains private data so I cannot share it):

```java
import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.junit.Assert.assertFalse;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.api.Binary;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class MyDictionaryFilterTest {

  private static final Configuration conf = new Configuration();

  List<ColumnChunkMetaData> ccmd;
  ParquetFileReader reader;
  DictionaryPageReadStore dictionaries;

  private Path file = new Path("/Users/chengpan/Temp/part-2bb8404a-f6e5-4e9f-9161-f749c4bf46d0-2-2222");

  @Before
  public void setUp() throws Exception {
    reader = ParquetFileReader.open(conf, file);
    ParquetMetadata meta = reader.getFooter();
    ccmd = meta.getBlocks().get(0).getColumns();
    dictionaries = reader.getDictionaryReader(meta.getBlocks().get(0));
  }

  @After
  public void tearDown() throws Exception {
    reader.close();
  }

  @Test
  public void testEqBinary() throws Exception {
    BinaryColumn b = binaryColumn("source_id");
    FilterPredicate pred = eq(b, Binary.fromString("5059661515"));
    assertFalse(canDrop(pred, ccmd, dictionaries));
  }
}
```
Thanks, this is very helpful!
```java
byte[] input = new byte[data.length + 10];
RANDOM.nextBytes(input);
System.arraycopy(data, 0, input, 0, data.length);
Supplier<BytesInput> factory = () -> BytesInput.from(new AvailableAgnosticInputStream(input), 9 * 1024);
```
What about using an anonymous class here instead of adding a new file?
I tend to use a new file since this is a fairly common case that needs to be tested; it might be used in other places in the future.
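For readers following along, here is a minimal reconstruction of what such a helper could look like (a sketch under my own assumptions, not the actual class from the PR): a `FilterInputStream` that delegates all reads but always reports 0 from `available()`:

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical reconstruction of the test helper: it delegates all reads to
// the wrapped stream but always reports 0 available bytes, mimicking real
// streams that never implement available() usefully.
public class AvailableAgnosticInputStream extends FilterInputStream {

    public AvailableAgnosticInputStream(byte[] data) {
        super(new ByteArrayInputStream(data));
    }

    @Override
    public int available() {
        return 0; // always 0, even when more data is readable
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4, 5};
        try (InputStream in = new AvailableAgnosticInputStream(data)) {
            System.out.println(in.available()); // prints 0 despite 5 readable bytes
            int count = 0;
            while (in.read() >= 0) {
                count++;
            }
            System.out.println(count); // all 5 bytes were still readable
        }
    }
}
```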
```java
ReadableByteChannel channel = Channels.newChannel(in);
int remaining = byteCount;
while (remaining > 0) {
  remaining -= channel.read(workBuf);
```
Is `remaining` reliable? Should we check the return value of `channel.read(workBuf)`?
Added a check to detect the EOF case.
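A sketch of what such an EOF check can look like (this is an assumed shape, not the exact patch): `ReadableByteChannel.read` returns -1 at end-of-stream, so subtracting its result blindly would increase `remaining` and spin forever:

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class ReadFully {

    // Sketch of a fixed loop: channel.read returns -1 at EOF, so the return
    // value must be checked before it is subtracted from remaining.
    static void readFully(ReadableByteChannel channel, ByteBuffer workBuf, int byteCount)
            throws IOException {
        int remaining = byteCount;
        while (remaining > 0) {
            int bytesRead = channel.read(workBuf);
            if (bytesRead < 0) {
                throw new EOFException("Reached end of stream with " + remaining + " bytes left to read");
            }
            remaining -= bytesRead;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[100];
        ByteBuffer buf = ByteBuffer.allocate(100);
        readFully(Channels.newChannel(new ByteArrayInputStream(data)), buf, 100);
        System.out.println(buf.position()); // prints 100: the buffer was fully filled
    }
}
```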
```java
ByteBuffer workBuf = buffer.duplicate();
int pos = buffer.position();
workBuf.limit(pos + byteCount);
Channels.newChannel(in).read(workBuf);
```
Is there any other place that uses this pattern?
I went through the original PR and found nothing else; it would be great if others could double-check.
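To show why a single unchecked `read` call at a site like this is suspect, here is a self-contained sketch (JDK only; the trickling stream is hypothetical) where one `channel.read` call fills only part of the requested range:

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class SingleReadPitfall {
    public static void main(String[] args) throws IOException {
        byte[] data = new byte[100];

        // Hypothetical trickling stream: hands out at most 10 bytes per read
        // call, which the InputStream contract allows.
        InputStream trickle = new FilterInputStream(new ByteArrayInputStream(data)) {
            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                return super.read(b, off, Math.min(len, 10));
            }
        };

        // A channel read "might not fill the buffer" (ReadableByteChannel
        // javadoc), so a single call can leave most of the range uncopied.
        ReadableByteChannel channel = Channels.newChannel(trickle);
        ByteBuffer buf = ByteBuffer.allocate(100);
        int n = channel.read(buf);
        System.out.println(n); // fewer than the 100 bytes requested
    }
}
```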
Thanks @pan3793 for finding and fixing this, and thanks @wgtmac @ConeyLiu and @gszadovszky for the review 🙌
Rationale for this change
Fixes the data loss issue reported in #3040.
What changes are included in this PR?
Ensure that `StreamBytesInput#writeInto(ByteBuffer buffer)` copies data fully, even if the underlying `InputStream` does not report `available()` correctly.

Are these changes tested?
Unit tests are added; I also tested the fix with an internal production data loss case.
Are there any user-facing changes?
Yes, this fixes some data loss cases. I acknowledge that the bug affects Spark 4.0.0 preview2, which ships with Parquet 1.14.2.
Closes #3040