Skip to content

Commit e2c4199

Browse files
authored
Support Predicate Pushdown for Parquet Lists (#2108) (#2999)
* Add buffer to ColumnLevelDecoderImpl (#2108) * Implement skip_rep_levels * Add integration test * Clippy
1 parent fc58036 commit e2c4199

File tree

3 files changed

+218
-32
lines changed

3 files changed

+218
-32
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2421,4 +2421,32 @@ mod tests {
24212421
let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
24222422
assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
24232423
}
2424+
2425+
#[test]
2426+
#[cfg(feature = "snap")]
2427+
fn test_read_nested_lists() {
2428+
let testdata = arrow::util::test_util::parquet_test_data();
2429+
let path = format!("{}/nested_lists.snappy.parquet", testdata);
2430+
let file = File::open(&path).unwrap();
2431+
2432+
let f = file.try_clone().unwrap();
2433+
let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
2434+
let expected = reader.next().unwrap().unwrap();
2435+
assert_eq!(expected.num_rows(), 3);
2436+
2437+
let selection = RowSelection::from(vec![
2438+
RowSelector::skip(1),
2439+
RowSelector::select(1),
2440+
RowSelector::skip(1),
2441+
]);
2442+
let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
2443+
.unwrap()
2444+
.with_row_selection(selection)
2445+
.build()
2446+
.unwrap();
2447+
2448+
let actual = reader.next().unwrap().unwrap();
2449+
assert_eq!(actual.num_rows(), 1);
2450+
assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
2451+
}
24242452
}

parquet/src/column/reader/decoder.rs

Lines changed: 189 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,13 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
264264
}
265265
}
266266

267+
const SKIP_BUFFER_SIZE: usize = 1024;
268+
267269
/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
268270
pub struct ColumnLevelDecoderImpl {
269271
decoder: Option<LevelDecoderInner>,
272+
/// Temporary buffer populated when skipping values
273+
buffer: Vec<i16>,
270274
bit_width: u8,
271275
}
272276

@@ -275,9 +279,36 @@ impl ColumnLevelDecoderImpl {
275279
let bit_width = num_required_bits(max_level as u64);
276280
Self {
277281
decoder: None,
282+
buffer: vec![],
278283
bit_width,
279284
}
280285
}
286+
287+
/// Drops the first `len` values from the internal buffer
288+
fn split_off_buffer(&mut self, len: usize) {
289+
match self.buffer.len() == len {
290+
true => self.buffer.clear(),
291+
false => {
292+
// Move to_read elements to end of slice
293+
self.buffer.rotate_left(len);
294+
// Truncate buffer
295+
self.buffer.truncate(self.buffer.len() - len);
296+
}
297+
}
298+
}
299+
300+
/// Reads up to `to_read` values to the internal buffer
301+
fn read_to_buffer(&mut self, to_read: usize) -> Result<()> {
302+
let mut buf = std::mem::take(&mut self.buffer);
303+
304+
// Repopulate buffer
305+
buf.resize(to_read, 0);
306+
let actual = self.read(&mut buf, 0..to_read)?;
307+
buf.truncate(actual);
308+
309+
self.buffer = buf;
310+
Ok(())
311+
}
281312
}
282313

283314
enum LevelDecoderInner {
@@ -289,6 +320,7 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
289320
type Slice = [i16];
290321

291322
fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
323+
self.buffer.clear();
292324
match encoding {
293325
Encoding::RLE => {
294326
let mut decoder = RleDecoder::new(self.bit_width);
@@ -305,12 +337,25 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
305337
}
306338
}
307339

308-
fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
340+
fn read(&mut self, out: &mut Self::Slice, mut range: Range<usize>) -> Result<usize> {
341+
let read_from_buffer = match self.buffer.is_empty() {
342+
true => 0,
343+
false => {
344+
let read_from_buffer = self.buffer.len().min(range.end - range.start);
345+
out[range.start..range.start + read_from_buffer]
346+
.copy_from_slice(&self.buffer[0..read_from_buffer]);
347+
self.split_off_buffer(read_from_buffer);
348+
read_from_buffer
349+
}
350+
};
351+
range.start += read_from_buffer;
352+
309353
match self.decoder.as_mut().unwrap() {
310-
LevelDecoderInner::Packed(reader, bit_width) => {
311-
Ok(reader.get_batch::<i16>(&mut out[range], *bit_width as usize))
354+
LevelDecoderInner::Packed(reader, bit_width) => Ok(read_from_buffer
355+
+ reader.get_batch::<i16>(&mut out[range], *bit_width as usize)),
356+
LevelDecoderInner::Rle(reader) => {
357+
Ok(read_from_buffer + reader.get_batch(&mut out[range])?)
312358
}
313-
LevelDecoderInner::Rle(reader) => reader.get_batch(&mut out[range]),
314359
}
315360
}
316361
}
@@ -323,41 +368,153 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
323368
) -> Result<(usize, usize)> {
324369
let mut level_skip = 0;
325370
let mut value_skip = 0;
326-
match self.decoder.as_mut().unwrap() {
327-
LevelDecoderInner::Packed(reader, bit_width) => {
328-
for _ in 0..num_levels {
329-
// Values are delimited by max_def_level
330-
if max_def_level
331-
== reader
332-
.get_value::<i16>(*bit_width as usize)
333-
.expect("Not enough values in Packed ColumnLevelDecoderImpl.")
334-
{
335-
value_skip += 1;
336-
}
337-
level_skip += 1;
338-
}
339-
}
340-
LevelDecoderInner::Rle(reader) => {
341-
for _ in 0..num_levels {
342-
if let Some(level) = reader
343-
.get::<i16>()
344-
.expect("Not enough values in Rle ColumnLevelDecoderImpl.")
345-
{
346-
// Values are delimited by max_def_level
347-
if level == max_def_level {
348-
value_skip += 1;
349-
}
350-
}
351-
level_skip += 1;
371+
while level_skip < num_levels {
372+
let remaining_levels = num_levels - level_skip;
373+
374+
if self.buffer.is_empty() {
375+
// Only read number of needed values
376+
self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
377+
if self.buffer.is_empty() {
378+
// Reached end of page
379+
break;
352380
}
353381
}
382+
let to_read = self.buffer.len().min(remaining_levels);
383+
384+
level_skip += to_read;
385+
value_skip += self.buffer[..to_read]
386+
.iter()
387+
.filter(|x| **x == max_def_level)
388+
.count();
389+
390+
self.split_off_buffer(to_read)
354391
}
392+
355393
Ok((value_skip, level_skip))
356394
}
357395
}
358396

359397
impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
360-
fn skip_rep_levels(&mut self, _num_records: usize) -> Result<(usize, usize)> {
361-
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
398+
fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)> {
399+
let mut level_skip = 0;
400+
let mut record_skip = 0;
401+
402+
loop {
403+
if self.buffer.is_empty() {
404+
// Read SKIP_BUFFER_SIZE as we don't know how many to read
405+
self.read_to_buffer(SKIP_BUFFER_SIZE)?;
406+
if self.buffer.is_empty() {
407+
// Reached end of page
408+
break;
409+
}
410+
}
411+
412+
let mut to_skip = 0;
413+
while to_skip < self.buffer.len() && record_skip != num_records {
414+
if self.buffer[to_skip] == 0 {
415+
record_skip += 1;
416+
}
417+
to_skip += 1;
418+
}
419+
420+
// Find end of record
421+
while to_skip < self.buffer.len() && self.buffer[to_skip] != 0 {
422+
to_skip += 1;
423+
}
424+
425+
level_skip += to_skip;
426+
if to_skip >= self.buffer.len() {
427+
// Need to to read more values
428+
self.buffer.clear();
429+
continue;
430+
}
431+
432+
self.split_off_buffer(to_skip);
433+
break;
434+
}
435+
436+
Ok((record_skip, level_skip))
437+
}
438+
}
439+
440+
#[cfg(test)]
441+
mod tests {
442+
use super::*;
443+
use crate::encodings::rle::RleEncoder;
444+
use rand::prelude::*;
445+
446+
fn test_skip_levels<F>(encoded: &[i16], data: ByteBufferPtr, skip: F)
447+
where
448+
F: Fn(&mut ColumnLevelDecoderImpl, &mut usize, usize),
449+
{
450+
let mut rng = thread_rng();
451+
let mut decoder = ColumnLevelDecoderImpl::new(5);
452+
decoder.set_data(Encoding::RLE, data);
453+
454+
let mut read = 0;
455+
let mut decoded = vec![];
456+
let mut expected = vec![];
457+
while read < encoded.len() {
458+
let to_read = rng.gen_range(0..(encoded.len() - read).min(100)) + 1;
459+
460+
if rng.gen_bool(0.5) {
461+
skip(&mut decoder, &mut read, to_read)
462+
} else {
463+
let start = decoded.len();
464+
let end = decoded.len() + to_read;
465+
decoded.resize(end, 0);
466+
let actual_read = decoder.read(&mut decoded, start..end).unwrap();
467+
assert_eq!(actual_read, to_read);
468+
expected.extend_from_slice(&encoded[read..read + to_read]);
469+
read += to_read;
470+
}
471+
}
472+
assert_eq!(decoded, expected);
473+
}
474+
475+
#[test]
476+
fn test_skip() {
477+
let mut rng = thread_rng();
478+
let total_len = 10000;
479+
let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
480+
let mut encoder = RleEncoder::new(3, 1024);
481+
for v in &encoded {
482+
encoder.put(*v as _)
483+
}
484+
let data = ByteBufferPtr::new(encoder.consume());
485+
486+
for _ in 0..10 {
487+
test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
488+
let (values_skipped, levels_skipped) =
489+
decoder.skip_def_levels(to_read, 5).unwrap();
490+
assert_eq!(levels_skipped, to_read);
491+
492+
let expected = &encoded[*read..*read + to_read];
493+
let expected_values_skipped =
494+
expected.iter().filter(|x| **x == 5).count();
495+
assert_eq!(values_skipped, expected_values_skipped);
496+
*read += to_read;
497+
});
498+
499+
test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
500+
let (records_skipped, levels_skipped) =
501+
decoder.skip_rep_levels(to_read).unwrap();
502+
503+
// If not run out of values
504+
if levels_skipped + *read != encoded.len() {
505+
// Should have read correct number of records
506+
assert_eq!(records_skipped, to_read);
507+
// Next value should be start of record
508+
assert_eq!(encoded[levels_skipped + *read], 0);
509+
}
510+
511+
let expected = &encoded[*read..*read + levels_skipped];
512+
let expected_records_skipped =
513+
expected.iter().filter(|x| **x == 0).count();
514+
assert_eq!(records_skipped, expected_records_skipped);
515+
516+
*read += levels_skipped;
517+
});
518+
}
362519
}
363520
}

parquet/src/encodings/rle.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ impl RleDecoder {
338338
// These functions inline badly, they tend to inline and then create very large loop unrolls
339339
// that damage L1d-cache occupancy. This results in a ~18% performance drop
340340
#[inline(never)]
341+
#[allow(unused)]
341342
pub fn get<T: FromBytes>(&mut self) -> Result<Option<T>> {
342343
assert!(size_of::<T>() <= 8);
343344

0 commit comments

Comments
 (0)