Support writing to BQ via Storage Write API with SchemaTransforms #23988

pabloem merged 16 commits into apache:master
Conversation
Force-pushed from acde5b7 to 7f41639

Force-pushed from 6e509be to 8177bca
R: @johnjcasey

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Force-pushed from 613c463 to 633ef7d

Force-pushed from 633ef7d to 8aa51d4
...pache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java (outdated)
```java
public abstract Builder setNumStorageWriteApiStreams(Integer numStorageWriteApiStreams);

public abstract Builder setNumFileShards(Integer numFileShards);
```
I think we should have default values for these (file shards are not relevant for the Storage Write API, right?) and avoid exposing them if possible. In fact, we should use auto-sharding if it's supported.

Will do, that sounds better. FYI, support was added in #16795.
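The decision agreed on in this thread can be sketched as plain Java. This is an illustrative helper, not the actual provider code: the names `ShardingDefaults`, `resolveNumStreams`, and `DEFAULT_NUM_STREAMS`, and the use of `0` as an "auto-shard" sentinel, are all assumptions for the sketch.

```java
// Hypothetical sketch of the sharding decision discussed above: when the
// runner supports auto-sharding, any user-supplied stream count is ignored;
// otherwise a default applies so the option need not be exposed as required.
public class ShardingDefaults {
    static final int DEFAULT_NUM_STREAMS = 1; // assumed fallback value

    // Returns the stream count the write should use; 0 signals "let the
    // runner auto-shard" in this sketch's convention.
    static int resolveNumStreams(Integer configured, boolean autoShardingSupported) {
        if (autoShardingSupported) {
            return 0; // dynamic sharding; explicit counts are not needed
        }
        return configured == null ? DEFAULT_NUM_STREAMS : configured;
    }

    public static void main(String[] args) {
        System.out.println(resolveNumStreams(8, true));     // auto-sharding wins
        System.out.println(resolveNumStreams(null, false)); // falls back to default
        System.out.println(resolveNumStreams(8, false));    // explicit value kept
    }
}
```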
...pache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java (outdated)
...pache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java (outdated)
```java
if (configuration.getTriggeringFrequencySeconds() != null) {
  write =
      write.withTriggeringFrequency(
          Duration.standardSeconds(configuration.getTriggeringFrequencySeconds()));
}
```
I think we should probably have a default value for this. Something like:

```
if input.isUnbounded():
  write = write.withTriggeringFrequency(config.getTriggeringFrequency() == null ? DEFAULT_VALUE : config.getTriggeringFrequency())
```

so that this parameter can be optional even for streaming pipelines.

SGTM. What default do you think is good here? I've often seen 60s.
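The commit list for this PR indicates the merged change settled on a 5-second commit interval. A minimal sketch of the fallback logic, assuming that default; the class and method names here are illustrative, not the provider's actual code:

```java
// Sketch of the default-triggering-frequency logic discussed above. Only
// unbounded (streaming) inputs need a triggering frequency; a null
// configuration value falls back to a default instead of failing.
public class TriggeringFrequencyDefault {
    // 5s matches the "5s commit interval" mentioned in the commit list;
    // treat the exact value as an assumption of this sketch.
    static final long DEFAULT_TRIGGERING_FREQUENCY_SECONDS = 5;

    static Long resolveSeconds(Long configured, boolean inputIsUnbounded) {
        if (!inputIsUnbounded) {
            return null; // bounded writes do not trigger periodically
        }
        return configured == null ? DEFAULT_TRIGGERING_FREQUENCY_SECONDS : configured;
    }

    public static void main(String[] args) {
        System.out.println(resolveSeconds(null, true));  // default for streaming
        System.out.println(resolveSeconds(60L, true));   // user override kept
        System.out.println(resolveSeconds(null, false)); // bounded: no trigger
    }
}
```

This keeps the parameter optional even for streaming pipelines, as the reviewer suggested.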
...pache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java (outdated)
retest this please

Thank you @pabloem, PTAL

Friendly ping :) needs a final review

R: @johnjcasey
...pache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java (outdated)
...pache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java (outdated)
...pache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
Comments addressed, PTAL :)

lgtm thanks!
…ache#23988)

* storage write api provider class skeleton
* storage write schematransform configuration with validation
* small fixes, more validation checks, and setting up the storage write transform
* PCollectionRowTuple with two outputs: failed rows and (errors + failed rows)
* spotless
* bq services does not belong as a configuration field
* add AT_LEAST_ONCE semantics. also set test bq services
* test config validation and successful writes
* test for failed rows
* test for failed rows
* some fixes
* experiment
* use autoSharding, set default triggering frequency, use beam schema
* use Long instead of Integer
* address comments: use autosharding, 5s commit interval, change input and output tags
Adding SchemaTransform capability to BigQueryIO Storage Write API.