Skip to content

Commit 881752c

Browse files
committed
fix comment.
1 parent 25cb93d commit 881752c

File tree

6 files changed

+54
-27
lines changed

6 files changed

+54
-27
lines changed

parquet/src/arrow/array_reader/byte_array.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,16 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
120120
}
121121

122122
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
123-
self.record_reader.skip_records(num_records, self.pages.as_mut())
123+
if self.record_reader.column_reader().is_none() {
124+
// If we skip records before all read operation
125+
// we need set `column_reader` by `set_page_reader`
126+
if let Some(page_reader) = self.pages.next() {
127+
self.record_reader.set_page_reader(page_reader?)?;
128+
} else {
129+
return Ok(0);
130+
}
131+
}
132+
self.record_reader.skip_records(num_records)
124133
}
125134

126135
fn get_def_levels(&self) -> Option<&[i16]> {

parquet/src/arrow/array_reader/byte_array_dictionary.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,16 @@ where
181181
}
182182

183183
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
184-
self.record_reader.skip_records(num_records, self.pages.as_mut())
184+
if self.record_reader.column_reader().is_none() {
185+
// If we skip records before all read operation
186+
// we need set `column_reader` by `set_page_reader`
187+
if let Some(page_reader) = self.pages.next() {
188+
self.record_reader.set_page_reader(page_reader?)?;
189+
} else {
190+
return Ok(0);
191+
}
192+
}
193+
self.record_reader.skip_records(num_records)
185194
}
186195

187196
fn get_def_levels(&self) -> Option<&[i16]> {

parquet/src/arrow/array_reader/null_array.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,16 @@ where
9797
}
9898

9999
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
100-
self.record_reader.skip_records(num_records, self.pages.as_mut())
100+
if self.record_reader.column_reader().is_none() {
101+
// If we skip records before all read operation
102+
// we need set `column_reader` by `set_page_reader`
103+
if let Some(page_reader) = self.pages.next() {
104+
self.record_reader.set_page_reader(page_reader?)?;
105+
} else {
106+
return Ok(0);
107+
}
108+
}
109+
self.record_reader.skip_records(num_records)
101110
}
102111

103112
fn get_def_levels(&self) -> Option<&[i16]> {

parquet/src/arrow/array_reader/primitive_array.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,16 @@ where
222222
}
223223

224224
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
225-
self.record_reader.skip_records(num_records, self.pages.as_mut())
225+
if self.record_reader.column_reader().is_none() {
226+
// If we skip records before all read operation
227+
// we need set `column_reader` by `set_page_reader`
228+
if let Some(page_reader) = self.pages.next() {
229+
self.record_reader.set_page_reader(page_reader?)?;
230+
} else {
231+
return Ok(0);
232+
}
233+
}
234+
self.record_reader.skip_records(num_records)
226235
}
227236

228237
fn get_def_levels(&self) -> Option<&[i16]> {

parquet/src/arrow/arrow_reader.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -309,14 +309,13 @@ impl Iterator for ParquetRecordBatchReader {
309309

310310
// try to read record
311311
let to_read = match front.row_count.checked_sub(self.batch_size) {
312-
Some(remaining) => {
312+
Some(remaining) if remaining != 0 => {
313313
// if page row count less than batch_size we must set batch size to page row count.
314314
// add check avoid dead loop
315-
if remaining != 0 {
316-
selection.push_front(RowSelection::select(remaining));
317-
}
315+
selection.push_front(RowSelection::select(remaining));
318316
self.batch_size
319317
}
318+
Some(_) => self.batch_size,
320319
None => front.row_count,
321320
};
322321

parquet/src/arrow/record_reader/mod.rs

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use crate::arrow::record_reader::{
2424
buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
2525
definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
2626
};
27-
use crate::column::page::PageIterator;
2827
use crate::column::{
2928
page::PageReader,
3029
reader::{
@@ -46,6 +45,9 @@ pub(crate) const MIN_BATCH_SIZE: usize = 1024;
4645
pub type RecordReader<T> =
4746
GenericRecordReader<ScalarBuffer<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
4847

48+
pub(crate) type ColumnReader<CV> =
49+
GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
50+
4951
/// A generic stateful column reader that delimits semantic records
5052
///
5153
/// This type is hidden from the docs, and relies on private traits with no
@@ -57,9 +59,7 @@ pub struct GenericRecordReader<V, CV> {
5759
records: V,
5860
def_levels: Option<DefinitionLevelBuffer>,
5961
rep_levels: Option<ScalarBuffer<i16>>,
60-
column_reader: Option<
61-
GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>,
62-
>,
62+
column_reader: Option<ColumnReader<CV>>,
6363

6464
/// Number of records accumulated in records
6565
num_records: usize,
@@ -185,24 +185,11 @@ where
185185
/// # Returns
186186
///
187187
/// Number of records skipped
188-
pub fn skip_records(
189-
&mut self,
190-
num_records: usize,
191-
pages: &mut dyn PageIterator,
192-
) -> Result<usize> {
188+
pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
193189
// First need to clear the buffer
194190
let end_of_column = match self.column_reader.as_mut() {
195191
Some(reader) => !reader.has_next()?,
196-
None => {
197-
// If we skip records before all read operation
198-
// we need set `column_reader` by `set_page_reader`
199-
if let Some(page_reader) = pages.next() {
200-
self.set_page_reader(page_reader?)?;
201-
false
202-
} else {
203-
return Ok(0);
204-
}
205-
}
192+
None => return Ok(0),
206193
};
207194

208195
let (buffered_records, buffered_values) =
@@ -292,6 +279,11 @@ where
292279
.map(|levels| levels.split_bitmask(self.num_values))
293280
}
294281

282+
/// Returns column reader.
283+
pub(crate) fn column_reader(&self) -> Option<&ColumnReader<CV>> {
284+
self.column_reader.as_ref()
285+
}
286+
295287
/// Try to read one batch of data.
296288
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
297289
let rep_levels = self

0 commit comments

Comments
 (0)