|
7 | 7 |
|
8 | 8 | import java.io.IOException; |
9 | 9 | import java.io.ObjectInputStream; |
| 10 | +import java.util.ArrayList; |
10 | 11 | import java.util.List; |
11 | 12 | import java.util.stream.Collectors; |
12 | 13 | import java.util.stream.Stream; |
|
15 | 16 | import org.opensearch.sql.ast.tree.UnresolvedPlan; |
16 | 17 | import org.opensearch.sql.expression.NamedExpression; |
17 | 18 | import org.opensearch.sql.expression.ReferenceExpression; |
| 19 | +import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer; |
18 | 20 | import org.opensearch.sql.opensearch.executor.Cursor; |
19 | 21 | import org.opensearch.sql.planner.PaginateOperator; |
20 | 22 | import org.opensearch.sql.planner.physical.PhysicalPlan; |
@@ -58,19 +60,69 @@ public Cursor convertToCursor(PhysicalPlan plan) { |
58 | 60 | */ |
59 | 61 | public PhysicalPlan convertToPlan(String cursor) { |
60 | 62 | if (cursor.startsWith(CURSOR_PREFIX)) { |
61 | | - String expression = cursor.substring(CURSOR_PREFIX.length()); |
| 63 | + try { |
| 64 | + String expression = cursor.substring(CURSOR_PREFIX.length()); |
62 | 65 |
|
63 | | - // TODO Parse expression and initialize variables below. |
64 | | - // storageEngine needs to create the TableScanOperator. |
65 | | - int pageSize = -1; |
66 | | - int currentPageIndex = -1; |
67 | | - List<NamedExpression> projectList = List.of(); |
68 | | - String scanAsString = ""; |
69 | | - TableScanOperator scan = storageEngine.getTableScan(scanAsString); |
| 66 | + // TODO Parse expression and initialize variables below. |
| 67 | + // storageEngine needs to create the TableScanOperator. |
70 | 68 |
|
71 | | - return new PaginateOperator(new ProjectOperator(scan, projectList, List.of()), |
72 | | - pageSize, currentPageIndex); |
| 69 | + // TODO Parse with ANTLR or serialize as JSON/XML |
| 70 | + if (!expression.startsWith("(Paginate,")) { |
| 71 | + throw new UnsupportedOperationException("Unsupported cursor"); |
| 72 | + } |
| 73 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 74 | + int currentPageIndex = Integer.parseInt(expression, 0, expression.indexOf(','), 10); |
73 | 75 |
|
| 76 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 77 | + int pageSize = Integer.parseInt(expression, 0, expression.indexOf(','), 10); |
| 78 | + |
| 79 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 80 | + if (!expression.startsWith("(Project,")) { |
| 81 | + throw new UnsupportedOperationException("Unsupported cursor"); |
| 82 | + } |
| 83 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 84 | + if (!expression.startsWith("(namedParseExpressions,")) { |
| 85 | + throw new UnsupportedOperationException("Unsupported cursor"); |
| 86 | + } |
| 87 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 88 | + var serializer = new DefaultExpressionSerializer(); |
| 89 | + // TODO parse npe |
| 90 | + List<NamedExpression> namedParseExpressions = List.of(); |
| 91 | + |
| 92 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 93 | + List<NamedExpression> projectList = new ArrayList<>(); |
| 94 | + if (!expression.startsWith("(projectList,")) { |
| 95 | + throw new UnsupportedOperationException("Unsupported cursor"); |
| 96 | + } |
| 97 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 98 | + while (expression.startsWith("(named,")) { |
| 99 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 100 | + var name = expression.substring(0, expression.indexOf(',')); |
| 101 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 102 | + var alias = expression.substring(0, expression.indexOf(',')); |
| 103 | + if (alias.isEmpty()) { |
| 104 | + alias = null; |
| 105 | + } |
| 106 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 107 | + projectList.add(new NamedExpression(name, |
| 108 | + serializer.deserialize(expression.substring(0, expression.indexOf(')'))), alias)); |
| 109 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 110 | + } |
| 111 | + |
| 112 | + if (!expression.startsWith("(OpenSearchPagedIndexScan,")) { |
| 113 | + throw new UnsupportedOperationException("Unsupported cursor"); |
| 114 | + } |
| 115 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 116 | + var indexName = expression.substring(0, expression.indexOf(',')); |
| 117 | + expression = expression.substring(expression.indexOf(',') + 1); |
| 118 | + var scrollId = expression.substring(0, expression.indexOf(')')); |
| 119 | + TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId); |
| 120 | + |
| 121 | + return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions), |
| 122 | + pageSize, currentPageIndex); |
| 123 | + } catch (Exception e) { |
| 124 | + throw new UnsupportedOperationException("Unsupported cursor", e); |
| 125 | + } |
74 | 126 | } else { |
75 | 127 | throw new UnsupportedOperationException("Unsupported cursor"); |
76 | 128 | } |
|
0 commit comments