feat: Implement an AsyncReader for avro using ObjectStore #8930
Conversation
jecsand838 left a comment:
Flushing a partial review with some high level thoughts.
I'll wait for you to finish before resuming.
Honestly I think my main blocker is the schema thing here. I don't want to commit to the constructor before it is resolved, as it's a public API and I don't want it to be volatile.

100% I'm working on that right now and won't stop until I have a PR. That was a solid catch. The schema logic is an area of the code I mean to do (or would welcome) a full refactor of. I knew it would eventually come back.
Sorry, I haven't dropped it, I just found myself in a really busy week! The generic reader support does not seem too hard to implement from the dabbling I've done, and I still need to get to the builder pattern change.

…, separate object store file reader into a feature-gated struct and use a generic async file reader trait

@jecsand838 I believe this is now ready for a proper review^
@EmilyMatt Thank you so much for getting these changes up!
I left a few comments. Let me know what you think.
EDIT: Should have mentioned that this is looking really good overall and I'm very excited for the AsyncReader!
@jecsand838 and @EmilyMatt -- how is this PR looking?
I had actually just returned to work on it 2 days ago. I'm still having some issues with the schema now being provided, due to the problems I've described. @jecsand838 suggested removing the arrow schema and I'm starting to think that is the only viable way for now.

Hope to push another version today and address some of the things above.
@jecsand838 I've shamelessly plagiarized the API for the object reader from the parquet crate, but that's OK IMO; it lays the foundations for a common API in a few versions.
@EmilyMatt Absolutely, I'll have time to give this a solid review tonight. Ty for getting these changes in! |
jecsand838 left a comment:
@EmilyMatt Flushing a partial review here. Looking really good overall. I know this is a huge PR and your high-level changes look great.
I left more code-level comments with one architectural call-out you may want to consider. Overall this is looking solid and I'm super stoked about this async reader for arrow-avro.
```rust
let consumed = self.block_decoder.decode(&data)?;
if consumed == 0 {
```
I think there may be an issue with using `consumed == 0` as the signal for detecting incomplete blocks here.
Looking at `BlockDecoder::decode` in block.rs lines 78-129, it returns 0 only when:
- The input buffer is empty at the start, OR
- The decoder is already in the `Finished` state

For a truly incomplete block, `decode()` consumes all available bytes (returns `data.len()`) and `flush()` returns `None`. The current logic likely never triggers when it should.
You may want to consider changing the detection logic to check flush() first:
```rust
ReaderState::DecodingBlock { mut reader, mut data } => {
    let consumed = self.block_decoder.decode(&data)?;
    data = data.slice(consumed..);
    // Check for complete block FIRST
    if let Some(block) = self.block_decoder.flush() {
        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
            codec.decompress(&block.data)?
        } else {
            block.data
        });
        self.reader_state = ReaderState::ReadingBatches {
            reader, data, block_data,
            remaining_in_block: block.count,
        };
        continue;
    }
    // No complete block
    if data.is_empty() && consumed == 0 {
        // No progress on empty buffer = EOF
        let final_batch = self.decoder.flush();
        self.reader_state = ReaderState::Finished;
        return Poll::Ready(final_batch.transpose());
    }
    if data.is_empty() {
        // All data consumed but block incomplete - need more bytes
        // (incomplete block handling logic here)
    } else {
        // Still have data to process
        self.reader_state = ReaderState::DecodingBlock { reader, data };
    }
}
```
```rust
// Two longs: count and size have already been read, but using our vlq,
// meaning they were not consumed.
let total_block_size = size + vlq_header_len;
```
Are there any risks from the calculation omitting the 16-byte sync marker here?
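For context on why the sync marker matters here: per the Avro object container file spec, each block is two varint longs (record count and byte size), then the data, then a 16-byte sync marker. A minimal sketch of the full on-disk length, assuming illustrative helper names (`vlq_len`, `total_block_len` are not from this PR):

```rust
/// Number of bytes a value occupies as a zig-zag varint (Avro "long").
fn vlq_len(v: i64) -> usize {
    let mut z = ((v << 1) ^ (v >> 63)) as u64; // zig-zag encode
    let mut n = 1;
    while z >= 0x80 {
        z >>= 7;
        n += 1;
    }
    n
}

/// Total on-disk length of one block, including the trailing 16-byte sync marker.
fn total_block_len(count: i64, size: i64) -> usize {
    vlq_len(count) + vlq_len(size) + size as usize + 16
}

fn main() {
    // A block with 1000 records and 200_000 bytes of data:
    // count encodes in 2 bytes, size in 3 bytes, plus the sync marker.
    assert_eq!(vlq_len(1000), 2);
    assert_eq!(vlq_len(200_000), 3);
    assert_eq!(total_block_len(1000, 200_000), 2 + 3 + 200_000 + 16);
    println!("ok");
}
```

If the fetch range is computed as `size + vlq_header_len` alone, the 16 bytes after the data would need to be read separately before the next block's header can be decoded.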
```rust
ReaderState::Limbo => {
    unreachable!("ReaderState::Limbo should never be observed");
}
```
Is the `ReaderState::Limbo` variant really necessary? Could we use `Finished`, so that if a bug causes an early return without setting state, the stream just ends (which is safer than panicking)?
A user will never know they did not actually finish writing the file; they will think they've just reached the end. In my opinion this is orders of magnitude more severe than crashing.
I mean, in the event of that occurring, an `ArrowError` should be passed back, which would alert the user.
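A minimal sketch of the trade-off being discussed: a transient placeholder state taken out with `std::mem::replace` during polling, where observing the placeholder surfaces an error instead of panicking. The `ReaderState` variants, `poll_once`, and the error string here are illustrative, not the PR's actual types:

```rust
#[derive(Debug, PartialEq)]
enum ReaderState {
    Decoding(Vec<u8>),
    Finished,
    Limbo, // transient placeholder; should never survive across polls
}

fn poll_once(state: &mut ReaderState) -> Result<Option<usize>, String> {
    // Take ownership of the current state, leaving the placeholder behind.
    match std::mem::replace(state, ReaderState::Limbo) {
        ReaderState::Decoding(data) => {
            let n = data.len();
            *state = ReaderState::Finished; // always restore a real state
            Ok(Some(n))
        }
        ReaderState::Finished => Ok(None),
        // Returning an error here (instead of `unreachable!`) means a state
        // bug reports a truncated stream rather than crashing the process.
        ReaderState::Limbo => Err("reader left in transient state".to_string()),
    }
}

fn main() {
    let mut state = ReaderState::Decoding(vec![1, 2, 3]);
    assert_eq!(poll_once(&mut state), Ok(Some(3)));
    assert_eq!(poll_once(&mut state), Ok(None));
    assert!(poll_once(&mut ReaderState::Limbo).is_err());
    println!("ok");
}
```

This keeps the distinct `Limbo` variant (so a bug is never silently read as a clean EOF) while still avoiding the panic.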
```rust
ReaderState::DecodingBlock {
    mut reader,
    mut data,
} => {
```
I was thinking about it and you may want to consider a decode loop similar to the sync Reader::read method's, specifically this logic:
```rust
let consumed = self.block_decoder.decode(buf)?;
self.reader.consume(consumed);
if let Some(block) = self.block_decoder.flush() {
    // Block complete - use it
} else if consumed == 0 {
    // Stuck on non-empty buffer - error
    return Err(ArrowError::ParseError(...));
}
// Otherwise: made progress, loop for more data
```

From an architectural perspective the advantages would be:
- Always calls `flush()` after `decode()` to check for complete blocks
- Only errors when stuck (`consumed == 0` on non-empty buffer AND `flush() == None`)
- Trusts `BlockDecoder` to handle partial data incrementally
Maybe it could resemble something like this pseudo-code?
```rust
ReaderState::DecodingBlock { mut reader, mut data } => {
    let consumed = self.block_decoder.decode(&data)?;
    data = data.slice(consumed..); // Equivalent to reader.consume()
    if let Some(block) = self.block_decoder.flush() {
        // Block complete - proceed to ReadingBatches
        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
            codec.decompress(&block.data)?
        } else {
            block.data
        });
        self.reader_state = ReaderState::ReadingBatches {
            reader, data, block_data,
            remaining_in_block: block.count,
        };
        continue;
    }
    // No complete block yet
    if consumed == 0 && !data.is_empty() {
        // Stuck - no progress on non-empty buffer = corrupted data
        return Poll::Ready(Some(Err(ArrowError::ParseError(
            "Could not decode next Avro block from partial data".into()
        ))));
    }
    if data.is_empty() {
        // Buffer exhausted, block incomplete
        if self.finishing_partial_block {
            return Poll::Ready(Some(Err(ArrowError::AvroError(
                "Unexpected EOF while reading last Avro block".into()
            ))));
        }
        // Fetch more data (range end case) or finish
        // ... simplified fetch logic here ...
    } else {
        // Made progress but not complete - continue decoding
        self.reader_state = ReaderState::DecodingBlock { reader, data };
    }
}
```
Which issue does this PR close?
Rationale for this change
Allows for proper file splitting within an asynchronous context.
What changes are included in this PR?
The raw implementation, allowing for file splitting, starting mid-block (reading until a sync marker is found), and further reading until the end of the block is found.
This reader currently requires that a `reader_schema` be provided if type promotion, schema evolution, or projection are desired.
This is because #8928 is currently blocking proper parsing from an Arrow schema.
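The mid-block start described above can be sketched as a scan for the file's 16-byte sync marker: a reader dropped into the middle of a byte range skips forward to the first marker and begins decoding at the block boundary after it. This is a hedged sketch, assuming an illustrative `find_sync` helper that is not the PR's actual code:

```rust
/// Offset of the first byte *after* the sync marker, if found.
fn find_sync(buf: &[u8], sync: &[u8; 16]) -> Option<usize> {
    buf.windows(16)
        .position(|w| w == sync)
        .map(|pos| pos + 16)
}

fn main() {
    let sync = [0xABu8; 16];
    let mut buf = vec![0u8; 10]; // tail of a block we were dropped into
    buf.extend_from_slice(&sync); // end-of-block sync marker
    buf.extend_from_slice(&[1, 2, 3]); // start of the next block
    // Decoding should resume at offset 26, right after the marker.
    assert_eq!(find_sync(&buf, &sync), Some(26));
    assert_eq!(find_sync(&[0u8; 8], &sync), None);
    println!("ok");
}
```

A real implementation would also handle a marker straddling two fetched chunks, but the boundary-finding idea is the same.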
Are these changes tested?
Yes
Are there any user-facing changes?
Only the addition of the new reader.
Other changes are internal to the crate (namely the way `Decoder` is created from parts).