Skip to content

Commit cbccc59

Browse files
committed
Merge remote-tracking branch 'origin/master' into test_vendored_grpc
2 parents 6ed8c01 + 37fd13e commit cbccc59

31 files changed

Lines changed: 2141 additions & 172 deletions

File tree

buildSrc/src/main/groovy/org/apache/beam/gradle/GrpcVendoring_1_36_0.groovy

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@ class GrpcVendoring_1_36_0 {
6969
static List<String> runtimeDependencies() {
7070
return [
7171
'com.google.errorprone:error_prone_annotations:2.4.0',
72-
'commons-logging:commons-logging:1.2',
73-
'org.slf4j:slf4j-api:1.7.30',
7472
// TODO(BEAM-9288): Enable relocation for conscrypt
7573
"org.conscrypt:conscrypt-openjdk-uber:$conscrypt_version"
7674
]
@@ -158,8 +156,6 @@ class GrpcVendoring_1_36_0 {
158156
"javax/annotation/**",
159157
"junit/**",
160158
"module-info.class",
161-
"org/apache/commons/logging/**",
162-
"org/apache/log/**",
163159
"org/checkerframework/**",
164160
"org/codehaus/mojo/animal_sniffer/**",
165161
"org/conscrypt/**",
@@ -169,7 +165,6 @@ class GrpcVendoring_1_36_0 {
169165
"org/junit/**",
170166
"org/mockito/**",
171167
"org/objenesis/**",
172-
"org/slf4j/**",
173168
]
174169
}
175170

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ShortIdMap.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public class ShortIdMap {
3030

3131
public synchronized String getOrCreateShortId(MonitoringInfo info) {
3232
Preconditions.checkNotNull(info);
33+
Preconditions.checkArgument(info.getPayload().isEmpty());
34+
Preconditions.checkArgument(!info.hasStartTime());
35+
3336
String shortId = monitoringInfoMap.inverse().get(info);
3437
if (shortId == null) {
3538
shortId = "metric" + counter++;
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.core.metrics;
19+
20+
import static org.junit.Assert.assertEquals;
21+
22+
import java.util.ArrayList;
23+
import java.util.HashMap;
24+
import java.util.HashSet;
25+
import java.util.List;
26+
import java.util.Set;
27+
import org.apache.beam.model.pipeline.v1.MetricsApi;
28+
import org.apache.beam.sdk.values.KV;
29+
import org.junit.Test;
30+
31+
public class ShortIdMapTest {
32+
33+
@Test
34+
public void testShortIdAssignment() throws Exception {
35+
ShortIdMap shortIdMap = new ShortIdMap();
36+
List<KV<String, MetricsApi.MonitoringInfo>> testCases = new ArrayList<>();
37+
38+
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder(false);
39+
builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64);
40+
testCases.add(KV.of("metric0", builder.build()));
41+
42+
builder = new SimpleMonitoringInfoBuilder(false);
43+
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
44+
testCases.add(KV.of("metric1", builder.build()));
45+
46+
builder = new SimpleMonitoringInfoBuilder(false);
47+
builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_DOUBLE);
48+
testCases.add(KV.of("metric2", builder.build()));
49+
50+
builder = new SimpleMonitoringInfoBuilder(false);
51+
builder.setUrn("TestingSentinelUrn");
52+
builder.setType("TestingSentinelType");
53+
testCases.add(KV.of("metric3", builder.build()));
54+
55+
builder = new SimpleMonitoringInfoBuilder(false);
56+
builder.setUrn(MonitoringInfoConstants.Urns.FINISH_BUNDLE_MSECS);
57+
testCases.add(KV.of("metric4", builder.build()));
58+
59+
builder = new SimpleMonitoringInfoBuilder(false);
60+
builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64);
61+
testCases.add(KV.of("metric5", builder.build()));
62+
63+
builder = new SimpleMonitoringInfoBuilder(false);
64+
builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64);
65+
builder.setLabel(MonitoringInfoConstants.Labels.NAME, "metricNumber7");
66+
builder.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "myNamespace");
67+
builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "myPtransform");
68+
testCases.add(KV.of("metric6", builder.build()));
69+
70+
builder = new SimpleMonitoringInfoBuilder(false);
71+
builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64);
72+
builder.setLabel(MonitoringInfoConstants.Labels.NAME, "metricNumber8");
73+
builder.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "myNamespace");
74+
builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "myPtransform");
75+
testCases.add(KV.of("metric7", builder.build()));
76+
77+
builder = new SimpleMonitoringInfoBuilder(false);
78+
builder.setUrn(MonitoringInfoConstants.Urns.API_REQUEST_COUNT);
79+
builder.setLabel(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
80+
testCases.add(KV.of("metric8", builder.build()));
81+
82+
builder = new SimpleMonitoringInfoBuilder(false);
83+
builder.setUrn(MonitoringInfoConstants.Urns.API_REQUEST_COUNT);
84+
builder.setLabel(MonitoringInfoConstants.Labels.SERVICE, "Storage");
85+
testCases.add(KV.of("metric9", builder.build()));
86+
87+
// Validate that modifying the payload, but using the same URN/labels
88+
// does not change the shortId assignment.
89+
builder = new SimpleMonitoringInfoBuilder(false);
90+
builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_DOUBLE);
91+
testCases.add(KV.of("metric2", builder.build()));
92+
93+
builder = new SimpleMonitoringInfoBuilder(false);
94+
builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64);
95+
builder.setLabel(MonitoringInfoConstants.Labels.NAME, "metricNumber7");
96+
builder.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "myNamespace");
97+
builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "myPtransform");
98+
testCases.add(KV.of("metric6", builder.build()));
99+
100+
// Verify each short ID is assigned properly.
101+
Set<String> expectedShortIds = new HashSet<String>();
102+
for (KV<String, MetricsApi.MonitoringInfo> entry : testCases) {
103+
assertEquals(entry.getKey(), shortIdMap.getOrCreateShortId(entry.getValue()));
104+
expectedShortIds.add(entry.getKey());
105+
}
106+
107+
HashMap<String, MetricsApi.MonitoringInfo> actualRecoveredInfos = new HashMap<>();
108+
for (String expectedShortId : expectedShortIds) {
109+
actualRecoveredInfos.put(expectedShortId, shortIdMap.get(expectedShortId));
110+
}
111+
// Retrieve all of the MonitoringInfos by short id, and verify that the
112+
// metadata (everything but the payload) matches the originals
113+
assertEquals(expectedShortIds, actualRecoveredInfos.keySet());
114+
for (KV<String, MetricsApi.MonitoringInfo> entry : testCases) {
115+
// Clear payloads of both expected and actual before comparing
116+
MetricsApi.MonitoringInfo expectedMonitoringInfo = entry.getValue();
117+
MetricsApi.MonitoringInfo.Builder expected =
118+
MetricsApi.MonitoringInfo.newBuilder(expectedMonitoringInfo);
119+
expected.clearPayload();
120+
121+
MetricsApi.MonitoringInfo.Builder actual =
122+
MetricsApi.MonitoringInfo.newBuilder(actualRecoveredInfos.get(entry.getKey()));
123+
actual.clearPayload();
124+
assertEquals(expected.build(), actual.build());
125+
}
126+
127+
// Verify each short ID is assigned properly, in reverse.
128+
for (int i = testCases.size() - 1; i > 0; i--) {
129+
assertEquals(
130+
testCases.get(i).getKey(), shortIdMap.getOrCreateShortId(testCases.get(i).getValue()));
131+
}
132+
}
133+
}

runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
import org.junit.AfterClass;
6969
import org.junit.BeforeClass;
7070
import org.junit.ClassRule;
71-
import org.junit.Ignore;
7271
import org.junit.Rule;
7372
import org.junit.Test;
7473
import org.junit.rules.TemporaryFolder;
@@ -142,7 +141,6 @@ public void afterTest() throws Exception {
142141
ensureNoJobRunning();
143142
}
144143

145-
@Ignore("https://issues.apache.org/jira/projects/BEAM/issues/BEAM-10955")
146144
@Test
147145
public void testSavepointRestoreLegacy() throws Exception {
148146
runSavepointAndRestore(false);
@@ -193,7 +191,7 @@ private void runSavepointAndRestore(boolean isPortablePipeline) throws Exception
193191
private JobID executeLegacy(Pipeline pipeline) throws Exception {
194192
JobGraph jobGraph = getJobGraph(pipeline);
195193
flinkCluster.submitJob(jobGraph).get();
196-
return jobGraph.getJobID();
194+
return waitForJobToBeReady();
197195
}
198196

199197
private JobID executePortable(Pipeline pipeline) throws Exception {

sdks/go/README.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,21 +132,28 @@ in a subdirectory of your GOPATH. This permits existing gradle tools to use your
132132
$ mkdir -p $GOPATH/src/github.com/apache/
133133
$ cd $GOPATH/src/github.com/apache/
134134
135-
136135
# Clone the repo, and update your branch as normal
137136
$ git clone https://github.com/apache/beam.git
138137
$ cd beam
139138
$ git remote add <GitHub_user> git@github.com:<GitHub_user>/beam.git
140139
$ git fetch --all
141140
141+
# Navigate to the Go SDK root
142+
$ cd sdks/go
143+
# Set GO111MODULE to auto to allow building within a $GOPATH
144+
$ go env -w GO111MODULE=auto
142145
# Get or Update all the Go SDK dependencies
143146
$ go get -u ./...
144-
# Test that the system compiles and runs.
147+
# Test that the system compiles and runs
145148
$ go test ./...
146149
```
147150

148151
If you don’t have a GOPATH set, follow [these instructions](https://github.com/golang/go/wiki/SettingGOPATH) to create a new directory in your home directory, and use that.
149152

153+
Developing the Go SDK doesn't support Go versions 1.17 or greater at this time.
154+
Once [BEAM-5379](https://issues.apache.org/jira/browse/BEAM-5379) is resolved,
155+
and the SDK properly supports Go Modules this should be simpler.
156+
150157
Follow the [contribution guide](https://beam.apache.org/contribute/contribution-guide/#code) to create branches, and submit pull requests as normal.
151158

152159
### Dependency management

sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ const (
8181
// be found in Java's KafkaIO documentation.
8282
LogAppendTime policy = "LogAppendTime"
8383

84-
readURN = "beam:external:java:kafka:read:v1"
84+
readURN = "beam:external:java:kafkaio:typedwithoutmetadata:v1"
8585
writeURN = "beam:external:java:kafka:write:v1"
8686
)
8787

sdks/go/pkg/beam/testing/passert/equals.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,19 @@ func Equals(s beam.Scope, col beam.PCollection, values ...interface{}) beam.PCol
4040
return equals(subScope, col, other)
4141
}
4242

43+
// EqualsList verifies that the given collection has the same values as a
44+
// given list, under coder equality. The values must be provided as an
45+
// array or slice. This is equivalent to passing a beam.CreateList PCollection
46+
// to Equals.
47+
func EqualsList(s beam.Scope, col beam.PCollection, list interface{}) beam.PCollection {
48+
subScope := s.Scope("passert.EqualsList")
49+
if list == nil {
50+
return Empty(subScope, col)
51+
}
52+
listCollection := beam.CreateList(subScope, list)
53+
return equals(subScope, col, listCollection)
54+
}
55+
4356
// equals verifies that the actual values match the expected ones.
4457
func equals(s beam.Scope, actual, expected beam.PCollection) beam.PCollection {
4558
unexpected, correct, missing := Diff(s, actual, expected)
@@ -100,3 +113,4 @@ func readToStrings(iter func(*beam.T) bool) []string {
100113
sort.Strings(out)
101114
return out
102115
}
116+

0 commit comments

Comments
 (0)