Skip to content

Commit ba3efd7

Browse files
committed
Fix EXECUTE profiling metric by recording inside AnalyticsExecutionEngine
Move EXECUTE metric recording into AnalyticsExecutionEngine.execute(), between the actual execution (planExecutor + row conversion) and the listener.onResponse() call. This ensures the metric is written before SimpleJsonResponseFormatter calls QueryProfiling.finish() to snapshot. Previously context.measure() was used in RestUnifiedQueryAction, but finish() was called inside the listener callback (synchronously) before measure()'s finally block could write the metric, resulting in 0ms. Add IT assertion that execute phase time_ms > 0 to catch this bug. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent ce4f6c5 commit ba3efd7

3 files changed

Lines changed: 20 additions & 16 deletions

File tree

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.opensearch.sql.executor.ExecutionContext;
2525
import org.opensearch.sql.executor.ExecutionEngine;
2626
import org.opensearch.sql.executor.pagination.Cursor;
27+
import org.opensearch.sql.monitor.profile.MetricName;
28+
import org.opensearch.sql.monitor.profile.ProfileMetric;
29+
import org.opensearch.sql.monitor.profile.QueryProfiling;
2730
import org.opensearch.sql.planner.physical.PhysicalPlan;
2831

2932
/**
@@ -67,12 +70,20 @@ public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listene
6770
public void execute(
6871
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
6972
try {
73+
// Record EXECUTE metric before calling listener, because the listener's onResponse
74+
// triggers SimpleJsonResponseFormatter which calls QueryProfiling.finish() to snapshot
75+
// all metrics. The metric must be written before that snapshot.
76+
ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
77+
long execStart = System.nanoTime();
78+
7079
Iterable<Object[]> rows = planExecutor.execute(plan, null);
7180

7281
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
7382
List<ExprValue> results = convertRows(rows, fields);
7483
Schema schema = buildSchema(fields);
7584

85+
execMetric.set(System.nanoTime() - execStart);
86+
7687
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
7788
} catch (Exception e) {
7889
listener.onFailure(e);

integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ public void testProfileResponseIncludesProfilingData() throws IOException {
131131
JSONObject result = new JSONObject(getResponseBody(response, true));
132132

133133
assertTrue("Response should have 'profile' field when profile=true", result.has("profile"));
134+
JSONObject profile = result.getJSONObject("profile");
135+
assertTrue("Profile should have 'phases' field", profile.has("phases"));
136+
JSONObject phases = profile.getJSONObject("phases");
137+
assertTrue("Phases should have 'execute' field", phases.has("execute"));
138+
double executeTime = phases.getJSONObject("execute").getDouble("time_ms");
139+
assertTrue(
140+
"Execute phase should have non-zero time, got " + executeTime + "ms", executeTime > 0);
134141
}
135142

136143
// --- Error handling tests ---

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine;
3232
import org.opensearch.sql.executor.analytics.QueryPlanExecutor;
3333
import org.opensearch.sql.lang.LangSpec;
34-
import org.opensearch.sql.monitor.profile.MetricName;
3534
import org.opensearch.sql.plugin.rest.analytics.stub.StubSchemaProvider;
3635
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
3736
import org.opensearch.sql.ppl.domain.PPLQueryRequest;
@@ -123,14 +122,7 @@ private void doExecute(
123122
CalcitePlanContext planContext = context.getPlanContext();
124123
plan = addQuerySizeLimit(plan, planContext);
125124

126-
RelNode finalPlan = plan;
127-
context.measure(
128-
MetricName.EXECUTE,
129-
() -> {
130-
analyticsEngine.execute(
131-
finalPlan, planContext, createQueryListener(queryType, listener));
132-
return null;
133-
});
125+
analyticsEngine.execute(plan, planContext, createQueryListener(queryType, listener));
134126
} catch (Exception e) {
135127
listener.onFailure(e);
136128
}
@@ -148,13 +140,7 @@ private void doExplain(
148140
CalcitePlanContext planContext = context.getPlanContext();
149141
plan = addQuerySizeLimit(plan, planContext);
150142

151-
RelNode finalPlan = plan;
152-
context.measure(
153-
MetricName.EXECUTE,
154-
() -> {
155-
analyticsEngine.explain(finalPlan, pplRequest.mode(), planContext, listener);
156-
return null;
157-
});
143+
analyticsEngine.explain(plan, pplRequest.mode(), planContext, listener);
158144
} catch (Exception e) {
159145
listener.onFailure(e);
160146
}

0 commit comments

Comments
 (0)