Skip to content

Commit 426dbd3

Browse files
authored
Revert "[Python BQ] Allow setting a fixed number of Storage API streams (#28592)" (#28613)
This reverts commit 04a26da.
1 parent 04a26da commit 426dbd3

File tree

6 files changed

+24
-66
lines changed

6 files changed

+24
-66
lines changed

.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
2828
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
2929
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow',
3030
'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow (\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) {
31-
description('Runs end-to-end cross language GCP IO tests on the Dataflow runner. \"Run Python_Xlang_Gcp_Dataflow PostCommit\"')
31+
description('Runs end-to-end cross language GCP IO tests on the Dataflow runner.')
3232

3333

3434
// Set common parameters.

.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
2828
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
2929
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct',
3030
'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run Python_Xlang_Gcp_Direct PostCommit\")', this) {
31-
description('Runs end-to-end cross language GCP IO tests on the Direct runner. \"Run Python_Xlang_Gcp_Direct PostCommit\"')
31+
description('Runs end-to-end cross language GCP IO tests on the Direct runner.')
3232

3333
// Set common parameters.
3434
commonJobProperties.setTopLevelMainJobProperties(delegate)

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -176,13 +176,6 @@ public void validate() {
176176
!Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
177177
invalidConfigMessage + "Output must not be empty if error handling specified.");
178178
}
179-
180-
if (this.getAutoSharding() != null && this.getAutoSharding()) {
181-
checkArgument(
182-
this.getNumStreams() == 0,
183-
invalidConfigMessage
184-
+ "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options.");
185-
}
186179
}
187180

188181
/**
@@ -225,17 +218,11 @@ public static Builder builder() {
225218
public abstract Boolean getUseAtLeastOnceSemantics();
226219

227220
@SchemaFieldDescription(
228-
"This option enables using a dynamically determined number of Storage Write API streams to write to "
221+
"This option enables using a dynamically determined number of shards to write to "
229222
+ "BigQuery. Only applicable to unbounded data.")
230223
@Nullable
231224
public abstract Boolean getAutoSharding();
232225

233-
@SchemaFieldDescription(
234-
"If set, the Storage API sink will default to using this number of write streams. " +
235-
"Only applicable to unbounded data.")
236-
@Nullable
237-
public abstract Integer getNumStreams();
238-
239226
@SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
240227
@Nullable
241228
public abstract ErrorHandling getErrorHandling();
@@ -256,8 +243,6 @@ public abstract static class Builder {
256243

257244
public abstract Builder setAutoSharding(Boolean autoSharding);
258245

259-
public abstract Builder setNumStreams(Integer numStreams);
260-
261246
public abstract Builder setErrorHandling(ErrorHandling errorHandling);
262247

263248
/** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
@@ -336,19 +321,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
336321
if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
337322
Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
338323
Boolean autoSharding = configuration.getAutoSharding();
339-
Integer numStreams = configuration.getNumStreams();
340-
// Triggering frequency is only applicable for exactly-once
341-
if (!configuration.getUseAtLeastOnceSemantics()) {
342-
write =
343-
write.withTriggeringFrequency(
344-
(triggeringFrequency == null || triggeringFrequency <= 0)
345-
? DEFAULT_TRIGGERING_FREQUENCY
346-
: Duration.standardSeconds(triggeringFrequency));
347-
}
348-
// set num streams if specified, otherwise default to autoSharding
349-
if (numStreams > 0) {
350-
write = write.withNumStorageWriteApiStreams(numStreams);
351-
} else if (autoSharding == null || autoSharding) {
324+
write =
325+
write.withTriggeringFrequency(
326+
(triggeringFrequency == null || triggeringFrequency <= 0)
327+
? DEFAULT_TRIGGERING_FREQUENCY
328+
: Duration.standardSeconds(triggeringFrequency));
329+
// use default value true for autoSharding if not configured for STORAGE_WRITE_API
330+
if (autoSharding == null || autoSharding) {
352331
write = write.withAutoSharding();
353332
}
354333
}

sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,12 @@
3030
from hamcrest.core import assert_that as hamcrest_assert
3131

3232
import apache_beam as beam
33+
from apache_beam.io.external.generate_sequence import GenerateSequence
3334
from apache_beam.io.gcp.bigquery import StorageWriteToBigQuery
3435
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
3536
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
3637
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
37-
from apache_beam.options.pipeline_options import PipelineOptions
3838
from apache_beam.testing.test_pipeline import TestPipeline
39-
from apache_beam.transforms.periodicsequence import PeriodicImpulse
4039
from apache_beam.utils.timestamp import Timestamp
4140

4241
# Protect against environments where bigquery library is not available.
@@ -100,13 +99,11 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
10099
ALL_TYPES_SCHEMA = (
101100
"int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING,"
102101
"bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP")
103-
_RUNNER = ""
104102

105103
def setUp(self):
106104
self.test_pipeline = TestPipeline(is_integration_test=True)
107105
self.args = self.test_pipeline.get_full_options_as_args()
108106
self.project = self.test_pipeline.get_option('project')
109-
_RUNNER = PipelineOptions(self.args).get_all_options()['runner']
110107

111108
self.bigquery_client = BigQueryWrapper()
112109
self.dataset_id = '%s_%s_%s' % (
@@ -247,7 +244,8 @@ def test_write_with_beam_rows(self):
247244
table=table_id, expansion_service=self.expansion_service))
248245
hamcrest_assert(p, bq_matcher)
249246

250-
def run_streaming(self, table_name, num_streams=0, use_at_least_once=False):
247+
def run_streaming(
248+
self, table_name, auto_sharding=False, use_at_least_once=False):
251249
elements = self.ELEMENTS.copy()
252250
schema = self.ALL_TYPES_SCHEMA
253251
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name)
@@ -262,43 +260,33 @@ def run_streaming(self, table_name, num_streams=0, use_at_least_once=False):
262260
streaming=True,
263261
allow_unsafe_triggers=True)
264262

265-
auto_sharding = (num_streams == 0)
266263
with beam.Pipeline(argv=args) as p:
267264
_ = (
268265
p
269-
| PeriodicImpulse(0, 4, 1)
270-
| beam.Map(lambda t: elements[t])
266+
| GenerateSequence(
267+
start=0, stop=4, expansion_service=self.expansion_service)
268+
| beam.Map(lambda x: elements[x])
271269
| beam.io.WriteToBigQuery(
272270
table=table_id,
273271
method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
274272
schema=schema,
275-
triggering_frequency=1,
276273
with_auto_sharding=auto_sharding,
277-
num_storage_api_streams=num_streams,
278274
use_at_least_once=use_at_least_once,
279275
expansion_service=self.expansion_service))
280276
hamcrest_assert(p, bq_matcher)
281277

282-
@unittest.skipUnless(
283-
"dataflowrunner" in _RUNNER.lower(),
284-
"The exactly-once route has the requirement "
285-
"`beam:requirement:pardo:on_window_expiration:v1`, "
286-
"which is currently only supported by the Dataflow runner.")
287-
def test_streaming_with_fixed_num_streams(self):
288-
table = 'streaming_fixed_num_streams'
289-
self.run_streaming(table_name=table, num_streams=4)
290-
291-
@unittest.skip(
292-
"Streaming to the Storage Write API sink with autosharding is broken "
293-
"with Dataflow Runner V2.")
294-
def test_streaming_with_auto_sharding(self):
295-
table = 'streaming_with_auto_sharding'
278+
def test_streaming(self):
279+
table = 'streaming'
296280
self.run_streaming(table_name=table)
297281

298282
def test_streaming_with_at_least_once(self):
299-
table = 'streaming_with_at_least_once'
283+
table = 'streaming'
300284
self.run_streaming(table_name=table, use_at_least_once=True)
301285

286+
def test_streaming_with_auto_sharding(self):
287+
table = 'streaming_with_auto_sharding'
288+
self.run_streaming(table_name=table, auto_sharding=True)
289+
302290

303291
if __name__ == '__main__':
304292
logging.getLogger().setLevel(logging.INFO)

sdks/python/apache_beam/io/gcp/bigquery.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1869,7 +1869,6 @@ def __init__(
18691869
# TODO(https://github.com/apache/beam/issues/20712): Switch the default
18701870
# when the feature is mature.
18711871
with_auto_sharding=False,
1872-
num_storage_api_streams=0,
18731872
ignore_unknown_columns=False,
18741873
load_job_project_id=None,
18751874
max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE,
@@ -2019,9 +2018,6 @@ def __init__(
20192018
determined number of shards to write to BigQuery. This can be used for
20202019
all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only
20212020
applicable to unbounded input.
2022-
num_storage_api_streams: Specifies the number of write streams that the
2023-
Storage API sink will use. This parameter is only applicable when
2024-
writing unbounded data.
20252021
ignore_unknown_columns: Accept rows that contain values that do not match
20262022
the schema. The unknown values are ignored. Default is False,
20272023
which treats unknown values as errors. This option is only valid for
@@ -2064,7 +2060,6 @@ def __init__(
20642060
self.use_at_least_once = use_at_least_once
20652061
self.expansion_service = expansion_service
20662062
self.with_auto_sharding = with_auto_sharding
2067-
self._num_storage_api_streams = num_storage_api_streams
20682063
self.insert_retry_strategy = insert_retry_strategy
20692064
self._validate = validate
20702065
self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.JSON
@@ -2264,7 +2259,6 @@ def find_in_nested_dict(schema):
22642259
triggering_frequency=triggering_frequency,
22652260
use_at_least_once=self.use_at_least_once,
22662261
with_auto_sharding=self.with_auto_sharding,
2267-
num_storage_api_streams=self._num_storage_api_streams,
22682262
expansion_service=self.expansion_service))
22692263

22702264
if is_rows:
@@ -2527,7 +2521,6 @@ def __init__(
25272521
triggering_frequency=0,
25282522
use_at_least_once=False,
25292523
with_auto_sharding=False,
2530-
num_storage_api_streams=0,
25312524
expansion_service=None):
25322525
"""Initialize a StorageWriteToBigQuery transform.
25332526
@@ -2565,7 +2558,6 @@ def __init__(
25652558
self._triggering_frequency = triggering_frequency
25662559
self._use_at_least_once = use_at_least_once
25672560
self._with_auto_sharding = with_auto_sharding
2568-
self._num_storage_api_streams = num_storage_api_streams
25692561
self._expansion_service = (
25702562
expansion_service or _default_io_expansion_service())
25712563
self.schematransform_config = SchemaAwareExternalTransform.discover_config(
@@ -2577,7 +2569,6 @@ def expand(self, input):
25772569
expansion_service=self._expansion_service,
25782570
rearrange_based_on_discovery=True,
25792571
autoSharding=self._with_auto_sharding,
2580-
numStreams=self._num_storage_api_streams,
25812572
createDisposition=self._create_disposition,
25822573
table=self._table,
25832574
triggeringFrequencySeconds=self._triggering_frequency,

website/www/site/content/en/documentation/io/built-in/google-bigquery.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,7 @@ BigQuery Storage Write API for Python SDK currently has some limitations on supp
788788
{{< paragraph class="language-py" >}}
789789
**Note:** If you want to run WriteToBigQuery with Storage Write API from the source code, you need to run `./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build` to build the expansion-service jar. If you are running from a released Beam SDK, the jar will already be included.
790790

791-
**Note:** Auto sharding is not currently supported for Python's Storage Write API exactly-once mode on DataflowRunner.
791+
**Note:** Auto sharding is not currently supported for Python's Storage Write API.
792792

793793
{{< /paragraph >}}
794794

0 commit comments

Comments (0)