Skip to content

feat(parquet): add byte buffer when disable buffered stream#302

Merged
zeroshade merged 5 commits intoapache:mainfrom
joechenrh:add-bytereader
Mar 7, 2025
Merged

feat(parquet): add byte buffer when disable buffered stream#302
zeroshade merged 5 commits intoapache:mainfrom
joechenrh:add-bytereader

Conversation

@joechenrh
Copy link
Copy Markdown
Contributor

@joechenrh joechenrh commented Mar 4, 2025

Rationale for this change

When we create a reader with BufferedStreamEnabled = false, we have to first create a data slice.

func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64) (BufferedReader, error) {
if r.BufferedStreamEnabled {
return utils.NewBufferedReader(io.NewSectionReader(source, start, nbytes), int(r.BufferSize)), nil
}
data := make([]byte, nbytes)
n, err := source.ReadAt(data, start)
if err != nil {
return nil, fmt.Errorf("parquet: tried reading from file, but got error: %w", err)
}
if n != int(nbytes) {
return nil, fmt.Errorf("parquet: tried reading %d bytes starting at position %d from file but only got %d", nbytes, start, n)
}
return utils.NewBufferedReader(bytes.NewReader(data), int(nbytes)), nil
}

And we will create another slice with same size in the NewBufferedReader again, which is redundant.

func NewBufferedReader(rd Reader, sz int) *bufferedReader {
r := &bufferedReader{
rd: rd,
}
r.resizeBuffer(sz)
return r
}

What changes are included in this PR?

  • Add a new struct byteReader, which is a wrapper of bytes.NewReader and implement BufferedReader interface.
  • Remove BufferSize() method from BufferedReader interface

Are these changes tested?

BufferedReader created from bytes.NewReader in tests are all replaced by new function NewByteReader.

Are there any user-facing changes?

No.

@joechenrh joechenrh requested a review from zeroshade as a code owner March 4, 2025 13:33
Comment on lines +534 to +536
rd := utils.NewBufferedReader(
io.NewSectionReader(p.r.Outer(), p.dictOffset-p.baseOffset, p.dataOffset-p.baseOffset),
p.r.BufferSize())
int(parquet.DefaultBufSize))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would remove our respecting of the BufferSize property in the reader properties, we shouldn't be removing the BufferSize() method

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So how about change it to:

readBufSize := min(p.dataOffset-p.baseOffset, p.r.BufferSize())
rd := utils.NewBufferedReader(
	io.NewSectionReader(p.r.Outer(), p.dictOffset-p.baseOffset, p.dataOffset-p.baseOffset),
	readBufSize)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that works

@joechenrh
Copy link
Copy Markdown
Contributor Author

Can I manually rerun the CI?

@kou
Copy link
Copy Markdown
Member

kou commented Mar 7, 2025

Done!

Copy link
Copy Markdown
Member

@zeroshade zeroshade left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@zeroshade zeroshade merged commit 0f0d667 into apache:main Mar 7, 2025
23 checks passed
@joechenrh joechenrh deleted the add-bytereader branch August 28, 2025 05:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants