Skip to content

Commit 022461a

Browse files
committed
Use context parser for index extraction instead of standalone PPLQueryParser
Create UnifiedQueryContext upfront in isAnalyticsIndex() and use context.getParser() for index name extraction. This reuses the context-owned parser which supports both PPL and SQL, making it ready for unified SQL support without code changes. Remove standalone PPLQueryParser field and Settings constructor param. isAnalyticsIndex() now takes QueryType to create the right context. extractIndexName() handles UnresolvedPlan (PPL) with a TODO for SqlNode (SQL) when unified SQL is enabled. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent b0ecfc2 commit 022461a

3 files changed

Lines changed: 73 additions & 86 deletions

File tree

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 62 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020
import org.opensearch.core.action.ActionListener;
2121
import org.opensearch.sql.api.UnifiedQueryContext;
2222
import org.opensearch.sql.api.UnifiedQueryPlanner;
23-
import org.opensearch.sql.api.parser.PPLQueryParser;
2423
import org.opensearch.sql.ast.tree.Relation;
2524
import org.opensearch.sql.ast.tree.UnresolvedPlan;
2625
import org.opensearch.sql.calcite.CalcitePlanContext;
2726
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
2827
import org.opensearch.sql.common.response.ResponseListener;
29-
import org.opensearch.sql.common.setting.Settings;
3028
import org.opensearch.sql.executor.ExecutionEngine.QueryResponse;
3129
import org.opensearch.sql.executor.QueryType;
3230
import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine;
@@ -41,9 +39,9 @@
4139
import org.opensearch.transport.client.node.NodeClient;
4240

4341
/**
44-
* Handles queries routed to the Analytics engine via the unified query pipeline. Parses PPL queries
45-
* using {@link UnifiedQueryPlanner} to generate a Calcite {@link RelNode}, then delegates to {@link
46-
* AnalyticsExecutionEngine} for execution.
42+
* Handles queries routed to the Analytics engine via the unified query pipeline. Parses PPL/SQL
43+
* queries using {@link UnifiedQueryPlanner} to generate a Calcite {@link RelNode}, then delegates
44+
* to {@link AnalyticsExecutionEngine} for execution.
4745
*/
4846
public class RestUnifiedQueryAction {
4947

@@ -52,25 +50,22 @@ public class RestUnifiedQueryAction {
5250

5351
private final AnalyticsExecutionEngine analyticsEngine;
5452
private final NodeClient client;
55-
private final PPLQueryParser pplParser;
5653

57-
public RestUnifiedQueryAction(
58-
NodeClient client, QueryPlanExecutor planExecutor, Settings settings) {
54+
public RestUnifiedQueryAction(NodeClient client, QueryPlanExecutor planExecutor) {
5955
this.client = client;
6056
this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor);
61-
this.pplParser = new PPLQueryParser(settings);
6257
}
6358

6459
/**
65-
* Check if the query targets an analytics engine index (e.g., Parquet-backed). Uses {@link
66-
* PPLQueryParser} to parse the query and extract the index name from the AST.
60+
* Check if the query targets an analytics engine index (e.g., Parquet-backed). Creates a {@link
61+
* UnifiedQueryContext} to use its parser for index name extraction, supporting both PPL and SQL.
6762
*/
68-
public boolean isAnalyticsIndex(String query) {
63+
public boolean isAnalyticsIndex(String query, QueryType queryType) {
6964
if (query == null || query.isEmpty()) {
7065
return false;
7166
}
72-
try {
73-
String indexName = extractIndexName(query);
67+
try (UnifiedQueryContext context = buildContext(queryType, false)) {
68+
String indexName = extractIndexName(query, context);
7469
if (indexName == null) {
7570
return false;
7671
}
@@ -82,29 +77,6 @@ public boolean isAnalyticsIndex(String query) {
8277
}
8378
}
8479

85-
/** Extract the source index name by parsing the PPL AST and finding the Relation node. */
86-
private String extractIndexName(String query) {
87-
UnresolvedPlan plan = pplParser.parse(query);
88-
Relation relation = findRelation(plan);
89-
return relation != null ? relation.getTableQualifiedName().toString() : null;
90-
}
91-
92-
/** Walk the AST to find the Relation (table scan) node. */
93-
private static Relation findRelation(UnresolvedPlan plan) {
94-
if (plan instanceof Relation) {
95-
return (Relation) plan;
96-
}
97-
for (var child : plan.getChild()) {
98-
if (child instanceof UnresolvedPlan unresolvedChild) {
99-
Relation found = findRelation(unresolvedChild);
100-
if (found != null) {
101-
return found;
102-
}
103-
}
104-
}
105-
return null;
106-
}
107-
10880
/** Execute a query through the unified query pipeline on the sql-worker thread pool. */
10981
public void execute(
11082
String query,
@@ -142,25 +114,14 @@ private void doExecute(
142114
QueryType queryType,
143115
PPLQueryRequest pplRequest,
144116
ActionListener<TransportPPLQueryResponse> listener) {
145-
try {
146-
AbstractSchema schema = StubSchemaProvider.buildSchema();
117+
try (UnifiedQueryContext context = buildContext(queryType, pplRequest.profile())) {
118+
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
119+
RelNode plan = planner.plan(query);
147120

148-
try (UnifiedQueryContext context =
149-
UnifiedQueryContext.builder()
150-
.language(queryType)
151-
.catalog(SCHEMA_NAME, schema)
152-
.defaultNamespace(SCHEMA_NAME)
153-
.profiling(pplRequest.profile())
154-
.build()) {
121+
CalcitePlanContext planContext = context.getPlanContext();
122+
plan = addQuerySizeLimit(plan, planContext);
155123

156-
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
157-
RelNode plan = planner.plan(query);
158-
159-
CalcitePlanContext planContext = context.getPlanContext();
160-
plan = addQuerySizeLimit(plan, planContext);
161-
162-
analyticsEngine.execute(plan, planContext, createQueryListener(queryType, listener));
163-
}
124+
analyticsEngine.execute(plan, planContext, createQueryListener(queryType, listener));
164125
} catch (Exception e) {
165126
listener.onFailure(e);
166127
}
@@ -171,28 +132,59 @@ private void doExplain(
171132
QueryType queryType,
172133
PPLQueryRequest pplRequest,
173134
ResponseListener<ExplainResponse> listener) {
174-
try {
175-
AbstractSchema schema = StubSchemaProvider.buildSchema();
135+
try (UnifiedQueryContext context = buildContext(queryType, pplRequest.profile())) {
136+
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
137+
RelNode plan = planner.plan(query);
176138

177-
try (UnifiedQueryContext context =
178-
UnifiedQueryContext.builder()
179-
.language(queryType)
180-
.catalog(SCHEMA_NAME, schema)
181-
.defaultNamespace(SCHEMA_NAME)
182-
.profiling(pplRequest.profile())
183-
.build()) {
139+
CalcitePlanContext planContext = context.getPlanContext();
140+
plan = addQuerySizeLimit(plan, planContext);
141+
142+
analyticsEngine.explain(plan, pplRequest.mode(), planContext, listener);
143+
} catch (Exception e) {
144+
listener.onFailure(e);
145+
}
146+
}
184147

185-
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
186-
RelNode plan = planner.plan(query);
148+
private UnifiedQueryContext buildContext(QueryType queryType, boolean profiling) {
149+
AbstractSchema schema = StubSchemaProvider.buildSchema();
150+
return UnifiedQueryContext.builder()
151+
.language(queryType)
152+
.catalog(SCHEMA_NAME, schema)
153+
.defaultNamespace(SCHEMA_NAME)
154+
.profiling(profiling)
155+
.build();
156+
}
187157

188-
CalcitePlanContext planContext = context.getPlanContext();
189-
plan = addQuerySizeLimit(plan, planContext);
158+
/**
159+
* Extract the source index name by parsing the query using the context's parser and finding the
160+
* Relation node in the AST. Works for both PPL and SQL via {@link
161+
* UnifiedQueryContext#getParser()}.
162+
*/
163+
@SuppressWarnings("unchecked")
164+
private static String extractIndexName(String query, UnifiedQueryContext context) {
165+
Object parseResult = context.getParser().parse(query);
166+
if (parseResult instanceof UnresolvedPlan unresolvedPlan) {
167+
Relation relation = findRelation(unresolvedPlan);
168+
return relation != null ? relation.getTableQualifiedName().toString() : null;
169+
}
170+
// TODO: handle SQL SqlNode for table extraction when unified SQL is enabled
171+
return null;
172+
}
190173

191-
analyticsEngine.explain(plan, pplRequest.mode(), planContext, listener);
174+
/** Walk the AST to find the Relation (table scan) node. */
175+
private static Relation findRelation(UnresolvedPlan plan) {
176+
if (plan instanceof Relation) {
177+
return (Relation) plan;
178+
}
179+
for (var child : plan.getChild()) {
180+
if (child instanceof UnresolvedPlan unresolvedChild) {
181+
Relation found = findRelation(unresolvedChild);
182+
if (found != null) {
183+
return found;
184+
}
192185
}
193-
} catch (Exception e) {
194-
listener.onFailure(e);
195186
}
187+
return null;
196188
}
197189

198190
private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext context) {

plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,7 @@ public TransportPPLQueryAction(
8383
b.bind(DataSourceService.class).toInstance(dataSourceService);
8484
});
8585
this.injector = Guice.createInjector(modules);
86-
this.unifiedQueryHandler =
87-
new RestUnifiedQueryAction(
88-
client,
89-
new StubQueryPlanExecutor(),
90-
new OpenSearchSettings(clusterService.getClusterSettings()));
86+
this.unifiedQueryHandler = new RestUnifiedQueryAction(client, new StubQueryPlanExecutor());
9187
this.pplEnabled =
9288
() ->
9389
MULTI_ALLOW_EXPLICIT_INDEX.get(clusterSettings)
@@ -134,7 +130,7 @@ protected void doExecute(
134130
ActionListener<TransportPPLQueryResponse> clearingListener = wrapWithProfilingClear(listener);
135131

136132
// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices
137-
if (unifiedQueryHandler.isAnalyticsIndex(transformedRequest.getRequest())) {
133+
if (unifiedQueryHandler.isAnalyticsIndex(transformedRequest.getRequest(), QueryType.PPL)) {
138134
if (transformedRequest.isExplainRequest()) {
139135
unifiedQueryHandler.explain(
140136
transformedRequest.getRequest(),

plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111

1212
import org.junit.Before;
1313
import org.junit.Test;
14-
import org.opensearch.sql.common.setting.Settings;
14+
import org.opensearch.sql.executor.QueryType;
1515
import org.opensearch.sql.executor.analytics.QueryPlanExecutor;
1616
import org.opensearch.transport.client.node.NodeClient;
1717

1818
/**
19-
* Tests for analytics index routing in RestUnifiedQueryAction. Uses PPLQueryParser for AST-based
19+
* Tests for analytics index routing in RestUnifiedQueryAction. Uses context parser for AST-based
2020
* index name extraction.
2121
*/
2222
public class RestUnifiedQueryActionTest {
@@ -25,21 +25,20 @@ public class RestUnifiedQueryActionTest {
2525

2626
@Before
2727
public void setUp() {
28-
action =
29-
new RestUnifiedQueryAction(
30-
mock(NodeClient.class), mock(QueryPlanExecutor.class), mock(Settings.class));
28+
action = new RestUnifiedQueryAction(mock(NodeClient.class), mock(QueryPlanExecutor.class));
3129
}
3230

3331
@Test
3432
public void parquetIndexRoutesToAnalytics() {
35-
assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts"));
36-
assertTrue(action.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts"));
33+
assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts", QueryType.PPL));
34+
assertTrue(
35+
action.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts", QueryType.PPL));
3736
}
3837

3938
@Test
4039
public void nonParquetIndexRoutesToLucene() {
41-
assertFalse(action.isAnalyticsIndex("source = my_logs | fields ts"));
42-
assertFalse(action.isAnalyticsIndex(null));
43-
assertFalse(action.isAnalyticsIndex(""));
40+
assertFalse(action.isAnalyticsIndex("source = my_logs | fields ts", QueryType.PPL));
41+
assertFalse(action.isAnalyticsIndex(null, QueryType.PPL));
42+
assertFalse(action.isAnalyticsIndex("", QueryType.PPL));
4443
}
4544
}

0 commit comments

Comments
 (0)