Managed + Iceberg IO#5494

Merged
kellen merged 16 commits into v0.15.x from kellen/iceberg
Feb 13, 2025

@kellen kellen commented Sep 17, 2024

Adds support for Beam's managed transforms and for Iceberg, which is implemented as a managed transform.

Note this is on a snapshot of magnolify, will need a release before merge.

codecov bot commented Sep 17, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 61.32%. Comparing base (a464955) to head (d3b67dc).
Report is 27 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5494      +/-   ##
==========================================
- Coverage   61.32%   61.32%   -0.01%     
==========================================
  Files         312      312              
  Lines       11080    11082       +2     
  Branches      770      728      -42     
==========================================
+ Hits         6795     6796       +1     
- Misses       4285     4286       +1     

@RustedBones RustedBones left a comment

Can you add the new module to the README?

Comment on lines +69 to +72
new Schema(
  NestedField.required(0, "a", IntegerType.get()),
  NestedField.required(1, "b", StringType.get())
),
Contributor

Can't this be given by the RowType?

Contributor Author

No, this is an Iceberg schema rather than the Beam schema. The lack of create-on-write does raise the question of whether we also need to derive the Iceberg schemas.

Contributor

Maybe RowType could also offer a def icebergSchema? (Similar to how magnolify-parquet has both def schema and def avroSchema...)

Contributor Author

FYI, that would mean pulling Iceberg deps into the Beam module.

Contributor

could make it provided, I guess, but point taken

Seems like Beam should have a utility function for converting between Beam/Iceberg Schemas. They have similar stuff for BQ/Avro/BeamSchema interop. Maybe we could contribute there.

Contributor Author

@kellen kellen Sep 18, 2024

Beam does have this but it introduces a dep on the iceberg part of the sdk that in theory should be managed.

I could use it in the integration test directly but that wouldn't help users at all.

IcebergUtils.beamSchemaToIcebergSchema(rowType.schema)

OTOH ... the class has this comment so 🤷

  // This is made public for users convenience, as many may have more experience working with
  // Iceberg types.

Comment on lines +32 to +45
private lazy val _config: java.util.Map[String, Object] = {
  // recursively convert this yaml-compatible nested scala map to java map
  // we either do this or the user has to create nested java maps in scala code
  // both are bad
  def _convert(a: Object): Object = {
    a match {
      case m: Map[_, _] =>
        m.asInstanceOf[Map[_, Object]].map { case (k, v) => k -> _convert(v) }.asJava
      case i: Iterable[_] => i.map(o => _convert(o.asInstanceOf[Object])).asJava
      case _ => a
    }
  }
  config.map { case (k, v) => k -> _convert(v) }.asJava
}
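
For readers following the thread, here is a standalone sketch of the same recursive conversion idea, written against the Scala standard library only. It is not the code from this PR: the object name `NestedConfigConversion` and the sample config keys are illustrative assumptions, it assumes Scala 2.13 (`scala.jdk.CollectionConverters`), and unlike the snippet above it turns every non-map iterable into a `java.util.List`.

```scala
import scala.jdk.CollectionConverters._

// Hypothetical helper, not part of the PR: converts a nested Scala config
// into the nested java.util structures a Java API would expect.
object NestedConfigConversion {
  // Maps are matched before Iterable because a Scala Map is also an Iterable.
  def convert(a: Any): Object = a match {
    case m: Map[_, _] =>
      m.asInstanceOf[Map[Any, Any]].map { case (k, v) => k -> convert(v) }.asJava
    case i: Iterable[_] =>
      // Assumption of this sketch: any other iterable becomes a java.util.List.
      i.toSeq.map(convert).asJava
    case other =>
      other.asInstanceOf[Object]
  }

  def toJavaConfig(config: Map[String, Any]): java.util.Map[String, Object] =
    config.map { case (k, v) => k -> convert(v) }.asJava
}
```

Calling `NestedConfigConversion.toJavaConfig(Map("table" -> "db.tbl", "catalog_properties" -> Map("type" -> "hadoop")))` would yield a `java.util.Map` whose `catalog_properties` entry is itself a `java.util.Map`, so the caller never has to build nested Java maps by hand.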
Contributor

I'm wondering if we should introduce either a config AST to ensure what is passed is YAML compatible, or maybe use lightbend config

Contributor Author

The API can also take a YAML file location, e.g. classpath://foo.yaml, if we wanted to support that.
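
To make the config AST idea from this thread concrete, here is a minimal sketch of what such a type might look like. Nothing here was proposed or merged in the PR: `ConfigValue` and its cases are hypothetical names, the set of supported shapes is an assumption, and the sketch assumes Scala 2.13.

```scala
import scala.jdk.CollectionConverters._

// Hypothetical AST: only YAML-representable shapes can be constructed,
// so an arbitrary Object can no longer sneak into the config.
sealed trait ConfigValue { def toJava: Object }

object ConfigValue {
  final case class Str(value: String) extends ConfigValue {
    def toJava: Object = value
  }
  final case class Num(value: Long) extends ConfigValue {
    def toJava: Object = java.lang.Long.valueOf(value)
  }
  final case class Bool(value: Boolean) extends ConfigValue {
    def toJava: Object = java.lang.Boolean.valueOf(value)
  }
  final case class Lst(values: List[ConfigValue]) extends ConfigValue {
    def toJava: Object = values.map(_.toJava).asJava
  }
  final case class Obj(fields: Map[String, ConfigValue]) extends ConfigValue {
    // Typed view for APIs that want java.util.Map[String, Object].
    def toJavaMap: java.util.Map[String, Object] =
      fields.map { case (k, v) => k -> v.toJava }.asJava
    def toJava: Object = toJavaMap
  }
}
```

The trade-off is the usual one: the sealed hierarchy lets the compiler reject values that have no YAML representation, at the cost of a more verbose call site than a plain `Map[String, Object]`.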

@RustedBones RustedBones marked this pull request as draft September 18, 2024 08:42
@RustedBones
Contributor

Converted to draft until magnolify gets released

@clairemcginty clairemcginty left a comment

overall looks good to me! the amount of testing + examples is impressive

import org.apache.beam.sdk.values.{PCollectionRowTuple, Row}
import scala.jdk.CollectionConverters._

final case class ManagedIO(ioName: String, config: Map[String, Object]) extends ScioIO[Row] {
Contributor

i guess once this is merged, we can add a ManagedTypedIO to the 0.15.x branch? maybe let's file a ticket to track the Magnolify API work...

Contributor Author

I don't think we need to support managed very extensively, so I decided to just skip any typed variant for Managed in favor of doing it downstream in our io-specific APIs

…cala

Co-authored-by: Claire McGinty <clairem@spotify.com>
@kellen kellen marked this pull request as ready for review February 13, 2025 14:33
@kellen kellen changed the base branch from main to v0.15.x February 13, 2025 14:33
@kellen kellen merged commit acaf383 into v0.15.x Feb 13, 2025
1 check passed
@kellen kellen deleted the kellen/iceberg branch February 13, 2025 15:40
kellen added a commit that referenced this pull request Feb 14, 2025
kellen added a commit that referenced this pull request Feb 14, 2025
kellen added a commit that referenced this pull request Feb 26, 2025
clairemcginty pushed a commit that referenced this pull request Aug 13, 2025
clairemcginty pushed a commit that referenced this pull request Aug 13, 2025
kellen added a commit that referenced this pull request Nov 17, 2025
kellen added a commit that referenced this pull request Dec 8, 2025
kellen added a commit that referenced this pull request Dec 8, 2025
3 participants