Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5494      +/-   ##
==========================================
- Coverage   61.32%   61.32%   -0.01%
==========================================
  Files         312      312
  Lines       11080    11082       +2
  Branches      770      728      -42
==========================================
+ Hits         6795     6796       +1
- Misses       4285     4286       +1
RustedBones left a comment
Can you add the new module to the README?
    new Schema(
      NestedField.required(0, "a", IntegerType.get()),
      NestedField.required(1, "b", StringType.get())
    ),
Can't this be given by the RowType?
No, this is an Iceberg schema rather than the Beam schema. The lack of create-on-write does raise the question of whether we also need to derive the Iceberg schemas.
Maybe RowType could also offer a def icebergSchema? (Similar to how magnolify-parquet has both def schema and def avroSchema...)
That would mean pulling Iceberg deps into the Beam module, FYI.
Could make it provided, I guess, but point taken.
Seems like Beam should have a utility function for converting between Beam and Iceberg schemas. They have similar stuff for BQ/Avro/BeamSchema interop. Maybe we could contribute there.
Beam does have this but it introduces a dep on the iceberg part of the sdk that in theory should be managed.
I could use it in the integration test directly but that wouldn't help users at all.
IcebergUtils.beamSchemaToIcebergSchema(rowType.schema)
OTOH ... the class has this comment so 🤷
// This is made public for users convenience, as many may have more experience working with
// Iceberg types.
    private lazy val _config: java.util.Map[String, Object] = {
      // recursively convert this yaml-compatible nested scala map to java map
      // we either do this or the user has to create nested java maps in scala code
      // both are bad
      def _convert(a: Object): Object = {
        a match {
          case m: Map[_, _] =>
            m.asInstanceOf[Map[_, Object]].map { case (k, v) => k -> _convert(v) }.asJava
          case i: Iterable[_] => i.map(o => _convert(o.asInstanceOf[Object])).asJava
          case _ => a
        }
      }
      config.map { case (k, v) => k -> _convert(v) }.asJava
    }
I'm wondering if we should introduce either a config AST to ensure what is passed is YAML compatible, or maybe use lightbend config
The API can also take a YAML file location, e.g. classpath://foo.yaml, if we wanted to support that.
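For illustration, here is a minimal sketch of what the config AST idea could look like. Every name in it (`ConfigValue`, `Str`, `Num`, `Bool`, `Lst`, `Obj`, `toJava`, `toJavaMap`) is hypothetical and not part of scio or Beam, and the config keys in the usage example are placeholders. The point is only that a sealed hierarchy restricts values to YAML-compatible shapes at compile time, so the conversion to nested Java collections needs no casts.

```scala
import scala.jdk.CollectionConverters._

// Hypothetical AST: these types are an assumption for illustration,
// they do not exist in scio or Beam.
sealed trait ConfigValue

object ConfigValue {
  final case class Str(value: String) extends ConfigValue
  final case class Num(value: Long) extends ConfigValue
  final case class Bool(value: Boolean) extends ConfigValue
  final case class Lst(values: List[ConfigValue]) extends ConfigValue
  final case class Obj(fields: Map[String, ConfigValue]) extends ConfigValue

  // Recursively lower one AST node into a plain Java object.
  def toJava(v: ConfigValue): Object = v match {
    case Str(s)  => s
    case Num(n)  => java.lang.Long.valueOf(n)
    case Bool(b) => java.lang.Boolean.valueOf(b)
    case Lst(vs) => vs.map(toJava).asJava
    case Obj(fs) => toJavaMap(fs)
  }

  // Lower a top-level config into the java.util.Map shape used by _config.
  def toJavaMap(fields: Map[String, ConfigValue]): java.util.Map[String, Object] =
    fields.map { case (k, v) => k -> toJava(v) }.asJava
}

object ConfigAstExample {
  import ConfigValue._

  // Placeholder keys and values, chosen only to show nesting.
  val config: java.util.Map[String, Object] = toJavaMap(
    Map(
      "table" -> Str("db.table"),
      "catalog_properties" -> Obj(Map("type" -> Str("hadoop")))
    )
  )
}
```

Compared with `Map[String, Object]`, the trade-off is a more verbose call site in exchange for rejecting non-YAML values (arbitrary objects, functions) before the pipeline is constructed.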
Converted to draft until
clairemcginty left a comment
Overall looks good to me! The amount of testing and examples is impressive.
    import org.apache.beam.sdk.values.{PCollectionRowTuple, Row}
    import scala.jdk.CollectionConverters._

    final case class ManagedIO(ioName: String, config: Map[String, Object]) extends ScioIO[Row] {
i guess once this is merged, we can add a ManagedTypedIO to the 0.15.x branch? maybe let's file a ticket to track the Magnolify API work...
I don't think we need to support managed very extensively, so I decided to just skip any typed variant for Managed in favor of doing it downstream in our io-specific APIs
…cala Co-authored-by: Claire McGinty <clairem@spotify.com>
Adds support for Beam's managed transforms and for Iceberg, which is implemented as a managed transform.
Note: this builds against a snapshot of magnolify, which will need a release before merge.