Skip to content

Commit 885230f

Browse files
authored
Fix wrong parameter and return result logic for LogPatternAggFunction (#4868)
Signed-off-by: Songkan Tang <songkant@amazon.com>
1 parent 0ab2ba2 commit 885230f

6 files changed

Lines changed: 118 additions & 16 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/udf/udaf/LogPatternAggFunction.java

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ public LogParserAccumulator init() {
3636

3737
@Override
3838
public Object result(LogParserAccumulator acc) {
39-
if (acc.size() == 0) {
39+
if (acc.size() == 0 && acc.logSize() == 0) {
4040
return null;
4141
}
4242

@@ -89,7 +89,7 @@ public LogParserAccumulator add(
8989
this.variableCountThreshold = variableCountThreshold;
9090
this.thresholdPercentage = thresholdPercentage;
9191
acc.evaluate(field);
92-
if (bufferLimit > 0 && acc.size() == bufferLimit) {
92+
if (bufferLimit > 0 && acc.logSize() == bufferLimit) {
9393
acc.partialMerge(
9494
maxSampleCount, variableCountThreshold, thresholdPercentage, showNumberedToken);
9595
acc.clearBuffer();
@@ -152,6 +152,10 @@ public static class LogParserAccumulator implements Accumulator {
152152
public Map<String, Map<String, Object>> patternGroupMap = new HashMap<>();
153153

154154
public int size() {
155+
return patternGroupMap.size();
156+
}
157+
158+
public int logSize() {
155159
return logMessages.size();
156160
}
157161

core/src/main/java/org/opensearch/sql/expression/function/PatternParserFunctionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public static Object evalAgg(
105105
@Parameter(name = "field") String field,
106106
@Parameter(name = "aggObject") Object aggObject,
107107
@Parameter(name = "showNumberedToken") Boolean showNumberedToken) {
108-
if (Strings.isBlank(field)) {
108+
if (Strings.isBlank(field) || aggObject == null) {
109109
return EMPTY_RESULT;
110110
}
111111
List<Map<String, Object>> aggResult = (List<Map<String, Object>>) aggObject;

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLPatternsIT.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,8 @@ public void testBrainLabelMode_NotShowNumberedToken() throws IOException {
230230
"BLOCK* NameSystem.allocateBlock:"
231231
+ " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000296_0/part-00296."
232232
+ " blk_-6620182933895093708",
233-
"BLOCK* NameSystem.allocateBlock:"
234-
+ " /user/root/sortrand/_temporary/_task_<*>_<*>_r_<*>_<*>/part<*>"
235-
+ " blk_<*>"));
233+
"<*> NameSystem.allocateBlock:"
234+
+ " /user/root/sortrand/_temporary/_task_<*>_<*>_r_<*>_<*>/part<*> blk_<*>"));
236235
}
237236

238237
@Test
@@ -268,21 +267,23 @@ public void testBrainLabelMode_ShowNumberedToken() throws IOException {
268267
"BLOCK* NameSystem.allocateBlock:"
269268
+ " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000296_0/part-00296."
270269
+ " blk_-6620182933895093708",
271-
"BLOCK* NameSystem.allocateBlock:"
272-
+ " /user/root/sortrand/_temporary/_task_<token1>_<token2>_r_<token3>_<token4>/part<token5>"
273-
+ " blk_<token6>",
270+
"<token1> NameSystem.allocateBlock:"
271+
+ " /user/root/sortrand/_temporary/_task_<token2>_<token3>_r_<token4>_<token5>/part<token6>"
272+
+ " blk_<token7>",
274273
ImmutableMap.of(
275274
"<token1>",
276-
ImmutableList.of("200811092030"),
275+
ImmutableList.of("BLOCK*"),
277276
"<token2>",
278-
ImmutableList.of("0002"),
277+
ImmutableList.of("200811092030"),
279278
"<token3>",
280-
ImmutableList.of("000296"),
279+
ImmutableList.of("0002"),
281280
"<token4>",
282-
ImmutableList.of("0"),
281+
ImmutableList.of("000296"),
283282
"<token5>",
284-
ImmutableList.of("-00296."),
283+
ImmutableList.of("0"),
285284
"<token6>",
285+
ImmutableList.of("-00296."),
286+
"<token7>",
286287
ImmutableList.of("-6620182933895093708"))));
287288
}
288289

(file path missing from this capture; the hunk below is a newly added YAML REST test file)

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
setup:
2+
- do:
3+
query.settings:
4+
body:
5+
transient:
6+
plugins.calcite.enabled: true
7+
- do:
8+
bulk:
9+
index: hdfs_logs
10+
refresh: true
11+
body:
12+
- '{ "index": { "_id": 1 } }'
13+
- '{ "date": "20081109", "time": "203615", "pid": 148, "level": "INFO", "component": "dfs.FSNamesystem", "content": "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.31.85:50010 is added to blk_-7017553867379051457 size 67108864" }'
14+
- '{ "index": { "_id": 2 } }'
15+
- '{ "date": "20081109", "time": "204132", "pid": 26, "level": "INFO", "component": "dfs.FSNamesystem", "content": "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.107.19:50010 is added to blk_-3249711809227781266 size 67108864" }'
16+
- '{ "index": { "_id": 3 } }'
17+
- '{ "date": "20081109", "time": "204925", "pid": 663, "level": "WARN", "component": "dfs.DataNode$PacketResponder", "content": "PacketResponder failed for blk_6996194389878584395" }'
18+
- '{ "index": { "_id": 4 } }'
19+
- '{ "date": "20081109", "time": "205035", "pid": 31, "level": "WARN", "component": "dfs.DataNode$PacketResponder", "content": "PacketResponder failed for blk_-1547954353065580372" }'
20+
21+
22+
---
23+
teardown:
24+
- do:
25+
query.settings:
26+
body:
27+
transient:
28+
plugins.calcite.enabled : false
29+
30+
31+
---
32+
"Patterns with specified max_sample_count should return correct result":
33+
- skip:
34+
features:
35+
- headers
36+
- allowed_warnings
37+
- do:
38+
allowed_warnings:
39+
- 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
40+
headers:
41+
Content-Type: 'application/json'
42+
ppl:
43+
body:
44+
query: 'source=hdfs_logs | patterns content method=brain mode=aggregation max_sample_count=2 variable_count_threshold=3'
45+
- match: {"total": 2}
46+
- match: {"schema": [{"name": "patterns_field", "type": "string"}, {"name": "pattern_count", "type": "bigint"}, {"name": "sample_logs", "type": "array"}]}
47+
- match: {"datarows": [
48+
[
49+
"PacketResponder failed for blk_<*>",
50+
2,
51+
[
52+
"PacketResponder failed for blk_6996194389878584395",
53+
"PacketResponder failed for blk_-1547954353065580372"
54+
]
55+
],
56+
[
57+
"BLOCK* NameSystem.addStoredBlock: blockMap updated: <*IP*> is added to blk_<*> size <*>",
58+
2,
59+
[
60+
"BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.31.85:50010 is added to blk_-7017553867379051457 size 67108864",
61+
"BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.107.19:50010 is added to blk_-3249711809227781266 size 67108864"
62+
]
63+
]
64+
]}
65+

ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -947,8 +947,7 @@ public UnresolvedPlan visitPatternsCommand(OpenSearchPPLParser.PatternsCommandCo
947947
AstDSL.intLiteral(settings.getSettingValue(Key.PATTERN_MAX_SAMPLE_COUNT)));
948948
Literal patternBufferLimit =
949949
cmdOptions.getOrDefault(
950-
"max_sample_count",
951-
AstDSL.intLiteral(settings.getSettingValue(Key.PATTERN_BUFFER_LIMIT)));
950+
"buffer_limit", AstDSL.intLiteral(settings.getSettingValue(Key.PATTERN_BUFFER_LIMIT)));
952951
Literal showNumberedToken =
953952
cmdOptions.getOrDefault(
954953
"show_numbered_token",

ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLPatternsTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,39 @@ public void testPatternsAggregationModeWithGroupBy_ShowNumberedToken_ForSimplePa
326326
verifyPPLToSparkSQL(root, expectedSparkSql);
327327
}
328328

329+
@Test
330+
public void testPatternsAggregationMode_SpecifyAllParameters_ForBrainMethod() {
331+
String ppl =
332+
"source=EMP | patterns ENAME method=BRAIN mode=aggregation max_sample_count=2"
333+
+ " buffer_limit=1000 show_numbered_token=false variable_count_threshold=3"
334+
+ " frequency_threshold_percentage=0.1";
335+
RelNode root = getRelNode(ppl);
336+
337+
String expectedLogical =
338+
"LogicalProject(patterns_field=[SAFE_CAST(ITEM($1, 'pattern'))],"
339+
+ " pattern_count=[SAFE_CAST(ITEM($1, 'pattern_count'))],"
340+
+ " sample_logs=[SAFE_CAST(ITEM($1, 'sample_logs'))])\n"
341+
+ " LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n"
342+
+ " LogicalAggregate(group=[{}], patterns_field=[pattern($0, $1, $2, $3, $4, $5)])\n"
343+
+ " LogicalProject(ENAME=[$1], $f8=[2], $f9=[1000], $f10=[false],"
344+
+ " $f11=[0.1:DECIMAL(2, 1)], $f12=[3])\n"
345+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
346+
+ " Uncollect\n"
347+
+ " LogicalProject(patterns_field=[$cor0.patterns_field])\n"
348+
+ " LogicalValues(tuples=[[{ 0 }]])\n";
349+
verifyLogical(root, expectedLogical);
350+
351+
String expectedSparkSql =
352+
"SELECT TRY_CAST(`t20`.`patterns_field`['pattern'] AS STRING) `patterns_field`,"
353+
+ " TRY_CAST(`t20`.`patterns_field`['pattern_count'] AS BIGINT) `pattern_count`,"
354+
+ " TRY_CAST(`t20`.`patterns_field`['sample_logs'] AS ARRAY< STRING >) `sample_logs`\n"
355+
+ "FROM (SELECT `pattern`(`ENAME`, 2, 1000, FALSE, 0.1, 3) `patterns_field`\n"
356+
+ "FROM `scott`.`EMP`) `$cor0`,\n"
357+
+ "LATERAL UNNEST((SELECT `$cor0`.`patterns_field`\n"
358+
+ "FROM (VALUES (0)) `t` (`ZERO`))) `t20` (`patterns_field`)";
359+
verifyPPLToSparkSQL(root, expectedSparkSql);
360+
}
361+
329362
@Test
330363
public void testPatternsAggregationMode_NotShowNumberedToken_ForBrainMethod() {
331364
String ppl = "source=EMP | patterns ENAME method=BRAIN mode=aggregation";

0 commit comments

Comments
 (0)