@@ -264,9 +264,13 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
264264 }
265265}
266266
/// Maximum number of levels decoded into the internal buffer per refill while skipping
const SKIP_BUFFER_SIZE: usize = 1024;
268+
267269/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
268270pub struct ColumnLevelDecoderImpl {
269271 decoder : Option < LevelDecoderInner > ,
272+ /// Temporary buffer populated when skipping values
273+ buffer : Vec < i16 > ,
270274 bit_width : u8 ,
271275}
272276
@@ -275,9 +279,36 @@ impl ColumnLevelDecoderImpl {
275279 let bit_width = num_required_bits ( max_level as u64 ) ;
276280 Self {
277281 decoder : None ,
282+ buffer : vec ! [ ] ,
278283 bit_width,
279284 }
280285 }
286+
287+ /// Drops the first `len` values from the internal buffer
288+ fn split_off_buffer ( & mut self , len : usize ) {
289+ match self . buffer . len ( ) == len {
290+ true => self . buffer . clear ( ) ,
291+ false => {
292+ // Move to_read elements to end of slice
293+ self . buffer . rotate_left ( len) ;
294+ // Truncate buffer
295+ self . buffer . truncate ( self . buffer . len ( ) - len) ;
296+ }
297+ }
298+ }
299+
300+ /// Reads up to `to_read` values to the internal buffer
301+ fn read_to_buffer ( & mut self , to_read : usize ) -> Result < ( ) > {
302+ let mut buf = std:: mem:: take ( & mut self . buffer ) ;
303+
304+ // Repopulate buffer
305+ buf. resize ( to_read, 0 ) ;
306+ let actual = self . read ( & mut buf, 0 ..to_read) ?;
307+ buf. truncate ( actual) ;
308+
309+ self . buffer = buf;
310+ Ok ( ( ) )
311+ }
281312}
282313
283314enum LevelDecoderInner {
@@ -289,6 +320,7 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
289320 type Slice = [ i16 ] ;
290321
291322 fn set_data ( & mut self , encoding : Encoding , data : ByteBufferPtr ) {
323+ self . buffer . clear ( ) ;
292324 match encoding {
293325 Encoding :: RLE => {
294326 let mut decoder = RleDecoder :: new ( self . bit_width ) ;
@@ -305,12 +337,25 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
305337 }
306338 }
307339
308- fn read ( & mut self , out : & mut Self :: Slice , range : Range < usize > ) -> Result < usize > {
340+ fn read ( & mut self , out : & mut Self :: Slice , mut range : Range < usize > ) -> Result < usize > {
341+ let read_from_buffer = match self . buffer . is_empty ( ) {
342+ true => 0 ,
343+ false => {
344+ let read_from_buffer = self . buffer . len ( ) . min ( range. end - range. start ) ;
345+ out[ range. start ..range. start + read_from_buffer]
346+ . copy_from_slice ( & self . buffer [ 0 ..read_from_buffer] ) ;
347+ self . split_off_buffer ( read_from_buffer) ;
348+ read_from_buffer
349+ }
350+ } ;
351+ range. start += read_from_buffer;
352+
309353 match self . decoder . as_mut ( ) . unwrap ( ) {
310- LevelDecoderInner :: Packed ( reader, bit_width) => {
311- Ok ( reader. get_batch :: < i16 > ( & mut out[ range] , * bit_width as usize ) )
354+ LevelDecoderInner :: Packed ( reader, bit_width) => Ok ( read_from_buffer
355+ + reader. get_batch :: < i16 > ( & mut out[ range] , * bit_width as usize ) ) ,
356+ LevelDecoderInner :: Rle ( reader) => {
357+ Ok ( read_from_buffer + reader. get_batch ( & mut out[ range] ) ?)
312358 }
313- LevelDecoderInner :: Rle ( reader) => reader. get_batch ( & mut out[ range] ) ,
314359 }
315360 }
316361}
@@ -323,41 +368,153 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
323368 ) -> Result < ( usize , usize ) > {
324369 let mut level_skip = 0 ;
325370 let mut value_skip = 0 ;
326- match self . decoder . as_mut ( ) . unwrap ( ) {
327- LevelDecoderInner :: Packed ( reader, bit_width) => {
328- for _ in 0 ..num_levels {
329- // Values are delimited by max_def_level
330- if max_def_level
331- == reader
332- . get_value :: < i16 > ( * bit_width as usize )
333- . expect ( "Not enough values in Packed ColumnLevelDecoderImpl." )
334- {
335- value_skip += 1 ;
336- }
337- level_skip += 1 ;
338- }
339- }
340- LevelDecoderInner :: Rle ( reader) => {
341- for _ in 0 ..num_levels {
342- if let Some ( level) = reader
343- . get :: < i16 > ( )
344- . expect ( "Not enough values in Rle ColumnLevelDecoderImpl." )
345- {
346- // Values are delimited by max_def_level
347- if level == max_def_level {
348- value_skip += 1 ;
349- }
350- }
351- level_skip += 1 ;
371+ while level_skip < num_levels {
372+ let remaining_levels = num_levels - level_skip;
373+
374+ if self . buffer . is_empty ( ) {
375+ // Only read number of needed values
376+ self . read_to_buffer ( remaining_levels. min ( SKIP_BUFFER_SIZE ) ) ?;
377+ if self . buffer . is_empty ( ) {
378+ // Reached end of page
379+ break ;
352380 }
353381 }
382+ let to_read = self . buffer . len ( ) . min ( remaining_levels) ;
383+
384+ level_skip += to_read;
385+ value_skip += self . buffer [ ..to_read]
386+ . iter ( )
387+ . filter ( |x| * * x == max_def_level)
388+ . count ( ) ;
389+
390+ self . split_off_buffer ( to_read)
354391 }
392+
355393 Ok ( ( value_skip, level_skip) )
356394 }
357395}
358396
359397impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
360- fn skip_rep_levels ( & mut self , _num_records : usize ) -> Result < ( usize , usize ) > {
361- Err ( nyi_err ! ( "https://github.com/apache/arrow-rs/issues/1792" ) )
398+ fn skip_rep_levels ( & mut self , num_records : usize ) -> Result < ( usize , usize ) > {
399+ let mut level_skip = 0 ;
400+ let mut record_skip = 0 ;
401+
402+ loop {
403+ if self . buffer . is_empty ( ) {
404+ // Read SKIP_BUFFER_SIZE as we don't know how many to read
405+ self . read_to_buffer ( SKIP_BUFFER_SIZE ) ?;
406+ if self . buffer . is_empty ( ) {
407+ // Reached end of page
408+ break ;
409+ }
410+ }
411+
412+ let mut to_skip = 0 ;
413+ while to_skip < self . buffer . len ( ) && record_skip != num_records {
414+ if self . buffer [ to_skip] == 0 {
415+ record_skip += 1 ;
416+ }
417+ to_skip += 1 ;
418+ }
419+
420+ // Find end of record
421+ while to_skip < self . buffer . len ( ) && self . buffer [ to_skip] != 0 {
422+ to_skip += 1 ;
423+ }
424+
425+ level_skip += to_skip;
426+ if to_skip >= self . buffer . len ( ) {
427+ // Need to to read more values
428+ self . buffer . clear ( ) ;
429+ continue ;
430+ }
431+
432+ self . split_off_buffer ( to_skip) ;
433+ break ;
434+ }
435+
436+ Ok ( ( record_skip, level_skip) )
437+ }
438+ }
439+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encodings::rle::RleEncoder;
    use rand::prelude::*;

    /// Decodes `data` with a random mix of reads and skips, checking that the
    /// levels returned by the reads match the corresponding parts of `encoded`
    ///
    /// `skip` is handed the decoder, the current position in `encoded` and a
    /// requested skip size; it must advance the position by however many levels
    /// it consumed.
    fn test_skip_levels<F>(encoded: &[i16], data: ByteBufferPtr, skip: F)
    where
        F: Fn(&mut ColumnLevelDecoderImpl, &mut usize, usize),
    {
        let mut rng = thread_rng();
        let mut decoder = ColumnLevelDecoderImpl::new(5);
        decoder.set_data(Encoding::RLE, data);

        let mut read = 0;
        let mut decoded = vec![];
        let mut expected = vec![];
        while read < encoded.len() {
            // Between 1 and min(remaining, 100) levels
            let to_read = rng.gen_range(0..(encoded.len() - read).min(100)) + 1;

            if rng.gen_bool(0.5) {
                skip(&mut decoder, &mut read, to_read);
            } else {
                let start = decoded.len();
                let end = decoded.len() + to_read;
                decoded.resize(end, 0);
                let actual_read = decoder.read(&mut decoded, start..end).unwrap();
                assert_eq!(actual_read, to_read);
                expected.extend_from_slice(&encoded[read..read + to_read]);
                read += to_read;
            }
        }
        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_skip() {
        let mut rng = thread_rng();
        let total_len = 10000;
        // Include the maximum level (5) so that the definition level check below
        // sees levels that count as values; 5 still fits in 3 bits
        let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..=5)).collect();
        let mut encoder = RleEncoder::new(3, 1024);
        for v in &encoded {
            encoder.put(*v as _);
        }
        let data = ByteBufferPtr::new(encoder.consume());

        for _ in 0..10 {
            // Definition levels: exactly `to_read` levels are skipped, and the
            // value count equals the number of levels at the maximum level
            test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
                let (values_skipped, levels_skipped) =
                    decoder.skip_def_levels(to_read, 5).unwrap();
                assert_eq!(levels_skipped, to_read);

                let expected = &encoded[*read..*read + to_read];
                let expected_values_skipped =
                    expected.iter().filter(|x| **x == 5).count();
                assert_eq!(values_skipped, expected_values_skipped);
                *read += to_read;
            });

            // Repetition levels: whole records are skipped
            test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
                let (records_skipped, levels_skipped) =
                    decoder.skip_rep_levels(to_read).unwrap();

                // If not run out of values
                if levels_skipped + *read != encoded.len() {
                    // Should have read correct number of records
                    assert_eq!(records_skipped, to_read);
                    // Next value should be start of record
                    assert_eq!(encoded[levels_skipped + *read], 0);
                }

                let expected = &encoded[*read..*read + levels_skipped];
                let expected_records_skipped =
                    expected.iter().filter(|x| **x == 0).count();
                assert_eq!(records_skipped, expected_records_skipped);

                *read += levels_skipped;
            });
        }
    }
}
0 commit comments