Skip to content

Commit 57e7a8a

Browse files
CLEANUP+JAVAFICATION+PERFORMANCE: Keep hard reference to Ruby Thread Context, simplify code accordingly
Fixes #9401
1 parent c6d3f30 commit 57e7a8a

13 files changed

Lines changed: 49 additions & 330 deletions

File tree

logstash-core/spec/logstash/java_filter_delegator_spec.rb

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,20 +83,19 @@ def filter(event)
8383

8484
context "when the flush return events" do
8585
it "increments the out" do
86-
ruby_context = RubyUtil::RUBY.getCurrentContext
87-
subject.to_java.multiFilter(ruby_context, [LogStash::Event.new])
86+
subject.to_java.multiFilter([LogStash::Event.new])
8887
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
8988
"filter/my_filter"
9089
)[:filter][:my_filter][:events]
9190
expect(event_metrics[:out].value).to eq(0)
92-
subject.to_java.flush(ruby_context, {})
91+
subject.to_java.flush({})
9392
expect(event_metrics[:out].value).to eq(1)
9493
end
9594
end
9695

9796
context "when the flush doesn't return anything" do
9897
it "doesnt increment the out" do
99-
subject.to_java.flush(RubyUtil::RUBY.getCurrentContext, {})
98+
subject.to_java.flush({})
10099
expect(
101100
metric.collector.snapshot_metric.metric_store.
102101
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:duration_in_millis].value
@@ -107,15 +106,15 @@ def filter(event)
107106
context "when the filter buffer events" do
108107

109108
it "has incremented :in" do
110-
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
109+
subject.to_java.multiFilter(events)
111110
expect(
112111
metric.collector.snapshot_metric.metric_store.
113112
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:in].value
114113
).to eq(events.size)
115114
end
116115

117116
it "has not incremented :out" do
118-
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
117+
subject.to_java.multiFilter(events)
119118
expect(
120119
metric.collector.snapshot_metric.metric_store.
121120
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:out].value
@@ -140,7 +139,7 @@ def filter(event)
140139
end
141140

142141
it "increments the in/out of the metric" do
143-
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
142+
subject.to_java.multiFilter(events)
144143
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
145144
"filter/my_filter"
146145
)[:filter][:my_filter][:events]
@@ -171,7 +170,7 @@ def filter(event)
171170
end
172171

173172
it "increments the in/out of the metric" do
174-
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
173+
subject.to_java.multiFilter(events)
175174
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
176175
"filter/my_filter"
177176
)[:filter][:my_filter][:events]

logstash-core/src/main/java/org/logstash/ConvertedMap.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.jruby.RubyString;
99
import org.jruby.runtime.ThreadContext;
1010
import org.jruby.runtime.builtin.IRubyObject;
11+
import org.logstash.execution.WorkerLoop;
1112

1213
/**
1314
* <p>This class is an internal API and behaves very different from a standard {@link Map}.</p>
@@ -60,7 +61,7 @@ public static ConvertedMap newFromMap(Map<? extends Serializable, Object> o) {
6061
}
6162

6263
public static ConvertedMap newFromRubyHash(final RubyHash o) {
63-
return newFromRubyHash(o.getRuntime().getCurrentContext(), o);
64+
return newFromRubyHash(WorkerLoop.THREAD_CONTEXT.get(), o);
6465
}
6566

6667
public static ConvertedMap newFromRubyHash(final ThreadContext context, final RubyHash o) {
Lines changed: 2 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package org.logstash.config.ir.compiler;
22

33
import java.util.ArrayList;
4-
import java.util.Collection;
54
import java.util.Collections;
65
import java.util.List;
76
import java.util.stream.Collectors;
8-
import org.jruby.Ruby;
9-
import org.jruby.runtime.ThreadContext;
107

118
/**
129
* A syntactic closure.
@@ -18,28 +15,6 @@ final class Closure implements MethodLevelSyntaxElement {
1815
*/
1916
public static final Closure EMPTY = new Closure(Collections.emptyList());
2017

21-
/**
22-
* Variable declaration for the Ruby thread-context,
23-
* renders as {@code final ThreadContext context}.
24-
*/
25-
private static final VariableDefinition RUBY_THREAD_CONTEXT =
26-
new VariableDefinition(ThreadContext.class, "context");
27-
28-
/**
29-
* Variable declaration for the Ruby thread-context,
30-
* renders as {@code final ThreadContext context = RubyUtil.RUBY.getCurrentContext()}.
31-
*/
32-
private static final MethodLevelSyntaxElement CACHE_RUBY_THREADCONTEXT =
33-
SyntaxFactory.definition(
34-
RUBY_THREAD_CONTEXT, ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT
35-
);
36-
37-
/**
38-
* Variable referencing the current Ruby thread context.
39-
*/
40-
private static final ValueSyntaxElement CACHED_RUBY_THREADCONTEXT =
41-
RUBY_THREAD_CONTEXT.access();
42-
4318
private final List<MethodLevelSyntaxElement> statements;
4419

4520
public static Closure wrap(final MethodLevelSyntaxElement... statements) {
@@ -78,50 +53,10 @@ public boolean empty() {
7853

7954
@Override
8055
public String generateCode() {
81-
final Collection<MethodLevelSyntaxElement> optimized =
82-
this.optimizeRubyThreadContexts().statements;
83-
return optimized.isEmpty() ? "" : SyntaxFactory.join(
84-
optimized.stream().map(MethodLevelSyntaxElement::generateCode).collect(
56+
return statements.isEmpty() ? "" : SyntaxFactory.join(
57+
statements.stream().map(MethodLevelSyntaxElement::generateCode).collect(
8558
Collectors.joining(";\n")
8659
), ";"
8760
);
8861
}
89-
90-
/**
91-
* Removes duplicate calls to {@link Ruby#getCurrentContext()} by caching them to a variable.
92-
* @return Copy of this Closure without redundant calls to {@link Ruby#getCurrentContext()}
93-
*/
94-
private Closure optimizeRubyThreadContexts() {
95-
final ArrayList<Integer> rubyCalls = new ArrayList<>();
96-
for (int i = 0; i < statements.size(); ++i) {
97-
if (statements.get(i).count(ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT) > 0) {
98-
rubyCalls.add(i);
99-
}
100-
}
101-
final Closure optimized;
102-
if (rubyCalls.size() > 1) {
103-
optimized = (Closure) new Closure().add(this).replace(
104-
ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, CACHED_RUBY_THREADCONTEXT
105-
);
106-
optimized.statements.add(rubyCalls.get(0), CACHE_RUBY_THREADCONTEXT);
107-
} else {
108-
optimized = this;
109-
}
110-
return optimized;
111-
}
112-
113-
@Override
114-
public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
115-
final MethodLevelSyntaxElement replacement) {
116-
final Closure result = new Closure();
117-
for (final MethodLevelSyntaxElement element : this.statements) {
118-
result.add(element.replace(search, replacement));
119-
}
120-
return result;
121-
}
122-
123-
@Override
124-
public int count(final MethodLevelSyntaxElement search) {
125-
return statements.stream().mapToInt(child -> child.count(search)).sum();
126-
}
12762
}

logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -187,20 +187,15 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<D
187187

188188
private static ValueSyntaxElement invokeOutput(final ValueSyntaxElement output,
189189
final MethodLevelSyntaxElement events) {
190-
return output.call("multiReceive", ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, events);
190+
return output.call("multiReceive", events);
191191
}
192192

193193
private static Closure filterBody(final ValueSyntaxElement outputBuffer,
194194
final ValueSyntaxElement inputBuffer, final ClassFields fields,
195195
final FilterDelegatorExt plugin) {
196196
final ValueSyntaxElement filterField = fields.add(plugin);
197197
final Closure body = Closure.wrap(
198-
buffer(
199-
outputBuffer,
200-
filterField.call(
201-
"multiFilter", ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, inputBuffer
202-
)
203-
)
198+
buffer(outputBuffer, filterField.call("multiFilter", inputBuffer))
204199
);
205200
if (plugin.hasFlush()) {
206201
body.add(callFilterFlush(fields, outputBuffer, filterField, !plugin.periodicFlush()));
@@ -317,13 +312,7 @@ private static MethodLevelSyntaxElement callFilterFlush(final ClassFields fields
317312
);
318313
}
319314
return SyntaxFactory.ifCondition(
320-
condition,
321-
Closure.wrap(
322-
buffer(
323-
resultBuffer,
324-
filterPlugin.call(FLUSH, ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, flushArgs)
325-
)
326-
)
315+
condition, Closure.wrap(buffer(resultBuffer, filterPlugin.call(FLUSH, flushArgs)))
327316
);
328317
}
329318

logstash-core/src/main/java/org/logstash/config/ir/compiler/EventCondition.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.logstash.config.ir.expression.binary.RegexEq;
3232
import org.logstash.config.ir.expression.unary.Not;
3333
import org.logstash.config.ir.expression.unary.Truthy;
34+
import org.logstash.execution.WorkerLoop;
3435
import org.logstash.ext.JrubyEventExtLibrary;
3536

3637
/**
@@ -598,7 +599,7 @@ private FieldMatches(final String field, final String regex) {
598599
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
599600
final Object tomatch = event.getEvent().getUnconvertedField(field);
600601
return tomatch instanceof RubyString &&
601-
!((RubyString) tomatch).match(RubyUtil.RUBY.getCurrentContext(), regex).isNil();
602+
!((RubyString) tomatch).match(WorkerLoop.THREAD_CONTEXT.get(), regex).isNil();
602603
}
603604
}
604605

logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.jruby.runtime.ThreadContext;
1616
import org.jruby.runtime.builtin.IRubyObject;
1717
import org.logstash.RubyUtil;
18+
import org.logstash.execution.WorkerLoop;
1819
import org.logstash.ext.JrubyEventExtLibrary;
1920
import org.logstash.instrument.metrics.MetricKeys;
2021
import org.logstash.instrument.metrics.counter.LongCounter;
@@ -127,7 +128,8 @@ public IRubyObject strategy(final ThreadContext context) {
127128
}
128129

129130
@SuppressWarnings("unchecked")
130-
public RubyArray multiFilter(final ThreadContext context, final RubyArray batch) {
131+
public RubyArray multiFilter(final RubyArray batch) {
132+
final ThreadContext context = WorkerLoop.THREAD_CONTEXT.get();
131133
eventMetricIn.increment((long) batch.size());
132134
final long start = System.nanoTime();
133135
final RubyArray result = (RubyArray) filter.callMethod(context, "multi_filter", batch);
@@ -144,7 +146,8 @@ public RubyArray multiFilter(final ThreadContext context, final RubyArray batch)
144146
return result;
145147
}
146148

147-
public RubyArray flush(final ThreadContext context, final RubyHash options) {
149+
public RubyArray flush(final RubyHash options) {
150+
final ThreadContext context = WorkerLoop.THREAD_CONTEXT.get();
148151
final IRubyObject newEvents = filter.callMethod(context, "flush", options);
149152
final RubyArray result;
150153
if (newEvents.isNil()) {

logstash-core/src/main/java/org/logstash/config/ir/compiler/MethodLevelSyntaxElement.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,4 @@ interface MethodLevelSyntaxElement extends SyntaxElement {
99
* Syntax element that generates {@code return null}.
1010
*/
1111
MethodLevelSyntaxElement RETURN_NULL = SyntaxFactory.ret(SyntaxFactory.value("null"));
12-
13-
/**
14-
* Replace any occurrences of {@code search} by {@code replacement} in this element.
15-
* @param search Syntax element to replace
16-
* @param replacement Replacement
17-
* @return A copy of this element with the replacement applied
18-
*/
19-
MethodLevelSyntaxElement replace(MethodLevelSyntaxElement search,
20-
MethodLevelSyntaxElement replacement);
21-
22-
/**
23-
* Count the number of occurrences of {@code search} in this element.
24-
* @param search Element to count
25-
* @return Number of occurrences
26-
*/
27-
int count(MethodLevelSyntaxElement search);
2812
}

logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.jruby.runtime.Block;
1515
import org.jruby.runtime.ThreadContext;
1616
import org.jruby.runtime.builtin.IRubyObject;
17+
import org.logstash.execution.WorkerLoop;
1718
import org.logstash.instrument.metrics.MetricKeys;
1819
import org.logstash.instrument.metrics.counter.LongCounter;
1920

@@ -134,6 +135,10 @@ public IRubyObject strategy(final ThreadContext context) {
134135
return strategy;
135136
}
136137

138+
public IRubyObject multiReceive(final RubyArray events) {
139+
return multiReceive(WorkerLoop.THREAD_CONTEXT.get(), events);
140+
}
141+
137142
@JRubyMethod(name = "multi_receive")
138143
public IRubyObject multiReceive(final ThreadContext context, final IRubyObject events) {
139144
final RubyArray batch = (RubyArray) events;

0 commit comments

Comments
 (0)