Skip to content

Commit 42b77ea

Browse files
authored
Merge branch 'master' into BEAM-3713-remove-nose
2 parents dffa63d + 6fb4afe commit 42b77ea

251 files changed

Lines changed: 6650 additions & 1413 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_SparkSt
3737
gradle {
3838
rootBuildScriptDir(commonJobProperties.checkoutDir)
3939
tasks(':runners:spark:2:validatesStructuredStreamingRunnerBatch')
40+
tasks(':runners:spark:3:validatesStructuredStreamingRunnerBatch')
4041
commonJobProperties.setGradleSwitches(delegate)
4142
}
4243
}

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@
7676
* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
7777
* Added ability to use JdbcIO.Write.withResults without statement and preparedStatementSetter. ([BEAM-12511](https://issues.apache.org/jira/browse/BEAM-12511))
7878
- Added ability to register URI schemes to use the S3 protocol via FileIO. ([BEAM-12435](https://issues.apache.org/jira/browse/BEAM-12435)).
79+
* Respect number of shards set in SnowflakeWrite batch mode. ([BEAM-12715](https://issues.apache.org/jira/browse/BEAM-12715))
80+
* Java SDK: Update Google Cloud Healthcare IO connectors from using v1beta1 to using the GA version.
7981

8082
## New Features / Improvements
8183

@@ -85,6 +87,7 @@
8587
(Java)([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)).
8688
* Reading from JDBC source by partitions (Java) ([BEAM-12456](https://issues.apache.org/jira/browse/BEAM-12456)).
8789
* PubsubIO can now write to a dead-letter topic after a parsing error (Java)([BEAM-12474](https://issues.apache.org/jira/browse/BEAM-12474)).
90+
* New append-only option for Elasticsearch sink (Java) [BEAM-12601](https://issues.apache.org/jira/browse/BEAM-12601)
8891

8992
## Breaking Changes
9093

@@ -98,6 +101,7 @@
98101
## Known Issues
99102

100103
* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
104+
* Fixed race condition in RabbitMqIO causing duplicate acks (Java) ([BEAM-6516](https://issues.apache.org/jira/browse/BEAM-6516)))
101105

102106
# [2.31.0] - 2021-07-08
103107

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ class BeamModulePlugin implements Plugin<Project> {
538538
google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20210326-$google_clients_version",
539539
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20210331-$google_clients_version",
540540
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20210408-$google_clients_version",
541-
google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1beta1-rev20210407-$google_clients_version",
541+
google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20210603-$google_clients_version",
542542
google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20210322-$google_clients_version",
543543
google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20210127-$google_clients_version",
544544
google_auth_library_credentials : "com.google.auth:google-auth-library-credentials", // google_cloud_platform_libraries_bom sets version
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import apache_beam as beam
18+
from apache_beam.testing.test_stream import TestStream
19+
from datetime import datetime
20+
import pytz
21+
22+
23+
class GenerateEvent(beam.PTransform):
24+
25+
@staticmethod
26+
def sample_data():
27+
return GenerateEvent()
28+
29+
def expand(self, input):
30+
31+
return (input
32+
| TestStream()
33+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 1, 0, tzinfo=pytz.UTC).timestamp())
34+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 2, 0, tzinfo=pytz.UTC).timestamp())
35+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 3, 0, tzinfo=pytz.UTC).timestamp())
36+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 4, 0, tzinfo=pytz.UTC).timestamp())
37+
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
38+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
39+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 6, 0, tzinfo=pytz.UTC).timestamp())
40+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 7, 0, tzinfo=pytz.UTC).timestamp())
41+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 8, 0, tzinfo=pytz.UTC).timestamp())
42+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 9, 0, tzinfo=pytz.UTC).timestamp())
43+
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
44+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
45+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 11, 0, tzinfo=pytz.UTC).timestamp())
46+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 12, 0, tzinfo=pytz.UTC).timestamp())
47+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 13, 0, tzinfo=pytz.UTC).timestamp())
48+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 14, 0, tzinfo=pytz.UTC).timestamp())
49+
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
50+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
51+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 16, 0, tzinfo=pytz.UTC).timestamp())
52+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 17, 0, tzinfo=pytz.UTC).timestamp())
53+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 18, 0, tzinfo=pytz.UTC).timestamp())
54+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 19, 0, tzinfo=pytz.UTC).timestamp())
55+
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
56+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
57+
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 25, 0, tzinfo=pytz.UTC).timestamp())
58+
.advance_watermark_to_infinity())
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
type: edu
21+
files:
22+
- name: task.py
23+
visible: true
24+
placeholders:
25+
- offset: 1342
26+
length: 387
27+
placeholder_text: TODO()
28+
- name: tests.py
29+
visible: false
30+
- name: generate_event.py
31+
visible: true
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one
3+
~ or more contributor license agreements. See the NOTICE file
4+
~ distributed with this work for additional information
5+
~ regarding copyright ownership. The ASF licenses this file
6+
~ to you under the Apache License, Version 2.0 (the
7+
~ "License"); you may not use this file except in compliance
8+
~ with the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
19+
Early Triggers
20+
--------------
21+
22+
Triggers allow Beam to emit early results, before all the data in a given window has arrived. For
23+
example, emitting after a certain amount of time elapses, or after a certain number of elements
24+
arrives.
25+
26+
**Kata:** Given that sample events generated with one second granularity and a fixed window of 1-day duration,
27+
please implement an early trigger that emits the number of events count immediately after new
28+
element is processed.
29+
30+
<div class="hint">
31+
Use <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.trigger.html#apache_beam.transforms.trigger.AfterWatermark">
32+
withEarlyFirings</a> to set early firing triggers.
33+
</div>
34+
35+
<div class="hint">
36+
Use <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.FixedWindows">
37+
FixedWindows</a> with 1-day duration using
38+
<a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.trigger.html#apache_beam.transforms.trigger.AfterWatermark">
39+
AfterWatermark</a> trigger.
40+
</div>
41+
42+
<div class="hint">
43+
Set the <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html?highlight=allowed_lateness#apache_beam.transforms.core.Windowing">
44+
allowed lateness</a> to 0 with
45+
<a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.trigger.html#apache_beam.transforms.trigger.AccumulationMode">
46+
discarding accumulation mode</a>.
47+
</div>
48+
49+
<div class="hint">
50+
Use <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.CombineGlobally">
51+
CombineGlobally</a> and
52+
<code>CountCombineFn</code> to calculate the count of events.
53+
</div>
54+
55+
<div class="hint">
56+
Refer to the Beam Programming Guide
57+
<a href="https://beam.apache.org/documentation/programming-guide/#event-time-triggers">
58+
"Event time triggers"</a> section for more information.
59+
</div>
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
import apache_beam as beam
21+
from generate_event import GenerateEvent
22+
from apache_beam.transforms.window import FixedWindows
23+
from apache_beam.transforms.trigger import AfterWatermark
24+
from apache_beam.transforms.trigger import AfterCount
25+
from apache_beam.transforms.trigger import AccumulationMode
26+
from apache_beam.utils.timestamp import Duration
27+
from apache_beam.options.pipeline_options import PipelineOptions
28+
from apache_beam.options.pipeline_options import StandardOptions
29+
from log_elements import LogElements
30+
31+
32+
def apply_transform(events):
33+
return (events
34+
| beam.WindowInto(FixedWindows(1*24*60*60), # 1 Day Window
35+
trigger=AfterWatermark(early=AfterCount(1)),
36+
accumulation_mode=AccumulationMode.DISCARDING,
37+
allowed_lateness=Duration(seconds=0))
38+
| beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults())
39+
40+
41+
def main():
42+
options = PipelineOptions()
43+
options.view_as(StandardOptions).streaming = True
44+
with beam.Pipeline(options=options) as p:
45+
events = p | GenerateEvent.sample_data()
46+
output = apply_transform(events)
47+
output | LogElements(with_window=True)
48+
49+
50+
if __name__ == "__main__":
51+
main()
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import pytz
18+
from datetime import datetime
19+
from task import apply_transform
20+
from apache_beam.testing.test_pipeline import TestPipeline
21+
from apache_beam.testing.test_stream import TestStream
22+
from apache_beam.transforms import window
23+
from apache_beam.options.pipeline_options import PipelineOptions
24+
from apache_beam.options.pipeline_options import StandardOptions
25+
from apache_beam.testing.util import assert_that, equal_to, equal_to_per_window
26+
from test_helper import failed, passed, get_file_output, test_is_not_empty
27+
28+
29+
def test_output():
30+
options = PipelineOptions()
31+
options.view_as(StandardOptions).streaming = True
32+
test_pipeline = TestPipeline(options=options)
33+
34+
events = ( test_pipeline
35+
| TestStream()
36+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 1, 0, tzinfo=pytz.UTC).timestamp())
37+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 2, 0, tzinfo=pytz.UTC).timestamp())
38+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 3, 0, tzinfo=pytz.UTC).timestamp())
39+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 4, 0, tzinfo=pytz.UTC).timestamp())
40+
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
41+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
42+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 6, 0, tzinfo=pytz.UTC).timestamp())
43+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 7, 0, tzinfo=pytz.UTC).timestamp())
44+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 8, 0, tzinfo=pytz.UTC).timestamp())
45+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 9, 0, tzinfo=pytz.UTC).timestamp())
46+
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
47+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
48+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 11, 0, tzinfo=pytz.UTC).timestamp())
49+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 12, 0, tzinfo=pytz.UTC).timestamp())
50+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 13, 0, tzinfo=pytz.UTC).timestamp())
51+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 14, 0, tzinfo=pytz.UTC).timestamp())
52+
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
53+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
54+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 16, 0, tzinfo=pytz.UTC).timestamp())
55+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 17, 0, tzinfo=pytz.UTC).timestamp())
56+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 18, 0, tzinfo=pytz.UTC).timestamp())
57+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 19, 0, tzinfo=pytz.UTC).timestamp())
58+
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
59+
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
60+
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 25, 0, tzinfo=pytz.UTC).timestamp())
61+
.advance_watermark_to_infinity())
62+
63+
results = apply_transform(events)
64+
65+
answers = {
66+
window.IntervalWindow(datetime(2021, 3, 1, 0, 0, 0, 0, tzinfo=pytz.UTC).timestamp(),
67+
datetime(2021, 3, 2, 0, 0, 0, 0, tzinfo=pytz.UTC).timestamp()): [1, 1, 1, 1, 1,
68+
1, 1, 1, 1, 1,
69+
1, 1, 1, 1, 1,
70+
1, 1, 1, 1, 1, 0],
71+
}
72+
73+
assert_that(results,
74+
equal_to_per_window(answers),
75+
label='count assert per window')
76+
77+
test_pipeline.run()
78+
79+
80+
if __name__ == '__main__':
81+
test_is_not_empty()
82+
test_output()
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
content:
21+
- Early Triggers

0 commit comments

Comments
 (0)