
Conversation

@robertwb
Contributor

@robertwb robertwb commented Dec 1, 2021

This allows joining (aka zipping) operations to execute without requiring a global repartitioning, as long as the operands have a common, unchanged ancestor index.
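A minimal pipeline sketch of the kind of expression this helps (the input path and column names here are hypothetical): both operands derive from the same read, so they share an unchanged ancestor index and their element-wise combination needs no global repartition.

import apache_beam as beam
from apache_beam.dataframe import convert
from apache_beam.dataframe.io import read_csv

with beam.Pipeline() as p:
  df = p | read_csv('input.csv')   # hypothetical input
  total = df.A + df.B              # zip-like op; operands share an ancestor index
  _ = convert.to_pcollection(total) | beam.Map(print)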



@robertwb
Contributor Author

robertwb commented Dec 1, 2021

R: @TheNeuralBit

@codecov

codecov bot commented Dec 2, 2021

Codecov Report

Merging #16101 (d76c748) into master (468089c) will decrease coverage by 0.00%.
The diff coverage is 95.38%.


@@            Coverage Diff             @@
##           master   #16101      +/-   ##
==========================================
- Coverage   83.63%   83.63%   -0.01%     
==========================================
  Files         445      450       +5     
  Lines       61428    61911     +483     
==========================================
+ Hits        51374    51777     +403     
- Misses      10054    10134      +80     
Impacted Files Coverage Δ
sdks/python/apache_beam/dataframe/expressions.py 92.90% <ø> (ø)
sdks/python/apache_beam/dataframe/frames.py 94.90% <ø> (+0.02%) ⬆️
...pache_beam/runners/portability/portable_metrics.py 89.65% <ø> (-10.35%) ⬇️
sdks/python/apache_beam/dataframe/partitionings.py 94.53% <89.28%> (-1.51%) ⬇️
sdks/python/apache_beam/dataframe/frame_base.py 90.37% <100.00%> (ø)
sdks/python/apache_beam/dataframe/transforms.py 95.25% <100.00%> (+0.33%) ⬆️
sdks/python/apache_beam/metrics/metric.py 95.38% <100.00%> (ø)
...eam/runners/portability/fn_api_runner/fn_runner.py 90.90% <100.00%> (+0.10%) ⬆️
sdks/python/apache_beam/internal/pickler.py 77.27% <0.00%> (-9.59%) ⬇️
... and 43 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

Member

@TheNeuralBit TheNeuralBit left a comment


Just one quick high-level question. I'll need a little time to grok the expand() changes.

  return p.result.metrics().query(
      metrics.MetricsFilter().with_name(
          fn_runner.FnApiRunner.NUM_FUSED_STAGES_COUNTER)
  )['counters'][0].result
Member


It feels a little odd to verify this through a metric in FnApiRunner. Could we instrument DataFrameTransform expansion instead?

Contributor Author


I'm open to ideas here. At the end of the day, I want to assert that things get sufficiently fused (possibly catching issues with side inputs as well as shuffles).

Member


The other thought I had was just traversing the pipeline and counting CoGBKs, but you're right that wouldn't help if side inputs mess up fusion.

Could we pull out the fusion logic?
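(A rough sketch of the pipeline-traversal idea above, not code from this PR: walk the pipeline proto and count GroupByKey primitives, which bounds how many CoGBKs the expansion introduced.)

def count_gbks(pipeline):
  # pipeline is a beam.Pipeline; GBK primitives carry the standard URN below.
  proto = pipeline.to_runner_api()
  return sum(
      1 for transform in proto.components.transforms.values()
      if transform.spec.urn == 'beam:transform:group_by_key:v1')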

Contributor Author


The fusion logic is not very easy to invoke in isolation.

Member

@TheNeuralBit TheNeuralBit left a comment


Thanks! I have a few nits and some clarifying questions

# If expr does not require partitioning, just grab any stage, else grab
# the first stage where all of expr's inputs are partitioned as required.
# In either case, use the first such stage because earlier stages are
# closer to the inputs (have fewer intermediate stages).
Member


+1 on removing this comment, it looks like it wasn't updated when the logic was clarified.

      inputs_by_stage[stage] += 1 + 100 * (
          expr.requires_partition_by() == stage.partitioning)
  if inputs_by_stage:
    stage = sorted(inputs_by_stage.items(), key=lambda kv: kv[1])[-1][0]
Member


nit: I think max would work here.

Contributor Author


Max doesn't let you specify a key. I suppose I could swap the tuple ordering (but then I'd have to worry about stages being comparable in case the counts were equal). Added a comment 'cause that line is a mouthful.


Contributor Author


Cool. Done.

    return True

  def test_partition_fn(self, df):
    return Index().test_partition_fn(df)
Member


It would be nice if this could replicate JoinIndex partitioning somehow. Any thoughts on how we could do that?

I suppose we could use a slightly modified hash function to generate a different partitioning. I guess that doesn't get us much, though... it just verifies that we're not overfitting for Index's hashing technique.

Contributor Author


I did some thinking about this and the problem is that JoinIndex is not a well-defined partitioning in isolation; it's only meaningful when we start talking about whether things have common ancestry within a stage. (E.g. across stage boundaries, we upgrade JoinIndex to Index.) That does unfortunately mean that our non-pipeline tests aren't going to catch as many issues here (which makes sense, as the interesting logic to deal with join indices is in the translation to Beam steps code).

           if arg not in inputs]):
      if is_computable_in_stage(expr, stage):
        break
    if all(arg in inputs for arg in expr.args()):
Member


Could you explain why we need separate logic for the case where all arguments are inputs?

Contributor Author


If there are any non-input arguments, we must pick (or create) a stage that has all of them. (Well, technically one can sometimes add expressions to an existing stage, but this requires graph analysis to ensure one is not creating cycles.) That's the code that was there below (and before). However, if there aren't any non-input arguments, that code won't find any candidates and we'll create a new stage for each root operation, limiting our ability to find stages that have common ancestors (even if they would have fused in the Beam optimizer).

Maybe an example is more clear. If we have

`df.A + df.B`

we need to compute df.A somewhere, so we create a stage (call it stage1) with an input of df and a single operation. Now when we compute df.B, we'd rather re-use stage1 so that we can find a single stage that contains both df.A and df.B. This wasn't important before as we always introduced a shuffle for multi-input operations so close to the roots anyway.
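(A toy sketch of the stage-reuse rule described above, using made-up Stage and placement helpers rather than the PR's actual code: when every argument is already a raw input, reuse any existing stage over those inputs, so sibling root operations like df.A and df.B land in the same stage.)

class Stage(object):
  def __init__(self, inputs):
    self.inputs = set(inputs)
    self.ops = []

def place(expr_name, expr_args, inputs, stages):
  if all(arg in inputs for arg in expr_args):
    # Root operation: reuse an existing stage that already reads these inputs.
    for stage in stages:
      if set(expr_args) <= stage.inputs:
        stage.ops.append(expr_name)
        return stage
  # Otherwise (or if no candidate exists), start a new stage over the args.
  stage = Stage(expr_args)
  stage.ops.append(expr_name)
  stages.append(stage)
  return stage

stages, inputs = [], {'df'}
place('df.A', ['df'], inputs, stages)
place('df.B', ['df'], inputs, stages)
print(len(stages), stages[0].ops)  # 1 ['df.A', 'df.B']: one shared stage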

  for stage in expr_to_stages(arg):
    if is_computable_in_stage(expr, stage):
      inputs_by_stage[stage] += 1 + 100 * (
          expr.requires_partition_by() == stage.partitioning)
Member


Is this 100:1 ratio arbitrary?

Contributor Author


Poor man's lexicographical ordering. I could add a couple more zeros. (I don't know what the right ratio actually should be if we wanted to make them comparable; mostly this is to prefer the most distributed option.)
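(For illustration only, not the PR's code: a true lexicographic preference can be expressed with a tuple-valued key, ordering first on the partitioning match and then on the input count; the 1 + 100 * (...) weight approximates this.)

from collections import defaultdict

class Stage(object):  # hypothetical stand-in for the real Stage class
  def __init__(self, name, partitioning):
    self.name, self.partitioning = name, partitioning

def best_stage(stages_per_arg, required_partitioning):
  inputs_by_stage = defaultdict(int)
  for candidates in stages_per_arg:
    for stage in candidates:
      inputs_by_stage[stage] += 1
  # Tuple key: a partitioning match dominates, then input count; ties keep
  # the first stage seen, so stages themselves are never compared.
  return max(
      inputs_by_stage.items(),
      key=lambda kv: (kv[0].partitioning == required_partitioning, kv[1]))[0]

s1, s2 = Stage('s1', 'Arbitrary'), Stage('s2', 'Index')
print(best_stage([[s1, s2], [s1]], 'Index').name)  # s2 wins despite fewer inputs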

Contributor Author

@robertwb robertwb left a comment


Thanks for your comments!


@aaltay
Member

aaltay commented Dec 29, 2021

What is the next step on this PR?

Contributor Author

@robertwb robertwb left a comment


I believe I've addressed all the comments, PTAL.

@TheNeuralBit TheNeuralBit self-requested a review January 4, 2022 17:54
@aaltay
Member

aaltay commented Jan 13, 2022

@TheNeuralBit could you please review the last round of changes?

Member

@TheNeuralBit TheNeuralBit left a comment


LGTM, thank you.

Sorry for taking so long on this, appreciate your patience :)

@robertwb robertwb merged commit ad7a350 into apache:master Jan 13, 2022
kennknowles added a commit to kennknowles/beam that referenced this pull request Jan 21, 2022