Skip to content

Commit 8d531e0

Browse files
authored
Support parsing documents with flattened value (opensearch-project#3577)
* Support parsing documents with flattened value Signed-off-by: Heng Qian <qianheng@amazon.com> * Add UT and yamlTest Signed-off-by: Heng Qian <qianheng@amazon.com> * Refine code Signed-off-by: Heng Qian <qianheng@amazon.com> * Refine code Signed-off-by: Heng Qian <qianheng@amazon.com> * Refine code Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix UT by using LinkedHashMap to ensure order Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix UT Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix IT Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix log level for JdbcOpenSearchDataTypeConvertor Signed-off-by: Heng Qian <qianheng@amazon.com> --------- Signed-off-by: Heng Qian <qianheng@amazon.com>
1 parent 47e6a2a commit 8d531e0

12 files changed

Lines changed: 293 additions & 6 deletions

File tree

core/src/main/java/org/opensearch/sql/data/model/ExprTupleValue.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ public static ExprTupleValue fromExprValueMap(Map<String, ExprValue> map) {
2828
return new ExprTupleValue(linkedHashMap);
2929
}
3030

31+
public static ExprTupleValue empty() {
32+
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
33+
return new ExprTupleValue(linkedHashMap);
34+
}
35+
3136
@Override
3237
public Object value() {
3338
LinkedHashMap<String, Object> resultMap = new LinkedHashMap<>();
@@ -107,4 +112,17 @@ public int compare(ExprValue other) {
107112
public int hashCode() {
108113
return Objects.hashCode(valueMap);
109114
}
115+
116+
/** Implements mergeTo by merging deeply */
117+
@Override
118+
public ExprTupleValue mergeTo(ExprValue base) {
119+
if (base instanceof ExprTupleValue) {
120+
base.tupleValue()
121+
.forEach((key, value) -> this.tupleValue().merge(key, value, ExprValue::mergeTo));
122+
} else {
123+
throw new IllegalArgumentException(
124+
String.format("Cannot merge ExprTupleValue to %s", base.getClass().getSimpleName()));
125+
}
126+
return this;
127+
}
110128
}

core/src/main/java/org/opensearch/sql/data/model/ExprValue.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,4 +166,17 @@ default List<ExprValue> collectionValue() {
166166
default ExprValue keyValue(String key) {
167167
return ExprMissingValue.of();
168168
}
169+
170+
/**
171+
* Merge the value to the base value. By default, it overrides the base value with the current
172+
*
173+
* <p>This method will be called when key conflict happens in the process of populating
174+
* ExprTupleValue See {@link OpenSearchExprValueFactory::populateValueRecursive}.
175+
*
176+
* @param base the target value to merge
177+
* @return The merged value
178+
*/
179+
default ExprValue mergeTo(ExprValue base) {
180+
return this;
181+
}
169182
}

core/src/test/java/org/opensearch/sql/data/model/ExprTupleValueTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,21 @@ public void comparabilityTest() {
5555
assertThrows(ExpressionEvaluationException.class, () -> compare(tupleValue, tupleValue));
5656
assertEquals("ExprTupleValue instances are not comparable", exception.getMessage());
5757
}
58+
59+
@Test
60+
public void testMergeTo() {
61+
ExprValue tupleValue1 =
62+
ExprValueUtils.tupleValue(
63+
ImmutableMap.of("v1", 1, "inner_tuple", ImmutableMap.of("inner_v1", 1)));
64+
ExprValue tupleValue2 =
65+
ExprValueUtils.tupleValue(
66+
ImmutableMap.of("v2", 2, "inner_tuple", ImmutableMap.of("inner_v2", 2)));
67+
ExprValue expectedMergedValue =
68+
ExprValueUtils.tupleValue(
69+
ImmutableMap.of(
70+
"v1", 1,
71+
"inner_tuple", ImmutableMap.of("inner_v1", 1, "inner_v2", 2),
72+
"v2", 2));
73+
assertEquals(expectedMergedValue, tupleValue1.mergeTo(tupleValue2));
74+
}
5875
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.remote;
7+
8+
import org.opensearch.sql.ppl.FlattenDocValueIT;
9+
10+
public class CalciteFlattenDocValueIT extends FlattenDocValueIT {
11+
@Override
12+
public void init() throws Exception {
13+
super.init();
14+
enableCalcite();
15+
disallowCalciteFallback();
16+
}
17+
}

integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,11 @@ public enum Index {
838838
"alias",
839839
getAliasIndexMapping(),
840840
"src/test/resources/alias.json"),
841+
FLATTENED_VALUE(
842+
TestsConstants.TEST_INDEX_FLATTENED_VALUE,
843+
"flattened_value",
844+
null,
845+
"src/test/resources/flattened_value.json"),
841846
DUPLICATION_NULLABLE(
842847
TestsConstants.TEST_INDEX_DUPLICATION_NULLABLE,
843848
"duplication_nullable",

integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class TestsConstants {
6262
public static final String TEST_INDEX_GEOPOINT = TEST_INDEX + "_geopoint";
6363
public static final String TEST_INDEX_JSON_TEST = TEST_INDEX + "_json_test";
6464
public static final String TEST_INDEX_ALIAS = TEST_INDEX + "_alias";
65+
public static final String TEST_INDEX_FLATTENED_VALUE = TEST_INDEX + "_flattened_value";
6566
public static final String TEST_INDEX_GEOIP = TEST_INDEX + "_geoip";
6667
public static final String DATASOURCES = ".ql-datasources";
6768
public static final String TEST_INDEX_STATE_COUNTRY = TEST_INDEX + "_state_country";
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ppl;
7+
8+
import static org.opensearch.sql.legacy.SQLIntegTestCase.Index.FLATTENED_VALUE;
9+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_FLATTENED_VALUE;
10+
import static org.opensearch.sql.util.MatcherUtils.rows;
11+
import static org.opensearch.sql.util.MatcherUtils.schema;
12+
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
13+
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
14+
15+
import java.io.IOException;
16+
import org.hamcrest.TypeSafeMatcher;
17+
import org.json.JSONArray;
18+
import org.json.JSONObject;
19+
import org.junit.jupiter.api.Test;
20+
21+
public class FlattenDocValueIT extends PPLIntegTestCase {
22+
@Override
23+
public void init() throws Exception {
24+
super.init();
25+
loadIndex(FLATTENED_VALUE);
26+
}
27+
28+
@Test
29+
public void testFlattenDocValue() throws IOException {
30+
JSONObject result = executeQuery(String.format("source=%s", TEST_INDEX_FLATTENED_VALUE));
31+
verifySchema(result, schema("log", "struct"));
32+
TypeSafeMatcher<JSONArray> expectedRow =
33+
rows(new JSONObject("{ \"json\" : { \"status\": \"SUCCESS\", \"time\": 100} }"));
34+
verifyDataRows(result, expectedRow, expectedRow, expectedRow, expectedRow, expectedRow);
35+
}
36+
37+
@Test
38+
public void testFlattenDocValueWithFields() throws IOException {
39+
JSONObject result =
40+
executeQuery(
41+
String.format(
42+
"source=%s | fields log, log.json, log.json.status, log.json.time",
43+
TEST_INDEX_FLATTENED_VALUE));
44+
verifySchema(
45+
result,
46+
schema("log", "struct"),
47+
schema("log.json", "struct"),
48+
schema("log.json.status", "string"),
49+
schema("log.json.time", "bigint"));
50+
TypeSafeMatcher<JSONArray> expectedRow =
51+
rows(
52+
new JSONObject("{ \"json\" : { \"status\": \"SUCCESS\", \"time\": 100} }"),
53+
new JSONObject("{ \"status\": \"SUCCESS\", \"time\": 100}"),
54+
"SUCCESS",
55+
100);
56+
verifyDataRows(result, expectedRow, expectedRow, expectedRow, expectedRow, expectedRow);
57+
}
58+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{"index": {"_id":"1"}}
2+
{"log": { "json" : { "status": "SUCCESS", "time": 100} } }
3+
{"index": {"_id":"2"}}
4+
{"log.json": { "status": "SUCCESS", "time": 100} }
5+
{"index": {"_id":"3"}}
6+
{"log.json.status": "SUCCESS", "log.json.time": 100 }
7+
{"index": {"_id":"4"}}
8+
{"log.json": { "status": "SUCCESS" }, "log.json.time": 100 }
9+
{"index": {"_id":"5"}}
10+
{"log": { "json" : {} }, "log.json": { "status": "SUCCESS" }, "log.json.time": 100 }
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
setup:
2+
- do:
3+
query.settings:
4+
body:
5+
transient:
6+
plugins.calcite.enabled : true
7+
plugins.calcite.fallback.allowed : false
8+
9+
---
10+
teardown:
11+
- do:
12+
query.settings:
13+
body:
14+
transient:
15+
plugins.calcite.enabled : false
16+
plugins.calcite.fallback.allowed : true
17+
18+
---
19+
"Handle flattened document value":
20+
- skip:
21+
features:
22+
- headers
23+
- do:
24+
bulk:
25+
index: test
26+
refresh: true
27+
body:
28+
- '{"index": {}}'
29+
- '{"log": { "json" : { "status": "SUCCESS", "time": 100} } }'
30+
- '{"index": {}}'
31+
- '{"log.json": { "status": "SUCCESS", "time": 100} }'
32+
- '{"index": {}}'
33+
- '{"log.json.status": "SUCCESS", "log.json.time": 100 }'
34+
- '{"index": {}}'
35+
- '{"log.json": { "status": "SUCCESS" }, "log.json.time": 100 }'
36+
- '{"index": {}}'
37+
- '{"log": { "json" : {} }, "log.json": { "status": "SUCCESS" }, "log.json.time": 100 }'
38+
- do:
39+
headers:
40+
Content-Type: 'application/json'
41+
ppl:
42+
body:
43+
query: 'source=test'
44+
- match: {"total": 5}
45+
- match: {"schema": [{"name": "log", "type": "struct"}]}
46+
- match: {"datarows": [[{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }]]}
47+
48+
- do:
49+
headers:
50+
Content-Type: 'application/json'
51+
ppl:
52+
body:
53+
query: 'source=test | fields log, log.json, log.json.status, log.json.time'
54+
- match: {"total": 5}
55+
- match: {"schema": [{"name": "log", "type": "struct"}, {"name": "log.json", "type": "struct"}, {"name": "log.json.status", "type": "string"}, {"name": "log.json.time", "type": "bigint"}]}
56+
- match: {"datarows": [[{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100]]}

opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.time.format.DateTimeParseException;
3333
import java.time.temporal.TemporalAccessor;
3434
import java.util.ArrayList;
35-
import java.util.LinkedHashMap;
3635
import java.util.List;
3736
import java.util.Map;
3837
import java.util.Optional;
@@ -317,19 +316,63 @@ private static ExprValue createOpenSearchDateType(Content value, ExprType type)
317316
* @return Value parsed from content.
318317
*/
319318
private ExprValue parseStruct(Content content, String prefix, boolean supportArrays) {
320-
LinkedHashMap<String, ExprValue> result = new LinkedHashMap<>();
319+
ExprTupleValue result = ExprTupleValue.empty();
321320
content
322321
.map()
323322
.forEachRemaining(
324323
entry ->
325-
result.put(
326-
entry.getKey(),
324+
populateValueRecursive(
325+
result,
326+
new JsonPath(entry.getKey()),
327327
parse(
328328
entry.getValue(),
329329
makeField(prefix, entry.getKey()),
330330
type(makeField(prefix, entry.getKey())),
331331
supportArrays)));
332-
return new ExprTupleValue(result);
332+
return result;
333+
}
334+
335+
/**
336+
* Populate the current ExprTupleValue recursively.
337+
*
338+
* <p>If JsonPath is not a root path(i.e. has dot in its raw path), it needs update to its
339+
* children recursively until the leaf node.
340+
*
341+
* <p>If there is existing vale for the JsonPath, we need to merge the new value to the old.
342+
*/
343+
static void populateValueRecursive(ExprTupleValue result, JsonPath path, ExprValue value) {
344+
if (path.getPaths().size() == 1) {
345+
// Update the current ExprValue by using mergeTo if exists
346+
result
347+
.tupleValue()
348+
.computeIfPresent(path.getRootPath(), (key, oldValue) -> value.mergeTo(oldValue));
349+
result.tupleValue().putIfAbsent(path.getRootPath(), value);
350+
} else {
351+
result.tupleValue().putIfAbsent(path.getRootPath(), ExprTupleValue.empty());
352+
populateValueRecursive(
353+
(ExprTupleValue) result.tupleValue().get(path.getRootPath()), path.getChildPath(), value);
354+
}
355+
}
356+
357+
@Getter
358+
static class JsonPath {
359+
private final List<String> paths;
360+
361+
public JsonPath(String rawPath) {
362+
this.paths = List.of(rawPath.split("\\."));
363+
}
364+
365+
public JsonPath(List<String> paths) {
366+
this.paths = paths;
367+
}
368+
369+
public String getRootPath() {
370+
return paths.getFirst();
371+
}
372+
373+
public JsonPath getChildPath() {
374+
return new JsonPath(paths.subList(1, paths.size()));
375+
}
333376
}
334377

335378
/**

0 commit comments

Comments
 (0)