Skip to content

Commit 1a83941

Browse files
Add full cursor serialization and deserialization.
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
1 parent 9db17c3 commit 1a83941

4 files changed

Lines changed: 77 additions & 19 deletions

File tree

core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.io.IOException;
99
import java.io.ObjectInputStream;
10+
import java.util.ArrayList;
1011
import java.util.List;
1112
import java.util.stream.Collectors;
1213
import java.util.stream.Stream;
@@ -15,6 +16,7 @@
1516
import org.opensearch.sql.ast.tree.UnresolvedPlan;
1617
import org.opensearch.sql.expression.NamedExpression;
1718
import org.opensearch.sql.expression.ReferenceExpression;
19+
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
1820
import org.opensearch.sql.opensearch.executor.Cursor;
1921
import org.opensearch.sql.planner.PaginateOperator;
2022
import org.opensearch.sql.planner.physical.PhysicalPlan;
@@ -58,19 +60,69 @@ public Cursor convertToCursor(PhysicalPlan plan) {
5860
*/
5961
public PhysicalPlan convertToPlan(String cursor) {
6062
if (cursor.startsWith(CURSOR_PREFIX)) {
61-
String expression = cursor.substring(CURSOR_PREFIX.length());
63+
try {
64+
String expression = cursor.substring(CURSOR_PREFIX.length());
6265

63-
// TODO Parse expression and initialize variables below.
64-
// storageEngine needs to create the TableScanOperator.
65-
int pageSize = -1;
66-
int currentPageIndex = -1;
67-
List<NamedExpression> projectList = List.of();
68-
String scanAsString = "";
69-
TableScanOperator scan = storageEngine.getTableScan(scanAsString);
66+
// TODO Parse expression and initialize variables below.
67+
// storageEngine needs to create the TableScanOperator.
7068

71-
return new PaginateOperator(new ProjectOperator(scan, projectList, List.of()),
72-
pageSize, currentPageIndex);
69+
// TODO Parse with ANTLR or serialize as JSON/XML
70+
if (!expression.startsWith("(Paginate,")) {
71+
throw new UnsupportedOperationException("Unsupported cursor");
72+
}
73+
expression = expression.substring(expression.indexOf(',') + 1);
74+
int currentPageIndex = Integer.parseInt(expression, 0, expression.indexOf(','), 10);
7375

76+
expression = expression.substring(expression.indexOf(',') + 1);
77+
int pageSize = Integer.parseInt(expression, 0, expression.indexOf(','), 10);
78+
79+
expression = expression.substring(expression.indexOf(',') + 1);
80+
if (!expression.startsWith("(Project,")) {
81+
throw new UnsupportedOperationException("Unsupported cursor");
82+
}
83+
expression = expression.substring(expression.indexOf(',') + 1);
84+
if (!expression.startsWith("(namedParseExpressions,")) {
85+
throw new UnsupportedOperationException("Unsupported cursor");
86+
}
87+
expression = expression.substring(expression.indexOf(',') + 1);
88+
var serializer = new DefaultExpressionSerializer();
89+
// TODO parse npe
90+
List<NamedExpression> namedParseExpressions = List.of();
91+
92+
expression = expression.substring(expression.indexOf(',') + 1);
93+
List<NamedExpression> projectList = new ArrayList<>();
94+
if (!expression.startsWith("(projectList,")) {
95+
throw new UnsupportedOperationException("Unsupported cursor");
96+
}
97+
expression = expression.substring(expression.indexOf(',') + 1);
98+
while (expression.startsWith("(named,")) {
99+
expression = expression.substring(expression.indexOf(',') + 1);
100+
var name = expression.substring(0, expression.indexOf(','));
101+
expression = expression.substring(expression.indexOf(',') + 1);
102+
var alias = expression.substring(0, expression.indexOf(','));
103+
if (alias.isEmpty()) {
104+
alias = null;
105+
}
106+
expression = expression.substring(expression.indexOf(',') + 1);
107+
projectList.add(new NamedExpression(name,
108+
serializer.deserialize(expression.substring(0, expression.indexOf(')'))), alias));
109+
expression = expression.substring(expression.indexOf(',') + 1);
110+
}
111+
112+
if (!expression.startsWith("(OpenSearchPagedIndexScan,")) {
113+
throw new UnsupportedOperationException("Unsupported cursor");
114+
}
115+
expression = expression.substring(expression.indexOf(',') + 1);
116+
var indexName = expression.substring(0, expression.indexOf(','));
117+
expression = expression.substring(expression.indexOf(',') + 1);
118+
var scrollId = expression.substring(0, expression.indexOf(')'));
119+
TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId);
120+
121+
return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions),
122+
pageSize, currentPageIndex);
123+
} catch (Exception e) {
124+
throw new UnsupportedOperationException("Unsupported cursor", e);
125+
}
74126
} else {
75127
throw new UnsupportedOperationException("Unsupported cursor");
76128
}

core/src/main/java/org/opensearch/sql/planner/physical/ProjectOperator.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.opensearch.sql.executor.ExecutionEngine;
2323
import org.opensearch.sql.expression.NamedExpression;
2424
import org.opensearch.sql.expression.parse.ParseExpression;
25+
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
2526

2627
/**
2728
* Project the fields specified in {@link ProjectOperator#projectList} from input.
@@ -98,9 +99,17 @@ public ExecutionEngine.Schema schema() {
9899
@Override
99100
public String toCursor() {
100101
String child = getChild().get(0).toCursor();
101-
String namedExpressions = "TODO";
102-
// TODO serialize named expressions.
103-
// Skipping parsedExpressions for now.
104-
return createSection("Project", namedExpressions, child);
102+
var serializer = new DefaultExpressionSerializer();
103+
String projects = createSection("projectList",
104+
projectList.stream().map(ne -> createSection("named",
105+
ne.getName(), ne.getAlias() == null ? "" : ne.getAlias(), serializer.serialize(ne.getDelegated())
106+
))
107+
.toArray(String[]::new));
108+
String namedExpressions = createSection("namedParseExpressions",
109+
namedParseExpressions.stream().map(ne -> createSection("named",
110+
ne.getName(), ne.getAlias() == null ? "" : ne.getAlias(), serializer.serialize(ne.getDelegated())
111+
))
112+
.toArray(String[]::new));
113+
return createSection("Project", namedExpressions, projects, child);
105114
}
106115
}

core/src/main/java/org/opensearch/sql/storage/StorageEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ default Collection<FunctionResolver> getFunctions() {
3232
return Collections.emptyList();
3333
}
3434

35-
default TableScanOperator getTableScan(String scanAsString) {
35+
default TableScanOperator getTableScan(String indexName, String scrollId) {
3636
String error = String.format("%s.getTableScan needs to be implemented", getClass());
3737
throw new UnsupportedOperationException(error);
3838
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,7 @@ public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name) {
3838
}
3939

4040
@Override
41-
public TableScanOperator getTableScan(String scanAsString) {
42-
// TODO extract indexName and scrollId from scanAsString
43-
String indexName ="";
44-
String scrollId = "";
41+
public TableScanOperator getTableScan(String indexName, String scrollId) {
4542
var index = new OpenSearchIndex(client, settings, indexName);
4643
var requestBuilder = new SubsequentPageRequestBuilder(
4744
new OpenSearchRequest.IndexName(indexName),

0 commit comments

Comments
 (0)