
Conversation

@EmilyMatt

Which issue does this PR close?

Rationale for this change

Allows for proper file splitting within an asynchronous context.

What changes are included in this PR?

The raw implementation, allowing for file splitting: starting mid-block (reading until the sync marker is found) and then reading until the end of a block is found.
This reader currently requires that a reader_schema be provided if type promotion, schema evolution, or projection are desired.
This is because #8928 currently blocks proper parsing from an ArrowSchema.
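For context, Avro object container files delimit each block with the file's 16-byte sync marker, so a reader handed an arbitrary byte range can scan forward to the first marker and start decoding at the next block boundary. The helper below is a minimal, self-contained sketch of that scan; the function name and signature are illustrative and not part of this PR's API.

    /// Hypothetical helper (not this PR's API): given the 16-byte sync marker read
    /// from the file header and a buffer that starts somewhere inside a block,
    /// return the offset just past the first marker found, i.e. the first block
    /// boundary a split reader can safely start decoding from.
    fn offset_after_sync(buf: &[u8], sync: &[u8; 16]) -> Option<usize> {
        buf.windows(sync.len())
            .position(|window| window == sync.as_slice())
            .map(|pos| pos + sync.len())
    }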

Are these changes tested?

Yes

Are there any user-facing changes?

Only the addition of the new async reader.
Other changes are internal to the crate (namely the way Decoder is created from parts).

Contributor

@jecsand838 jecsand838 left a comment

Flushing a partial review with some high level thoughts.

I'll wait for you to finish before resuming.

@EmilyMatt
Author

Flushing a partial review with some high level thoughts.

I'll wait for you to finish before resuming.

Honestly, I think my main blocker is the schema thing here. I don't want to commit to the constructor before it is resolved, as it's a public API and I don't want it to be volatile.

@jecsand838
Contributor

jecsand838 commented Nov 26, 2025

Flushing a partial review with some high level thoughts.
I'll wait for you to finish before resuming.

Honestly I think my main blocker is the schema thing here. I don't want to commit to the constructor before it is resolved as its a public API and I don't want it to be volatile

100% I'm working on that right now and won't stop until I have a PR. That was a solid catch.

The schema logic is an area of the code I've been meaning to fully refactor (or would welcome a refactor of). I knew it would eventually come back.

@EmilyMatt
Author

Sorry, I haven't dropped it, I just found myself in a really busy week! The generic reader support does not seem too hard to implement from the dabbling I've done, and I still need to get to the builder pattern change.
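Purely as an illustration of the builder-pattern direction mentioned above (all names below are hypothetical, not this PR's public API), a builder keeps the constructor surface stable while options such as a reader schema or batch size can be added later as non-breaking methods:

    // Illustrative sketch only; these types and defaults are assumptions.
    pub struct AsyncAvroReaderBuilder {
        batch_size: usize,
        reader_schema_json: Option<String>, // Avro JSON schema, if the caller supplies one
    }

    impl AsyncAvroReaderBuilder {
        pub fn new() -> Self {
            Self { batch_size: 1024, reader_schema_json: None }
        }

        // Each option is an additive method, so new options never break existing callers.
        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = batch_size;
            self
        }

        pub fn with_reader_schema_json(mut self, json: impl Into<String>) -> Self {
            self.reader_schema_json = Some(json.into());
            self
        }
    }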

…, separate object store file reader into a feature-gated struct and use a generic async file reader trait
@EmilyMatt
Author

@jecsand838 I believe this is now ready for a proper review^

Contributor

@jecsand838 jecsand838 left a comment

@EmilyMatt Thank you so much for getting these changes up!

I left a few comments. Let me know what you think.

EDIT: Should have mentioned that this is looking really good overall and I'm very excited for the AsyncReader!

@alamb
Contributor

alamb commented Jan 10, 2026

@jecsand838 and @EmilyMatt -- how is this PR looking?

@EmilyMatt
Author

@jecsand838 and @EmilyMatt -- how is this PR looking?

I had actually just returned to work on it two days ago. I'm still having some issues with the schema now being provided, due to the problems I've described; @jecsand838 suggested removing the Arrow schema, and I'm starting to think that is the only viable way for now.
Making the fetch API a bit closer to the one parquet uses is the smaller issue. I do wish to keep the separate semantics for the original fetch and the extra fetch (for parquet, for example, that would be the row group ranges and the footer range). I'll try a couple of ways to do this; one possible shape is sketched below.
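One possible shape for keeping those two fetch paths distinct, shown purely as a hypothetical sketch. The trait and method names below are illustrative, loosely inspired by parquet's async file reader, but they are not that trait and not this PR's API.

    use std::ops::Range;

    use bytes::Bytes;
    use futures::future::BoxFuture;

    trait SplitFileFetch: Send {
        // Fetch bytes that fall inside the split's originally assigned range.
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, std::io::Result<Bytes>>;

        // Fetch bytes outside that range, e.g. the file header or the tail needed to
        // finish a block that straddles the range end.
        fn fetch_extra(&mut self, range: Range<u64>) -> BoxFuture<'_, std::io::Result<Bytes>>;
    }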

@EmilyMatt
Author

Hope to push another version today and address some of the things above

@EmilyMatt
Author

@jecsand838 I've shamelessly plagiarized the API for the object reader from the parquet crate, but that's ok IMO, it lays the foundations for a common API in a few versions.
I believe I've addressed everything, let me know if anything pops to mind

@jecsand838
Contributor

@jecsand838 I've shamelessly plagiarized the API for the object reader from the parquet crate, but that's ok IMO, it lays the foundations for a common API in a few versions. I believe I've addressed everything, let me know if anything pops to mind

@EmilyMatt Absolutely, I'll have time to give this a solid review tonight. Ty for getting these changes in!

Contributor

@jecsand838 jecsand838 left a comment

@EmilyMatt Flushing a partial review here. Looking really good overall. I know this is a huge PR and your high-level changes look great.

I left more code-level comments with one architectural call-out you may want to consider. Overall this is looking solid and I'm super stoked about this async reader for arrow-avro.

Comment on lines +164 to +165
let consumed = self.block_decoder.decode(&data)?;
if consumed == 0 {
Contributor

I think there may be an issue with using consumed == 0 as the signal for detecting incomplete blocks here.

Looking at BlockDecoder::decode in block.rs lines 78-129, it returns 0 only when:

  • The input buffer is empty at the start, OR
  • The decoder is already in the Finished state

For a truly incomplete block, decode() consumes all available bytes (returns data.len()) and flush() returns None. The current logic likely never triggers when it should.

You may want to consider changing the detection logic to check flush() first:

ReaderState::DecodingBlock { mut reader, mut data } => {
    let consumed = self.block_decoder.decode(&data)?;
    data = data.slice(consumed..);

    // Check for complete block FIRST
    if let Some(block) = self.block_decoder.flush() {
        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
            codec.decompress(&block.data)?
        } else {
            block.data
        });
        self.reader_state = ReaderState::ReadingBatches {
            reader, data, block_data,
            remaining_in_block: block.count,
        };
        continue;
    }

    // No complete block
    if data.is_empty() && consumed == 0 {
        // No progress on empty buffer = EOF
        let final_batch = self.decoder.flush();
        self.reader_state = ReaderState::Finished;
        return Poll::Ready(final_batch.transpose());
    }

    if data.is_empty() {
        // All data consumed but block incomplete - need more bytes
        // (incomplete block handling logic here)
    } else {
        // Still have data to process
        self.reader_state = ReaderState::DecodingBlock { reader, data };
    }
}


// Two longs: count and size have already been read, but using our vlq,
// meaning they were not consumed.
let total_block_size = size + vlq_header_len;
Contributor

Are there any risks from the calculation omitting the 16-byte sync marker here?
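For reference, the Avro spec lays out each container block as a varint count, a varint size, size bytes of (possibly compressed) data, and then the file's 16-byte sync marker, so the full on-disk footprint of a block is larger than size plus the varint header. A minimal sketch of that arithmetic follows (names are illustrative, not the ones in this PR); whether the sync marker belongs in this particular calculation depends on whether the decoder consumes it separately.

    // Avro object-container block layout, per the spec:
    //   <count: zig-zag varint> <size: zig-zag varint> <size bytes of data> <16-byte sync marker>
    const SYNC_MARKER_LEN: u64 = 16;

    // Full on-disk footprint of one block, including the trailing sync marker.
    fn block_footprint(vlq_header_len: u64, data_size: u64) -> u64 {
        vlq_header_len + data_size + SYNC_MARKER_LEN
    }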

Comment on lines +156 to +158
ReaderState::Limbo => {
unreachable!("ReaderState::Limbo should never be observed");
}
Contributor

Is the ReaderState::Limbo variant really necessary? Could we use Finished instead, so that if a bug causes an early return without setting the state, the stream just ends (which is safer than panicking)?

Author

A user will never know the file was not actually read to completion; they will think they've just reached the end. In my opinion that is orders of magnitude more severe than crashing.
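For readers unfamiliar with the pattern being debated, here is a minimal, self-contained sketch of the take-and-swap state machine that motivates a placeholder variant like Limbo; the enum and method below are illustrative, not this PR's types.

    enum State {
        Working { data: Vec<u8> },
        Finished,
        // Placeholder that should only ever be observed if a code path forgets to
        // put a real state back after taking ownership of the old one.
        Limbo,
    }

    struct Machine {
        state: State,
    }

    impl Machine {
        fn step(&mut self) {
            // Move the current state out by value so its fields can be consumed.
            match std::mem::replace(&mut self.state, State::Limbo) {
                State::Working { data } => {
                    // (a real implementation would consume `data` here; the sketch just drops it)
                    drop(data);
                    self.state = State::Finished;
                }
                State::Finished => self.state = State::Finished,
                // Observing Limbo means a bug left the placeholder in place; panicking
                // surfaces that bug instead of silently ending the stream.
                State::Limbo => unreachable!("State::Limbo should never be observed"),
            }
        }
    }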

Contributor

I mean that in the event of that occurring, an ArrowError should be passed back, which would alert the user.

Comment on lines +159 to +162
ReaderState::DecodingBlock {
mut reader,
mut data,
} => {
Contributor

I was thinking about it and you may want to consider a decode loop similar to the sync Reader::read method's, specifically this logic:

let consumed = self.block_decoder.decode(buf)?;
self.reader.consume(consumed);
if let Some(block) = self.block_decoder.flush() {
    // Block complete - use it
} else if consumed == 0 {
    // Stuck on non-empty buffer - error
    return Err(ArrowError::ParseError(...));
}
// Otherwise: made progress, loop for more data

From an architectural perspective the advantages would be:

  1. Always calls flush() after decode() to check for complete blocks
  2. Only errors when stuck (consumed == 0 on non-empty buffer AND flush() == None)
  3. Trusts BlockDecoder to handle partial data incrementally

Maybe it could resemble something like this pseudo-code?

 ReaderState::DecodingBlock { mut reader, mut data } => {
     let consumed = self.block_decoder.decode(&data)?;
     data = data.slice(consumed..);  // Equivalent to reader.consume()

     if let Some(block) = self.block_decoder.flush() {
         // Block complete - proceed to ReadingBatches
         let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
             codec.decompress(&block.data)?
         } else {
             block.data
         });
         self.reader_state = ReaderState::ReadingBatches {
             reader, data, block_data,
             remaining_in_block: block.count,
         };
         continue;
     }

     // No complete block yet
     if consumed == 0 && !data.is_empty() {
         // Stuck - no progress on non-empty buffer = corrupted data
         return Poll::Ready(Some(Err(ArrowError::ParseError(
             "Could not decode next Avro block from partial data".into()
         ))));
     }

     if data.is_empty() {
         // Buffer exhausted, block incomplete
         if self.finishing_partial_block {
             return Poll::Ready(Some(Err(ArrowError::AvroError(
                 "Unexpected EOF while reading last Avro block".into()
             ))));
         }
         // Fetch more data (range end case) or finish
         // ... simplified fetch logic here ...
     } else {
         // Made progress but not complete - continue decoding
         self.reader_state = ReaderState::DecodingBlock { reader, data };
     }
 }

# Conflicts:
#	arrow-avro/src/reader/mod.rs

Labels

arrow: Changes to the arrow crate
arrow-avro: arrow-avro crate
parquet: Changes to the parquet crate


Development

Successfully merging this pull request may close these issues.

Implement an async AvroReader

3 participants