-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add arrow-avro Reader support for Dense Union and Union resolution (Part 2) #8349
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add arrow-avro Reader support for Dense Union and Union resolution (Part 2) #8349
Conversation
f6f21ad to
e606f43
Compare
e606f43 to
976730a
Compare
afc9006 to
1a73c4b
Compare
1a73c4b to
1c1e58c
Compare
nathaniel-d-ef
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks really solid to me, great work 👍
|
@alamb @scovich Would either of you have time to review this PR? Half of the PR is test code, so the actual production code diff is about ~900 lines. I also have ideas on improving this further and plan to refine it in follow-up PRs. This was an interesting one to tackle and I found it challenging to do so in a clean manner. Finally this is a pretty important PR, as it's the last major PR the |
scovich
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty far out of my expertise on this PR, but hopefully the comments are still useful.
arrow-avro/src/reader/record.rs
Outdated
| reader_type_codes: Arc<[i8]>, | ||
| }, | ||
| ToSingle { | ||
| target: Box<Decoder>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Decoder seems to be a concrete type, so why boxed? Is there somehow infinite type recursion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct, there's the possibility of infinite type recursion with Decoder. I was planning to come back and attempt to remove the need for Box, but that would also impact Array, Map, and Nullable.
arrow-avro/src/reader/record.rs
Outdated
| } | ||
|
|
||
| impl UnionResolutionBuilder { | ||
| #[inline] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems highly unlikely that building union resolvers would be on the critical path of any workload that involves any actual data? Is #[inline] really needed on these methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good callout, I'll remove those.
arrow-avro/src/reader/record.rs
Outdated
| fn with_resolved_union(mut self, resolved_union: &ResolvedUnion) -> Self { | ||
| self.resolved = Some(resolved_union.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it's recommended to take an owned arg, if owned is needed. Otherwise, a caller who could relinquish ownership of a value cannot do so, and we end up needlessly cloning.
... but it looks like all callers of this (private) method only have a reference, so maybe it's fine to leave as-is?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually you're right. We should remove the clone from here and clean this up.
arrow-avro/src/reader/record.rs
Outdated
| UnionResolvedKind::FromSingle { | ||
| target_reader_index, | ||
| .. | ||
| } => { | ||
| let type_id = fields |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like a lot of redundancy between Both and FromSingle cases?
I think the former just hardwires target_reader_index=0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@scovich There's definitely some redundancy here. I tried to abstract it as much as I could without a refactor. I'm leaving these branches separate because the spec is clear there's two different types of resolution as play and I'm unsure how Sparse Unions will fit in here. I planned to follow-up on this.
arrow-avro/src/reader/record.rs
Outdated
| let type_id = fields | ||
| .iter() | ||
| .nth(idx) | ||
| .map(|(type_id, _)| type_id) | ||
| .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0)); | ||
| type_ids.push(type_id); | ||
| offsets.push(encoding_counts[idx]); | ||
| encodings[idx].decode(buf)?; | ||
| encoding_counts[idx] += 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More redundancy with the previous block of code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@scovich So I ended up realizing that we'd need some sort of dedicated UnionDecoder implementation to fully abstract this cleanly. I was wanting to follow-up on that as part of the same future PR. It would be a pretty large charge and I want to make sure to develop it in a way that covers sparse Union types as well.
| "Promotion String->Bytes target mismatch".into(), | ||
| )), | ||
| }, | ||
| Promotion::BytesToString => match self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one isn't the same as the others... it's technically a narrowing cast which could fail if the input bytes are not valid utf-8. At least the int -> float conversions are converting casts (= infallible).
Does the spec require this? And if so, should we validate the data here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the spec requires BytesToString promotions.
I'll clean this up a bit more though to your point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No check that the input bytes are valid utf-8?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the call std::str::from_utf8(data) below validates that the input is valid utf8
arrow-avro/src/reader/record.rs
Outdated
| Promotion::LongToFloat => match self { | ||
| Self::Float32(v) => { | ||
| v.push(buf.get_long()? as f32); | ||
| Ok(()) | ||
| } | ||
| _ => Err(ArrowError::ParseError( | ||
| "Promotion Long->Float target mismatch".into(), | ||
| )), | ||
| }, | ||
| Promotion::LongToDouble => match self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically these are narrowing/converting casts because f32 and f64 respectively have only 24 and 53 bits of precision (same problem casting i32 to f32).
Does the spec require treating this as a "promotion" even tho it's lossy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
9936c89 to
d636451
Compare
@scovich Thank you so much for the review. Your comments were very useful and most of them should now be addressed in my latest commit. I'm wanting to follow-up on handling some of the redundancy you called out in your review. While I did clean up and abstracted what made sense, I think a full abstraction is part of a larger effort that needs to consider sparse unions as well. Let me know what you think whenever you get a chance! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly nits. Just two questions before merge:
- #8349 (comment) - Should bytes -> string promotion validate that the bytes contain only valid utf-8?
- #8349 (comment) - Can
UnionDecoderuse a three-valued enum instead of a pair of booleans to capture the three valid stats? Or is there a legit reason somebody needs us to support the false/false case as passthrough?
(NOTE: I'm not an avro spec expert, so hopefully I'm not the only stamp on this PR)
arrow-avro/src/reader/record.rs
Outdated
| "Sparse Arrow unions are not yet supported".to_string(), | ||
| )); | ||
| } | ||
| (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uuid didn't change? Can/should we leave it in its original position to reduce diff churn?
arrow-avro/src/reader/record.rs
Outdated
|
|
||
| impl Decoder { | ||
| fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> { | ||
| // Extract just the Promotion (if any) to simplify pattern matching |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment might be a bit stale?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or maybe just needs to move down to try_new_internal that it seems to have been split apart from?
| Self::Record(_, e, _) => { | ||
| for encoding in e.iter_mut() { | ||
| encoding.append_null(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why changed, out of curiosity? Doesn't seem related to this PR?
Did clippy flag it for some reason?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change is related to me modifying the signature for append_null(&mut self) -> Result<(), ArrowError> to help bubble up errors from the new Union branch.
| "Promotion String->Bytes target mismatch".into(), | ||
| )), | ||
| }, | ||
| Promotion::BytesToString => match self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No check that the input bytes are valid utf-8?
arrow-avro/src/reader/record.rs
Outdated
| "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target" | ||
| .to_string(), | ||
| )), | ||
| (false, false) => Ok(UnionReadPlan::Passthrough), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to systematically eliminate this false/false case?
Does any legitimate (transitive) call site actually need it?
Or can we replace the pair of booleans with a three-variant enum?
| Self::Array(field, offsets, values) => { | ||
| let values = values.flush(None)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aside: picking on the docs a bit --
the implementation expects them to be positive
Technically, zero is not mathematically positive... it's the only value that is both non-negative and non-positive.
Perhaps rust officially has some other definition, but Google search AI overview doesn't seem to think so:

arrow-avro/src/reader/record.rs
Outdated
| let type_ids_buf = flush_values(&mut self.type_ids).into_iter().collect(); | ||
| let offsets_buf = flush_values(&mut self.offsets).into_iter().collect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Do these stay one-liners if folded in to their respective single use sites (below)?
Or does that hurt readability?
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
974ea94 to
761adba
Compare
I wanted to avoid validating per‑value in the hot decode path. Right now malformed writer data will fail at flush time when we build the Arrow array. I could probably do a fail‑fast check at decode time via a feature flag as part of a follow-up PR though.
Super valid call out. I'd imagine rust follows the mathematical definition. Maybe worth cleaning up at some point. |
fe2e63e to
c555bee
Compare
|
@scovich Thanks for the solid follow-up! I just pushed a commit that addresses your comments. Also to answer those questions you had:
I thought about it more and realized adding the
I looked at the code again and didn't see a legitimate case where |
Fair! Arrow array build failure seems soon enough to detect what is hopefully a super uncommon case? |
It's pretty uncommon. I've only seen it with malformed data anecdotally. However having an optional fail fast path in these promotions is probably a good idea though. |
c555bee to
20d946f
Compare
|
CI failure unrelated to this PR |
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @jecsand838 and @scovich
I had a few suggestions, but the most important one I would really like to see is the "end to end" test that reads the data from union_fields.avro and verifies the resulting UnionArray directly
| assert_union_equivalent("record_with_union_field.u", u_b, u_r); | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One test I think Would be super valuable would be a more "end to end" test showing the output of this logic as Arrow arrays.
Perhaps a test like
- read the file in
- programatically create the expected
UnionArray assert_eqthey are the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just pushed up a full e2e test that compares Arrow arrays as well. This was a good call!
| "Promotion String->Bytes target mismatch".into(), | ||
| )), | ||
| }, | ||
| Promotion::BytesToString => match self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the call std::str::from_utf8(data) below validates that the input is valid utf8
| } | ||
|
|
||
| #[derive(Debug)] | ||
| struct DispatchLookupTable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would help me read this code if the indexing were explicitly described
Specifically, what does the index in to_reader and promotion represent? I think it is the output column index and the value is the input column index?
It would also be nice to document that some of the to_reader entries may be -1 and what that means
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added detailed comments for the DispatchLookupTable.
arrow-avro/src/reader/record.rs
Outdated
| "Negative union branch index {tag}" | ||
| ))); | ||
| } | ||
| Ok(tag as usize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure it will matter, but I think long (64 bit signed) may not fit into a usize on 32-bit platforms
Another way to do this test that would catch it is something like
let tag: usize = buf.get_long().try_into()
.map_err(|_| {
ArrowError::ParseError(format!(ArrowError::ParseError(format!
"Negative union branch index {tag}"
))})?;There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think union type tags are restricted to 0..=127?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure it will matter, but I think long (64 bit signed) may not fit into a usize on 32-bit platforms
Another way to do this test that would catch it is something like
Great catch! ty for that. I'll switch to a fallible conversion, improve the error messages, and clean-up. This will cover both negative indices and (on 32‑bit) non‑negative indices that don’t fit in usize. The same fix will be applied in the skipper path.
I can refine this further in a follow-up PR as well if that's alright with you.
I think union type tags are restricted to 0..=127?
That 0..=127 bound applies to Arrow union type_ids (int8), not to Avro tags. In Avro, the tag is the zero‑based union branch index (encoded as an int in 1.11.1), so it’s only constrained by the number of union branches (it must be non‑negative and < branch count).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an issue here that could fail a valid Avro files.
In the (incredibly unlikely and never seen it before personally) scenario we get a reader asking us to materialize an Arrow union with >127 branches we'll have an error. That's more of a Arrow format limitation on the UnionFields as I understand it. Something to follow-up on at some point imo and I can update the docs in a fast follow PR as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh... what's the largest legal union branch value that avro supports?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to support super lage unions, I was just trying to make sure we had some reasonable error message if we don't support that case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh... what's the largest legal union branch value that avro supports?
In 1.11.x+ it's 32‑bit signed int, so 2,147,483,647. Older versions of the spec just defined it as long if I remember correctly. In practice, I've never seen unions get anywhere close to this size.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just pushed up some better checks in the cold path and better error messages.
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
c1d9dfb to
26e8e96
Compare
@alamb Thank you for the solid feedback and suggestions. I pushed up changes that include the end to end test and address your comments. Let me know what you think!
That's correct. I just removed that again and will come back to improving the validation as a fast follow PR. For now invalid UTF8 will get caught in the flush. |
78181ee to
9adae00
Compare
9adae00 to
f3c97a0
Compare
|
Thanks @jecsand838, @nathaniel-d-ef and @scovich -- I am really excited to see how the arrow-avro crate is shaping up |
Which issue does this PR close?
This work continues arrow-avro schema resolution support and aligns behavior with the Avro spec.
RecordDecoder#8293 (Add projection with default values support to RecordDecoder), Add schema resolution and type promotion support to arrow-avro Decoder #8124 (schema resolution & type promotion for the decoder), Added arrow-avro enum mapping support for schema resolution #8223 (enum mapping for schema resolution). These previous efforts established the foundations that this PR extends to Union types and Union resolution.Rationale for this change
arrow-avrolacked end‑to‑end support for Avro unions and ArrowUnionschemas. Many Avro datasets rely on unions (i.e..,["null","string"], tagged unions of different records), and without schema‐level resolution and JSON encoding the crate could not interoperate cleanly. This PR complete the initial Decoder support for Union types and Union resolution.What changes are included in this PR?
Are these changes tested?
Yes,
New detailed end to end integration tests have been added to
reader/mod.rsand unit tests covering the new Union and Union resolution functionality are included in thereader/record.rsfile.Are there any user-facing changes?
N/A