@pabloem pabloem commented Nov 2, 2021

This enables a path that relies on auto-sharded GroupIntoBatches. It will help the JdbcIO transform scale with the size of a cluster in streaming, and help with efficiency.

r: @aromanenko-dev
r: @chamikaramj


pabloem commented Nov 2, 2021

Run Java PostCommit

pabloem commented Nov 2, 2021

Run Java PostCommit

pabloem changed the title from "Autosharding jfor JdbcIO.write* transforms" to "Autosharding for JdbcIO.write* transforms" on Nov 3, 2021
pabloem changed the title from "Autosharding for JdbcIO.write* transforms" to "[BEAM-13184] Autosharding for JdbcIO.write* transforms" on Nov 3, 2021
pabloem force-pushed the autosharding-jdbcio-write branch from fbad9c9 to f23fe90 on November 3, 2021 21:33
pabloem commented Nov 3, 2021

Run Java PostCommit

pabloem commented Nov 4, 2021

Run Java PreCommit

pabloem commented Nov 9, 2021

PTAL friends : )

@aromanenko-dev
Contributor

@pabloem Sorry for delay, I was busy with some other things. I'll try to take a look in the next few days.

aromanenko-dev left a comment

Thanks! I left my comments, ptal.

Should we add a test for this feature?

Contributor

Why of("") ?

Member Author

GroupIntoBatches needs to receive a PColl<KV<..>> to do the per-batch grouping and autosharding, so I assign any key for the elements to go into the GIB transform. WDYT?

Contributor

Should we consider moving this WithKeys.of() and the subsequent Values.create() into GroupIntoBatches to improve its user interface (in a separate PR)?

Contributor

@pabloem Ok. I'd suggest adding a quick comment on this here to make it clear.
@chamikaramj Makes sense to me.
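
The keying step discussed in this thread (assign a throwaway key, batch per key, then drop the key) can be modelled in plain Java. This is only a toy sketch of the shape of the data flow: `KeyedBatchingSketch` and its three methods are hypothetical stand-ins named after the Beam transforms mentioned above, they do not call Beam, and they ignore windowing and sharding entirely.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: plain Java stand-ins, not Beam transforms. */
final class KeyedBatchingSketch {
  /** Stand-in for WithKeys.of(key): pairs every element with the same key. */
  static <T> List<Map.Entry<String, T>> withConstantKey(List<T> elements, String key) {
    List<Map.Entry<String, T>> keyed = new ArrayList<>();
    for (T element : elements) {
      keyed.add(new SimpleImmutableEntry<>(key, element));
    }
    return keyed;
  }

  /** Stand-in for a keyed batching step: emits batches of at most batchSize values per key. */
  static <T> List<Map.Entry<String, List<T>>> groupIntoBatches(
      List<Map.Entry<String, T>> keyed, int batchSize) {
    Map<String, List<T>> open = new LinkedHashMap<>(); // one open batch per key
    List<Map.Entry<String, List<T>>> out = new ArrayList<>();
    for (Map.Entry<String, T> kv : keyed) {
      List<T> batch = open.computeIfAbsent(kv.getKey(), k -> new ArrayList<>());
      batch.add(kv.getValue());
      if (batch.size() >= batchSize) {
        out.add(new SimpleImmutableEntry<>(kv.getKey(), batch));
        open.remove(kv.getKey());
      }
    }
    // Emit whatever is left over as a final, smaller batch per key.
    for (Map.Entry<String, List<T>> rest : open.entrySet()) {
      out.add(new SimpleImmutableEntry<>(rest.getKey(), rest.getValue()));
    }
    return out;
  }

  /** Stand-in for Values.create(): drops the key and keeps each batch. */
  static <T> List<List<T>> values(List<Map.Entry<String, List<T>>> batches) {
    List<List<T>> out = new ArrayList<>();
    for (Map.Entry<String, List<T>> kv : batches) {
      out.add(kv.getValue());
    }
    return out;
  }
}
```

With a single constant key such as `""`, every element lands in the same keyed stream, so the key carries no information and only exists to satisfy the keyed input type.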

pabloem commented Nov 12, 2021

adding a test makes sense. I'll work on that

pabloem commented Nov 12, 2021

Run Java PostCommit

pabloem force-pushed the autosharding-jdbcio-write branch 2 times, most recently from 3ec4454 to c3a51da, on November 12, 2021 23:43
pabloem commented Nov 12, 2021

Run Java PostCommit

pabloem commented Nov 13, 2021

Run Java PreCommit

pabloem commented Nov 16, 2021

hi @aromanenko-dev : ) - I'd love to get this in for the release cut on Wednesday (though don't feel pressured to allow bad-quality code; if fixes are needed, I'll happily make them)

pabloem force-pushed the autosharding-jdbcio-write branch from c3a51da to 6b80857 on November 29, 2021 19:48
pabloem force-pushed the autosharding-jdbcio-write branch from 6b80857 to ab260c5 on November 29, 2021 22:25
chamikaramj left a comment

Thanks!

PCollection<Iterable<T>> iterables;
if (input.isBounded() == IsBounded.UNBOUNDED
&& getAutoSharding() != null
&& getAutoSharding()) {
Contributor

"&& getAutoSharding()" is redundant ?

Contributor

In theory, it may return false?
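
To make the point of this exchange concrete: the null check in the snippet suggests the getter returns a nullable `Boolean`, which has three states (unset, false, true). The sketch below assumes exactly that; `AutoShardingFlagSketch` and its method names are hypothetical and not part of JdbcIO.

```java
/** Illustration only: how a nullable Boolean flag behaves under different checks. */
final class AutoShardingFlagSketch {
  /** Mirrors the reviewed condition: the flag must be non-null AND true. */
  static boolean enabledExplicit(Boolean autoSharding) {
    return autoSharding != null && autoSharding;
  }

  /** Equivalent null-safe form; Boolean.TRUE.equals(null) is false. */
  static boolean enabledCompact(Boolean autoSharding) {
    return Boolean.TRUE.equals(autoSharding);
  }

  /** What the condition would become if the second operand were dropped. */
  static boolean nonNullOnly(Boolean autoSharding) {
    return autoSharding != null;
  }

  public static void main(String[] args) {
    for (Boolean flag : new Boolean[] {null, Boolean.FALSE, Boolean.TRUE}) {
      System.out.println(
          flag + " -> explicit=" + enabledExplicit(flag)
              + ", compact=" + enabledCompact(flag)
              + ", nonNullOnly=" + nonNullOnly(flag));
    }
  }
}
```

Under that assumption the second operand is not redundant: dropping it would treat an explicit `false` as enabled.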

if (outputList == null) {
outputList = new ArrayList<>();
}
outputList.add(c.element());
Contributor

Won't we run into OOMs if this list grows too much ?

Member Author

fixed. Added a maximum row limit
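
One way to bound such a buffer is to hand off a batch as soon as it reaches a cap, so the list never holds more than a fixed number of rows. The sketch below is a plain Java illustration of that idea, not the code merged in this PR; `BoundedBatcher` and `maxBatchSize` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: buffers elements and emits a batch once a cap is reached. */
final class BoundedBatcher<T> {
  private final int maxBatchSize;
  private List<T> buffer; // created lazily, like outputList in the snippet above

  BoundedBatcher(int maxBatchSize) {
    if (maxBatchSize <= 0) {
      throw new IllegalArgumentException("maxBatchSize must be positive");
    }
    this.maxBatchSize = maxBatchSize;
  }

  /** Adds one element; returns a full batch when the cap is hit, otherwise null. */
  List<T> add(T element) {
    if (buffer == null) {
      buffer = new ArrayList<>();
    }
    buffer.add(element);
    return buffer.size() >= maxBatchSize ? flush() : null;
  }

  /** Returns whatever is buffered (possibly empty) and resets the buffer. */
  List<T> flush() {
    List<T> out = buffer == null ? new ArrayList<>() : buffer;
    buffer = null;
    return out;
  }

  /** Splits the input into batches of at most maxBatchSize elements. */
  static <T> List<List<T>> batchAll(Iterable<T> input, int maxBatchSize) {
    BoundedBatcher<T> batcher = new BoundedBatcher<>(maxBatchSize);
    List<List<T>> batches = new ArrayList<>();
    for (T element : input) {
      List<T> full = batcher.add(element);
      if (full != null) {
        batches.add(full);
      }
    }
    List<T> rest = batcher.flush();
    if (!rest.isEmpty()) {
      batches.add(rest);
    }
    return batches;
  }

  public static void main(String[] args) {
    System.out.println(batchAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3));
  }
}
```

Memory use is then proportional to the cap rather than to the size of the bundle; the trade-off is more, smaller writes when the cap is set low.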

spec.getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
}
return input
PCollection<Iterable<T>> iterables;
Contributor

Can we share code between WriteWithResult and WriteVoid ? This introduces a significant amount of code duplication.

Contributor

+1

Member Author

done. Added a helper method.

JdbcIO.<KV<Integer, String>>write()
.withDataSourceProviderFn(voidInput -> dataSource)
.withStatement(String.format("insert into %s values(?, ?) returning *", tableName))
.withAutoSharding()
Contributor

Are we actually able to test that auto-sharding worked somehow ?

Member Author

hm not really the way things are now - perhaps analyze the graph and see that the GIB transform is in it - but is that worth it?

@aromanenko-dev
Contributor

Hey @pabloem, kind ping on this PR

pabloem commented Dec 15, 2021

Run Java PreCommit

pabloem commented Dec 15, 2021

Run Java PostCommit

pabloem commented Dec 16, 2021

Run Java PreCommit

pabloem commented Dec 16, 2021

Run Java PostCommit

pabloem commented Dec 20, 2021

Run Java PostCommit

pabloem commented Dec 20, 2021

@chamikaramj @aromanenko-dev PTAL

pabloem commented Dec 20, 2021

Run Java PostCommit

pabloem commented Dec 20, 2021

Run Java PostCommit

pabloem commented Dec 27, 2021

anyone got some time? : )

@aromanenko-dev
Contributor

Thanks, it's ok for me but maybe @chamikaramj has some additional comments?

@chamikaramj
Contributor

LGTM. Thanks.

@pabloem pabloem merged commit 818428f into apache:master Jan 4, 2022
@pabloem pabloem deleted the autosharding-jdbcio-write branch January 4, 2022 20:50
public void processElement(ProcessContext context) throws Exception {
T record = context.element();
records.add(record);
if (records.size() >= spec.getBatchSize()) {

@pabloem Hi Pablo. Not sure if this is the right way to ask this, but I was wondering if this removal removes the configurability of batch size? In batchElements I see that it utilizes a constant of MAX_BUNDLE_SIZE. Seems like it may not be utilizing batchSize anymore but I may be missing or misunderstanding something though.

Member Author

good catch. I'll fix this before the release (2.36.0), so it'll only be available on the next one

Great. Thank you!

pabloem added a commit to pabloem/beam that referenced this pull request Jan 19, 2022
…g for JdbcIO.write* transforms"

This reverts commit 818428f.

steveniemitz commented Jan 20, 2022

random musings from me, because we've tried to do something like this as well w/ our own SQL-ish IO.

If you introduce an (implicit) reshuffle between the producer of the rows being written and the writer, you'll possibly break an implicit contract that users have been relying on that mutations produced are applied in-order to the JDBC destination.

For example, if a GBK is triggering every 10 seconds and the next transform is a JdbcIO, by default that GBK trigger will fuse w/ the JdbcIO writer and apply "inline", so all triggers will apply in order. If you apply batching (with autosharding or not), multiple mutations for the same row may be grouped into multiple different batches, which will then be applied in a non-deterministic order. This can cause older firings to overwrite newer ones depending on the order they're applied in.
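
The hazard described here can be reproduced with a toy last-write-wins table. The sketch below is not Beam or JDBC code: `BatchOrderingSketch`, the row id `row-1`, and the values are made up for illustration. It assumes two trigger firings produce an upsert for the same row and end up in different batches.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: a last-write-wins table keyed by row id. */
final class BatchOrderingSketch {
  /** One mutation: set row rowId to value. */
  static final class Upsert {
    final String rowId;
    final String value;

    Upsert(String rowId, String value) {
      this.rowId = rowId;
      this.value = value;
    }
  }

  /** Applies batches in list order; a later write to the same row overwrites an earlier one. */
  static Map<String, String> applyBatches(List<List<Upsert>> batches) {
    Map<String, String> table = new HashMap<>();
    for (List<Upsert> batch : batches) {
      for (Upsert upsert : batch) {
        table.put(upsert.rowId, upsert.value);
      }
    }
    return table;
  }

  /** Final value of row-1 depending on whether the newer firing's batch is applied last. */
  static String finalValue(boolean newerBatchAppliedLast) {
    List<Upsert> olderFiring = Collections.singletonList(new Upsert("row-1", "count=10"));
    List<Upsert> newerFiring = Collections.singletonList(new Upsert("row-1", "count=25"));
    List<List<Upsert>> order = new ArrayList<>();
    if (newerBatchAppliedLast) {
      order.add(olderFiring);
      order.add(newerFiring);
    } else {
      order.add(newerFiring);
      order.add(olderFiring);
    }
    return applyBatches(order).get("row-1");
  }

  public static void main(String[] args) {
    System.out.println("in order:     " + finalValue(true));
    System.out.println("out of order: " + finalValue(false));
  }
}
```

When the batches are applied in firing order the row ends with the newer value; when the order flips, the older firing overwrites it, which is the regression the comment warns about.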

pabloem added a commit that referenced this pull request Jan 20, 2022
…BEAM-13184] Autosharding for …

Revert "Merge pull request #15863 from [BEAM-13184] Autosharding for …
pabloem added a commit to pabloem/beam that referenced this pull request Jan 20, 2022
…osharding for JdbcIO.write* transforms""

This reverts commit 421bc80.
kennknowles added a commit to kennknowles/beam that referenced this pull request Jan 21, 2022
* github/master: (198 commits)
  ...
  Revert "Merge pull request apache#15863 from [BEAM-13184] Autosharding for JdbcIO.write* transforms"
  ...
pabloem added a commit that referenced this pull request Jan 28, 2022
 from [BEAM-13184] Autoshard…

* Revert "Revert "Merge pull request #15863 from [BEAM-13184] Autosharding for JdbcIO.write* transforms""

This reverts commit 421bc80.

* Using batchSize to define element batch size

* Handle corner case for null list