Skip to content

Commit ff98a82

Browse files
committed
fix other incorrect usage of Clock
1 parent 34af7e9 commit ff98a82

4 files changed

Lines changed: 33 additions & 28 deletions

File tree

server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.ingest;
2121

22-
import java.time.Clock;
2322
import java.util.ArrayList;
2423
import java.util.Arrays;
2524
import java.util.Collection;
@@ -29,6 +28,8 @@
2928
import java.util.ListIterator;
3029
import java.util.Map;
3130
import java.util.Set;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.function.LongSupplier;
3233
import java.util.stream.Collectors;
3334
import org.elasticsearch.script.IngestConditionalScript;
3435
import org.elasticsearch.script.Script;
@@ -44,19 +45,19 @@ public class ConditionalProcessor extends AbstractProcessor {
4445

4546
private final Processor processor;
4647
private final IngestMetric metric;
47-
private final Clock clock;
48+
private final LongSupplier relativeTimeProvider;
4849

4950
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) {
50-
this(tag, script, scriptService, processor, Clock.systemUTC());
51+
this(tag, script, scriptService, processor, System::nanoTime);
5152
}
5253

53-
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, Clock clock) {
54+
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, LongSupplier relativeTimeProvider) {
5455
super(tag);
5556
this.condition = script;
5657
this.scriptService = scriptService;
5758
this.processor = processor;
5859
this.metric = new IngestMetric();
59-
this.clock = clock;
60+
this.relativeTimeProvider = relativeTimeProvider;
6061
}
6162

6263
@Override
@@ -65,15 +66,15 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
6566
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
6667
if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
6768
// Only record metric if the script evaluates to true
68-
long startTimeInMillis = clock.millis();
69+
long startTimeInNanos = relativeTimeProvider.getAsLong();
6970
try {
7071
metric.preIngest();
7172
return processor.execute(ingestDocument);
7273
} catch (Exception e) {
7374
metric.ingestFailed();
7475
throw e;
7576
} finally {
76-
long ingestTimeInMillis = clock.millis() - startTimeInMillis;
77+
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
7778
metric.postIngest(ingestTimeInMillis);
7879
}
7980
}

server/src/main/java/org/elasticsearch/ingest/Pipeline.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@
2222
import org.elasticsearch.ElasticsearchParseException;
2323
import org.elasticsearch.common.Nullable;
2424

25-
import java.time.Clock;
2625
import java.util.Arrays;
2726
import java.util.Collections;
2827
import java.util.List;
2928
import java.util.Map;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.function.LongSupplier;
3031

3132
import org.elasticsearch.script.ScriptService;
3233

@@ -47,20 +48,21 @@ public final class Pipeline {
4748
private final Integer version;
4849
private final CompoundProcessor compoundProcessor;
4950
private final IngestMetric metrics;
50-
private final Clock clock;
51+
private final LongSupplier relativeTimeProvider;
5152

5253
public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
53-
this(id, description, version, compoundProcessor, Clock.systemUTC());
54+
this(id, description, version, compoundProcessor, System::nanoTime);
5455
}
5556

5657
//package private for testing
57-
Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) {
58+
Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor,
59+
LongSupplier relativeTimeProvider) {
5860
this.id = id;
5961
this.description = description;
6062
this.compoundProcessor = compoundProcessor;
6163
this.version = version;
6264
this.metrics = new IngestMetric();
63-
this.clock = clock;
65+
this.relativeTimeProvider = relativeTimeProvider;
6466
}
6567

6668
public static Pipeline create(String id, Map<String, Object> config,
@@ -89,15 +91,15 @@ public static Pipeline create(String id, Map<String, Object> config,
8991
* Modifies the data of a document to be indexed based on the processor this pipeline holds
9092
*/
9193
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
92-
long startTimeInMillis = clock.millis();
94+
long startTimeInNanos = relativeTimeProvider.getAsLong();
9395
try {
9496
metrics.preIngest();
9597
return compoundProcessor.execute(ingestDocument);
9698
} catch (Exception e) {
9799
metrics.ingestFailed();
98100
throw e;
99101
} finally {
100-
long ingestTimeInMillis = clock.millis() - startTimeInMillis;
102+
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
101103
metrics.postIngest(ingestTimeInMillis);
102104
}
103105
}

server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,15 @@
2727
import org.elasticsearch.script.ScriptType;
2828
import org.elasticsearch.test.ESTestCase;
2929

30-
import java.time.Clock;
3130
import java.util.ArrayList;
3231
import java.util.Collections;
3332
import java.util.HashMap;
3433
import java.util.List;
3534
import java.util.Map;
3635
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.TimeUnit;
3737
import java.util.function.Consumer;
38+
import java.util.function.LongSupplier;
3839

3940
import static org.hamcrest.Matchers.equalTo;
4041
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -65,8 +66,8 @@ public void testChecksCondition() throws Exception {
6566
new HashMap<>(ScriptModule.CORE_CONTEXTS)
6667
);
6768
Map<String, Object> document = new HashMap<>();
68-
Clock clock = mock(Clock.class);
69-
when(clock.millis()).thenReturn(0L, 1L, 0L, 2L);
69+
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
70+
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1), 0L, TimeUnit.MILLISECONDS.toNanos(2));
7071
ConditionalProcessor processor = new ConditionalProcessor(
7172
randomAlphaOfLength(10),
7273
new Script(
@@ -91,7 +92,7 @@ public String getType() {
9192
public String getTag() {
9293
return null;
9394
}
94-
}, clock);
95+
}, relativeTimeProvider);
9596

9697
//false, never call processor never increments metrics
9798
String falseValue = "falsy";

server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@
2121
import org.elasticsearch.ElasticsearchException;
2222
import org.elasticsearch.test.ESTestCase;
2323

24-
import java.time.Clock;
2524
import java.util.Arrays;
2625
import java.util.Collections;
2726
import java.util.HashMap;
2827
import java.util.Map;
2928
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.function.LongSupplier;
3031

3132
import static org.hamcrest.CoreMatchers.equalTo;
3233
import static org.mockito.Mockito.mock;
@@ -143,15 +144,15 @@ public void testPipelineProcessorWithPipelineChain() throws Exception {
143144
pipeline2ProcessorConfig.put("pipeline", pipeline3Id);
144145
PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig);
145146

146-
Clock clock = mock(Clock.class);
147-
when(clock.millis()).thenReturn(0L).thenReturn(0L);
147+
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
148+
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
148149
Pipeline pipeline1 = new Pipeline(
149-
pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), clock
150+
pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), relativeTimeProvider
150151
);
151152

152153
String key1 = randomAlphaOfLength(10);
153-
clock = mock(Clock.class);
154-
when(clock.millis()).thenReturn(0L).thenReturn(3L);
154+
relativeTimeProvider = mock(LongSupplier.class);
155+
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(3));
155156
Pipeline pipeline2 = new Pipeline(
156157
pipeline2Id, null, null, new CompoundProcessor(true,
157158
Arrays.asList(
@@ -160,15 +161,15 @@ pipeline2Id, null, null, new CompoundProcessor(true,
160161
}),
161162
pipeline2Processor),
162163
Collections.emptyList()),
163-
clock
164+
relativeTimeProvider
164165
);
165-
clock = mock(Clock.class);
166-
when(clock.millis()).thenReturn(0L).thenReturn(2L);
166+
relativeTimeProvider = mock(LongSupplier.class);
167+
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2));
167168
Pipeline pipeline3 = new Pipeline(
168169
pipeline3Id, null, null, new CompoundProcessor(
169170
new TestProcessor(ingestDocument -> {
170171
throw new RuntimeException("error");
171-
})), clock
172+
})), relativeTimeProvider
172173
);
173174
when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1);
174175
when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2);

0 commit comments

Comments
 (0)