Skip to content

Commit cfacec5

Browse files
committed
LockNotBeforeTry
1 parent bb1104a commit cfacec5

6 files changed

Lines changed: 13 additions & 13 deletions

File tree

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1142,7 +1142,6 @@ class BeamModulePlugin implements Plugin<Project> {
11421142
options.errorprone.errorproneArgs.add("-Xep:ExtendsAutoValue:OFF")
11431143
options.errorprone.errorproneArgs.add("-Xep:FloatingPointAssertionWithinEpsilon:OFF")
11441144
options.errorprone.errorproneArgs.add("-Xep:JavaTimeDefaultTimeZone:OFF")
1145-
options.errorprone.errorproneArgs.add("-Xep:LockNotBeforeTry:OFF")
11461145
options.errorprone.errorproneArgs.add("-Xep:MixedMutabilityReturnType:OFF")
11471146
options.errorprone.errorproneArgs.add("-Xep:PreferJavaTimeOverload:OFF")
11481147
options.errorprone.errorproneArgs.add("-Xep:ModifiedButNotUsed:OFF")

runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1133,8 +1133,8 @@ public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
11331133
}
11341134

11351135
private void buffer(KV<Integer, WindowedValue<?>> taggedValue) {
1136+
bufferLock.lock();
11361137
try {
1137-
bufferLock.lock();
11381138
pushedBackElementsHandler.pushBack(taggedValue);
11391139
} catch (Exception e) {
11401140
throw new RuntimeException("Couldn't pushback element.", e);

runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,7 @@ public void testEnsureStateCleanupWithKeyedInput() throws Exception {
637637
assertThat(statefulDoFnRunner, instanceOf(StatefulDoFnRunner.class));
638638
}
639639

640+
@SuppressWarnings("LockNotBeforeTry")
640641
@Test
641642
public void testEnsureStateCleanupWithKeyedInputCleanupTimer() {
642643
InMemoryTimerInternals inMemoryTimerInternals = new InMemoryTimerInternals();

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ boolean shouldWait() throws Exception {
153153
public Consumer<Integer> processedElementsConsumer() {
154154
usingElementsProcessed = true;
155155
return elementsProcessed -> {
156+
lock.lock();
156157
try {
157-
lock.lock();
158158
this.elementsProcessed.set(elementsProcessed);
159159
condition.signal();
160160
} finally {
@@ -168,8 +168,8 @@ public Consumer<Integer> processedElementsConsumer() {
168168

169169
private void maybeWait() throws Exception {
170170
if (shouldWait()) {
171+
lock.lock();
171172
try {
172-
lock.lock();
173173
while (shouldWait()) {
174174
LOG.debug(
175175
"Throttling elements at {} until more than {} elements been processed.",
@@ -185,8 +185,8 @@ private void maybeWait() throws Exception {
185185

186186
public void abortWait() {
187187
usingElementsProcessed = false;
188+
lock.lock();
188189
try {
189-
lock.lock();
190190
condition.signal();
191191
} finally {
192192
lock.unlock();

runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,11 @@ private ImmutableList<EnvironmentCacheAndLock> createEnvironmentCaches(
204204
notification -> {
205205
WrappedSdkHarnessClient client = notification.getValue();
206206
final int refCount;
207+
// We need to use a lock here to ensure we are not causing the environment to
208+
// be removed if beforehand a StageBundleFactory has retrieved it but not yet
209+
// issued ref() on it.
210+
refLock.lock();
207211
try {
208-
// We need to use a lock here to ensure we are not causing the environment to
209-
// be removed if beforehand a StageBundleFactory has retrieved it but not yet
210-
// issued ref() on it.
211-
refLock.lock();
212212
refCount = client.unref();
213213
} finally {
214214
refLock.unlock();
@@ -474,8 +474,8 @@ public RemoteBundle getBundle(
474474
currentCache = availableCaches.take();
475475
// Lock because the environment expiration can remove the ref for the client
476476
// which would close the underlying environment before we can ref it.
477+
currentCache.lock.lock();
477478
try {
478-
currentCache.lock.lock();
479479
client = currentCache.cache.getUnchecked(executableStage.getEnvironment());
480480
client.ref();
481481
} finally {
@@ -494,8 +494,8 @@ public RemoteBundle getBundle(
494494
currentCache = environmentCaches.get(environmentIndex);
495495
// Lock because the environment expiration can remove the ref for the client which would
496496
// close the underlying environment before we can ref it.
497+
currentCache.lock.lock();
497498
try {
498-
currentCache.lock.lock();
499499
client = currentCache.cache.getUnchecked(executableStage.getEnvironment());
500500
client.ref();
501501
} finally {

sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ public T take() throws Exception, InterruptedException {
112112
* queue clears the exception.
113113
*/
114114
public void cancel(Exception exception) {
115+
lock.lock();
115116
try {
116-
lock.lock();
117117
cancellationException = exception;
118118
notEmpty.signalAll();
119119
notFull.signalAll();
@@ -124,8 +124,8 @@ public void cancel(Exception exception) {
124124

125125
/** Enables the queue to be re-used after it has been cancelled. */
126126
public void reset() {
127+
lock.lock();
127128
try {
128-
lock.lock();
129129
cancellationException = null;
130130
addIndex = 0;
131131
takeIndex = 0;

0 commit comments

Comments
 (0)