Skip to content

Commit 47709a0

Browse files
authored
Support profile options for PPL - Part I Implement phases level metrics. (opensearch-project#4983)
* Init Signed-off-by: Peng Huo <penghuo@gmail.com> * Cleanup ThreadLocal Signed-off-by: Peng Huo <penghuo@gmail.com> * Update doc Signed-off-by: Peng Huo <penghuo@gmail.com> * Update Doc Signed-off-by: Peng Huo <penghuo@gmail.com> * Refactor Code Signed-off-by: Peng Huo <penghuo@gmail.com> * Remove unused code Signed-off-by: Peng Huo <penghuo@gmail.com> * Address comments Signed-off-by: Peng Huo <penghuo@gmail.com> * Add Task 1 - Add Phases level metrics Signed-off-by: Peng Huo <penghuo@gmail.com> * Reformat doc Signed-off-by: Peng Huo <penghuo@gmail.com> --------- Signed-off-by: Peng Huo <penghuo@gmail.com>
1 parent 5bf322f commit 47709a0

23 files changed

Lines changed: 655 additions & 11 deletions

File tree

common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public class QueryContext {
2020
/** The key of the request id in the context map. */
2121
private static final String REQUEST_ID_KEY = "request_id";
2222

23+
private static final String PROFILE_KEY = "profile";
24+
2325
/**
2426
* Generates a random UUID and adds to the {@link ThreadContext} as the request id.
2527
*
@@ -66,4 +68,20 @@ private QueryContext() {
6668
throw new AssertionError(
6769
getClass().getCanonicalName() + " is a utility class and must not be initialized");
6870
}
71+
72+
/**
73+
* Store the profile flag in thread context.
74+
*
75+
* @param profileEnabled whether profiling is enabled
76+
*/
77+
public static void setProfile(boolean profileEnabled) {
78+
ThreadContext.put(PROFILE_KEY, Boolean.toString(profileEnabled));
79+
}
80+
81+
/**
82+
* @return true if profiling flag is set in the thread context.
83+
*/
84+
public static boolean isProfileEnabled() {
85+
return Boolean.parseBoolean(ThreadContext.get(PROFILE_KEY));
86+
}
6987
}

core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
package org.opensearch.sql.calcite.utils;
2929

3030
import static java.util.Objects.requireNonNull;
31+
import static org.opensearch.sql.monitor.profile.MetricName.OPTIMIZE;
3132

3233
import com.google.common.collect.ImmutableList;
3334
import java.lang.reflect.Type;
@@ -90,6 +91,8 @@
9091
import org.opensearch.sql.calcite.plan.OpenSearchRules;
9192
import org.opensearch.sql.calcite.plan.Scannable;
9293
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
94+
import org.opensearch.sql.monitor.profile.ProfileMetric;
95+
import org.opensearch.sql.monitor.profile.QueryProfiling;
9396

9497
/**
9598
* Calcite Tools Helper. This class is used to create customized: 1. Connection 2. JavaTypeFactory
@@ -342,6 +345,8 @@ public static class OpenSearchRelRunners {
342345
* org.apache.calcite.tools.RelRunners#run(RelNode)}
343346
*/
344347
public static PreparedStatement run(CalcitePlanContext context, RelNode rel) {
348+
ProfileMetric optimizeTime = QueryProfiling.current().getOrCreateMetric(OPTIMIZE);
349+
long startTime = System.nanoTime();
345350
final RelShuttle shuttle =
346351
new RelHomogeneousShuttle() {
347352
@Override
@@ -360,7 +365,9 @@ public RelNode visit(TableScan scan) {
360365
// the line we changed here
361366
try (Connection connection = context.connection) {
362367
final RelRunner runner = connection.unwrap(RelRunner.class);
363-
return runner.prepareStatement(rel);
368+
PreparedStatement preparedStatement = runner.prepareStatement(rel);
369+
optimizeTime.set(System.nanoTime() - startTime);
370+
return preparedStatement;
364371
} catch (SQLException e) {
365372
// Detect if error is due to window functions in unsupported context (bins on time fields)
366373
String errorMsg = e.getMessage();

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,14 @@
4040
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
4141
import org.opensearch.sql.common.response.ResponseListener;
4242
import org.opensearch.sql.common.setting.Settings;
43+
import org.opensearch.sql.common.utils.QueryContext;
4344
import org.opensearch.sql.datasource.DataSourceService;
4445
import org.opensearch.sql.exception.CalciteUnsupportedException;
4546
import org.opensearch.sql.exception.NonFallbackCalciteException;
47+
import org.opensearch.sql.monitor.profile.MetricName;
48+
import org.opensearch.sql.monitor.profile.ProfileContext;
49+
import org.opensearch.sql.monitor.profile.ProfileMetric;
50+
import org.opensearch.sql.monitor.profile.QueryProfiling;
4651
import org.opensearch.sql.planner.PlanContext;
4752
import org.opensearch.sql.planner.Planner;
4853
import org.opensearch.sql.planner.logical.LogicalPaginate;
@@ -98,13 +103,18 @@ public void executeWithCalcite(
98103
CalcitePlanContext.run(
99104
() -> {
100105
try {
106+
ProfileContext profileContext =
107+
QueryProfiling.activate(QueryContext.isProfileEnabled());
108+
ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE);
109+
long analyzeStart = System.nanoTime();
101110
CalcitePlanContext context =
102111
CalcitePlanContext.create(
103112
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
104113
RelNode relNode = analyze(plan, context);
105114
relNode = mergeAdjacentFilters(relNode);
106115
RelNode optimized = optimize(relNode, context);
107116
RelNode calcitePlan = convertToCalcitePlan(optimized);
117+
analyzeMetric.set(System.nanoTime() - analyzeStart);
108118
executionEngine.execute(calcitePlan, context, listener);
109119
} catch (Throwable t) {
110120
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
@@ -137,6 +147,7 @@ public void explainWithCalcite(
137147
CalcitePlanContext.run(
138148
() -> {
139149
try {
150+
QueryProfiling.noop();
140151
CalcitePlanContext context =
141152
CalcitePlanContext.create(
142153
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
import java.util.concurrent.atomic.LongAdder;
9+
10+
/** Concrete metric backed by {@link LongAdder}. */
11+
final class DefaultMetricImpl implements ProfileMetric {
12+
13+
private final String name;
14+
private final LongAdder value = new LongAdder();
15+
16+
/**
17+
* Construct a metric with the provided name.
18+
*
19+
* @param name metric name
20+
*/
21+
DefaultMetricImpl(String name) {
22+
this.name = name;
23+
}
24+
25+
@Override
26+
public String name() {
27+
return name;
28+
}
29+
30+
@Override
31+
public long value() {
32+
return value.sum();
33+
}
34+
35+
@Override
36+
public void add(long delta) {
37+
value.add(delta);
38+
}
39+
40+
@Override
41+
public void set(long value) {
42+
this.value.reset();
43+
this.value.add(value);
44+
}
45+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
import java.util.LinkedHashMap;
9+
import java.util.Map;
10+
import java.util.Objects;
11+
import java.util.concurrent.ConcurrentHashMap;
12+
13+
/** Default implementation that records profiling metrics. */
14+
public class DefaultProfileContext implements ProfileContext {
15+
16+
private final long startNanos = System.nanoTime();
17+
private boolean finished;
18+
private final Map<MetricName, DefaultMetricImpl> metrics = new ConcurrentHashMap<>();
19+
private QueryProfile profile;
20+
21+
public DefaultProfileContext() {}
22+
23+
/** {@inheritDoc} */
24+
@Override
25+
public ProfileMetric getOrCreateMetric(MetricName name) {
26+
Objects.requireNonNull(name, "name");
27+
return metrics.computeIfAbsent(name, key -> new DefaultMetricImpl(key.name()));
28+
}
29+
30+
/** {@inheritDoc} */
31+
@Override
32+
public synchronized QueryProfile finish() {
33+
if (finished) {
34+
return profile;
35+
}
36+
finished = true;
37+
long endNanos = System.nanoTime();
38+
Map<MetricName, Double> snapshot = new LinkedHashMap<>(MetricName.values().length);
39+
for (MetricName metricName : MetricName.values()) {
40+
DefaultMetricImpl metric = metrics.get(metricName);
41+
double millis = metric == null ? 0d : roundToMillis(metric.value());
42+
snapshot.put(metricName, millis);
43+
}
44+
double totalMillis = roundToMillis(endNanos - startNanos);
45+
profile = new QueryProfile(totalMillis, snapshot);
46+
return profile;
47+
}
48+
49+
private double roundToMillis(long nanos) {
50+
return Math.round((nanos / 1_000_000.0d) * 100.0d) / 100.0d;
51+
}
52+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
/** Named metrics used by query profiling. */
9+
public enum MetricName {
10+
ANALYZE,
11+
OPTIMIZE,
12+
EXECUTE,
13+
FORMAT
14+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
import java.util.Objects;
9+
10+
/** Disabled profiling context. */
11+
public final class NoopProfileContext implements ProfileContext {
12+
13+
public static final NoopProfileContext INSTANCE = new NoopProfileContext();
14+
15+
private NoopProfileContext() {}
16+
17+
/** {@inheritDoc} */
18+
@Override
19+
public ProfileMetric getOrCreateMetric(MetricName name) {
20+
Objects.requireNonNull(name, "name");
21+
return NoopProfileMetric.INSTANCE;
22+
}
23+
24+
/** {@inheritDoc} */
25+
@Override
26+
public QueryProfile finish() {
27+
return null;
28+
}
29+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
/** No-op metric implementation. */
9+
final class NoopProfileMetric implements ProfileMetric {
10+
11+
static final NoopProfileMetric INSTANCE = new NoopProfileMetric();
12+
13+
private NoopProfileMetric() {}
14+
15+
/** {@inheritDoc} */
16+
@Override
17+
public String name() {
18+
return "";
19+
}
20+
21+
/** {@inheritDoc} */
22+
@Override
23+
public long value() {
24+
return 0;
25+
}
26+
27+
/** {@inheritDoc} */
28+
@Override
29+
public void add(long delta) {}
30+
31+
/** {@inheritDoc} */
32+
@Override
33+
public void set(long value) {}
34+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
/** Context for collecting profiling metrics during query execution. */
9+
public interface ProfileContext {
10+
/**
11+
* Obtain or create a metric with the provided name.
12+
*
13+
* @param name fully qualified metric name
14+
* @return metric instance
15+
*/
16+
ProfileMetric getOrCreateMetric(MetricName name);
17+
18+
/**
19+
* Finalize profiling and return a snapshot.
20+
*
21+
* @return immutable query profile snapshot
22+
*/
23+
QueryProfile finish();
24+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
/** Metric for query profiling. */
9+
public interface ProfileMetric {
10+
/**
11+
* @return metric name.
12+
*/
13+
String name();
14+
15+
/**
16+
* @return current metric value.
17+
*/
18+
long value();
19+
20+
/**
21+
* Increment the metric by the given delta.
22+
*
23+
* @param delta amount to add
24+
*/
25+
void add(long delta);
26+
27+
/**
28+
* Set the metric to the provided value.
29+
*
30+
* @param value new metric value
31+
*/
32+
void set(long value);
33+
}

0 commit comments

Comments
 (0)