Skip to content

Commit d0715d6

Browse files
committed
Add prototype for function prometheusQuery().
1 parent 1bdd21f commit d0715d6

7 files changed

Lines changed: 230 additions & 0 deletions

src/Common/ErrorCodes.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,7 @@
616616
M(736, ICEBERG_CATALOG_ERROR) \
617617
M(737, GOOGLE_CLOUD_ERROR) \
618618
M(738, PART_IS_LOCKED) \
619+
M(739, CANNOT_PARSE_PROMQL_QUERY) \
619620
\
620621
M(900, DISTRIBUTED_CACHE_ERROR) \
621622
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

src/Common/config.h.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
#cmakedefine01 USE_LIBARCHIVE
6969
#cmakedefine01 USE_POCKETFFT
7070
#cmakedefine01 USE_PROMETHEUS_PROTOBUFS
71+
#cmakedefine01 USE_ANTLR4_GRAMMARS
7172
#cmakedefine01 USE_MONGODB
7273
#cmakedefine01 USE_NUMACTL
7374
#cmakedefine01 USE_FUZZING_MODE
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
#include <TableFunctions/TableFunctionPrometheusQuery.h>
2+
3+
#include <DataTypes/DataTypeArray.h>
4+
#include <DataTypes/DataTypeDateTime64.h>
5+
#include <DataTypes/DataTypeString.h>
6+
#include <DataTypes/DataTypeTuple.h>
7+
#include <DataTypes/DataTypesNumber.h>
8+
#include <Interpreters/Context.h>
9+
#include <Interpreters/DatabaseCatalog.h>
10+
#include <Interpreters/evaluateConstantExpression.h>
11+
#include <Parsers/ASTFunction.h>
12+
#include <Parsers/ASTIdentifier.h>
13+
#include <Storages/StorageNull.h>
14+
#include <Storages/StorageTimeSeries.h>
15+
#include <Storages/checkAndGetLiteralArgument.h>
16+
#include <TableFunctions/TableFunctionFactory.h>
17+
18+
#include "config.h"
19+
20+
#if USE_ANTLR4_GRAMMARS
21+
#pragma clang diagnostic push
22+
#pragma clang diagnostic ignored "-Weverything"
23+
#include <PromQLLexer.h>
24+
#include <PromQLParser.h>
25+
#pragma clang diagnostic pop
26+
#endif
27+
28+
29+
namespace DB
30+
{
31+
32+
namespace ErrorCodes
33+
{
34+
extern const int BAD_ARGUMENTS;
35+
extern const int LOGICAL_ERROR;
36+
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
37+
extern const int CANNOT_PARSE_PROMQL_QUERY;
38+
extern const int SUPPORT_IS_DISABLED;
39+
}
40+
41+
42+
namespace
43+
{
44+
#if USE_ANTLR4_GRAMMARS
45+
class PromQLErrorListener : public antlr4::BaseErrorListener
46+
{
47+
public:
48+
void syntaxError(antlr4::Recognizer * /*recognizer*/, antlr4::Token * /*offendingSymbol*/,
49+
size_t line, size_t charPositionInLine, const std::string &msg, std::exception_ptr /*e*/) override
50+
{
51+
throw Exception(ErrorCodes::CANNOT_PARSE_PROMQL_QUERY,
52+
"Syntax error: {} while parsing PromQL query (line {}, column {})",
53+
msg, line, charPositionInLine + 1);
54+
}
55+
};
56+
57+
void checkPromQLSyntax(const String & promql_query)
58+
{
59+
antlr4::ANTLRInputStream input{promql_query};
60+
PromQLErrorListener error_listener;
61+
62+
PromQLLexer lexer{&input};
63+
lexer.removeErrorListeners();
64+
lexer.addErrorListener(&error_listener);
65+
antlr4::CommonTokenStream tokens(&lexer);
66+
67+
PromQLParser parser{&tokens};
68+
parser.removeErrorListeners();
69+
parser.addErrorListener(&error_listener);
70+
71+
auto * expression = parser.expression();
72+
chassert(expression);
73+
74+
String info = expression->toStringTree(&parser, true);
75+
LOG_INFO(getLogger("TableFunctionPrometheusQuery"), "Parsed PromQL query: {}", info);
76+
}
77+
#endif
78+
}
79+
80+
81+
void TableFunctionPrometheusQuery::parseArguments(const ASTPtr & ast_function, ContextPtr context)
82+
{
83+
const auto & args_func = ast_function->as<ASTFunction &>();
84+
85+
if (!args_func.arguments)
86+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' must have arguments.", name);
87+
88+
auto & args = args_func.arguments->children;
89+
90+
if ((args.size() != 2) && (args.size() != 3))
91+
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
92+
"Table function '{}' requires one or two arguments: {}([database, ] time_series_table)", name, name);
93+
94+
if (args.size() == 2)
95+
{
96+
/// prometheusQuery( [my_db.]my_time_series_table )
97+
if (const auto * id = args[0]->as<ASTIdentifier>())
98+
{
99+
if (auto table_id = id->createTable())
100+
time_series_storage_id = table_id->getTableId();
101+
}
102+
}
103+
104+
if (time_series_storage_id.empty())
105+
{
106+
for (size_t i = 0; i != args.size() - 1; ++i)
107+
{
108+
auto & arg = args[i];
109+
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
110+
}
111+
112+
if (args.size() == 2)
113+
{
114+
/// prometheusQuery( 'my_time_series_table', 'promql_query' )
115+
time_series_storage_id.table_name = checkAndGetLiteralArgument<String>(args[0], "table_name");
116+
}
117+
else
118+
{
119+
/// timeSeriesMetrics( 'mydb', 'my_time_series_table', 'promql_query' )
120+
time_series_storage_id.database_name = checkAndGetLiteralArgument<String>(args[0], "database_name");
121+
time_series_storage_id.table_name = checkAndGetLiteralArgument<String>(args[1], "table_name");
122+
}
123+
}
124+
125+
if (time_series_storage_id.empty())
126+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Couldn't get a table name from the arguments of the {} table function", name);
127+
128+
time_series_storage_id = context->resolveStorageID(time_series_storage_id);
129+
130+
auto & last_arg = args[args.size() - 1];
131+
last_arg = evaluateConstantExpressionOrIdentifierAsLiteral(last_arg, context);
132+
promql_query = checkAndGetLiteralArgument<String>(last_arg, "promql_query");
133+
}
134+
135+
136+
ColumnsDescription TableFunctionPrometheusQuery::getActualTableStructure(ContextPtr /* context */, bool /* is_insert_query */) const
137+
{
138+
return ColumnsDescription(NamesAndTypesList({
139+
{"metric_name", std::make_shared<DataTypeString>()},
140+
{"tags", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
141+
{"time_series", std::make_shared<DataTypeArray>(std::make_shared<DataTypeTuple>(
142+
DataTypes{std::make_shared<DataTypeDateTime64>(3), std::make_shared<DataTypeFloat64>()},
143+
Strings{"timestamp", "value"}))},
144+
}));
145+
}
146+
147+
148+
const char * TableFunctionPrometheusQuery::getStorageTypeName() const
149+
{
150+
return "Null";
151+
}
152+
153+
154+
StoragePtr TableFunctionPrometheusQuery::executeImpl(
155+
const ASTPtr & /* ast_function */,
156+
[[maybe_unused]] ContextPtr context,
157+
[[maybe_unused]] const String & table_name,
158+
ColumnsDescription /* cached_columns */,
159+
bool /* is_insert_query */) const
160+
{
161+
#if USE_ANTLR4_GRAMMARS
162+
163+
/// TODO
164+
checkPromQLSyntax(promql_query);
165+
166+
ColumnsDescription columns = getActualTableStructure(context, /* is_insert_query = */ false);
167+
auto res = std::make_shared<StorageNull>(StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription(), String{});
168+
res->startup();
169+
return res;
170+
171+
#else
172+
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ANTLR4 support is disabled");
173+
#endif
174+
}
175+
176+
177+
void registerTableFunctionPrometheusQuery(TableFunctionFactory & factory)
178+
{
179+
factory.registerFunction<TableFunctionPrometheusQuery>(
180+
{.documentation = {
181+
.description=R"(Executes a prometheus query on a TimeSeries table.)",
182+
.examples{{"prometheusQuery", "SELECT * from prometheusQuery('mydb', 'time_series_table', 'http_requests_total{job=\"prometheus\",group=\"canary\"}');", ""}},
183+
.category{""}}
184+
});
185+
}
186+
187+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
3+
#include <TableFunctions/ITableFunction.h>
4+
#include <Storages/StorageTimeSeries.h>
5+
6+
7+
namespace DB
8+
{
9+
10+
/// Table function prometheusQuery('mydb', 'ts_table', 'query') executes a prometheus query on a TimeSeries table.
11+
/// This table function can execute either instant or range queries.
12+
class TableFunctionPrometheusQuery : public ITableFunction
13+
{
14+
public:
15+
static constexpr auto name = "prometheusQuery";
16+
String getName() const override { return name; }
17+
18+
private:
19+
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
20+
21+
StoragePtr executeImpl(
22+
const ASTPtr & ast_function,
23+
ContextPtr context,
24+
const std::string & table_name,
25+
ColumnsDescription cached_columns,
26+
bool is_insert_query) const override;
27+
28+
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
29+
const char * getStorageTypeName() const override;
30+
31+
StorageID time_series_storage_id = StorageID::createEmpty();
32+
String promql_query;
33+
};
34+
35+
}

src/TableFunctions/registerTableFunctions.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]
6161

6262
registerTableFunctionFormat(factory);
6363
registerTableFunctionExplain(factory);
64+
6465
registerTableFunctionTimeSeries(factory);
66+
registerTableFunctionPrometheusQuery(factory);
6567

6668
registerTableFunctionObjectStorage(factory);
6769
registerTableFunctionObjectStorageCluster(factory);

src/TableFunctions/registerTableFunctions.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ void registerDataLakeTableFunctions(TableFunctionFactory & factory);
7373
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
7474

7575
void registerTableFunctionTimeSeries(TableFunctionFactory & factory);
76+
void registerTableFunctionPrometheusQuery(TableFunctionFactory & factory);
7677

7778
void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]);
7879

src/configure_config.cmake

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ endif()
182182
if (TARGET ch_contrib::prometheus_protobufs)
183183
set(USE_PROMETHEUS_PROTOBUFS 1)
184184
endif()
185+
if (TARGET ch_contrib::antlr4_grammars)
186+
set(USE_ANTLR4_GRAMMARS 1)
187+
endif()
185188
if (TARGET ch_contrib::mongocxx)
186189
set(USE_MONGODB 1)
187190
endif()

0 commit comments

Comments
 (0)