|
| 1 | +#include <TableFunctions/TableFunctionPrometheusQuery.h> |
| 2 | + |
| 3 | +#include <DataTypes/DataTypeArray.h> |
| 4 | +#include <DataTypes/DataTypeDateTime64.h> |
| 5 | +#include <DataTypes/DataTypeString.h> |
| 6 | +#include <DataTypes/DataTypeTuple.h> |
| 7 | +#include <DataTypes/DataTypesNumber.h> |
| 8 | +#include <Interpreters/Context.h> |
| 9 | +#include <Interpreters/DatabaseCatalog.h> |
| 10 | +#include <Interpreters/evaluateConstantExpression.h> |
| 11 | +#include <Parsers/ASTFunction.h> |
| 12 | +#include <Parsers/ASTIdentifier.h> |
| 13 | +#include <Storages/StorageNull.h> |
| 14 | +#include <Storages/StorageTimeSeries.h> |
| 15 | +#include <Storages/checkAndGetLiteralArgument.h> |
| 16 | +#include <TableFunctions/TableFunctionFactory.h> |
| 17 | + |
| 18 | +#include "config.h" |
| 19 | + |
| 20 | +#if USE_ANTLR4_GRAMMARS |
| 21 | +#pragma clang diagnostic push |
| 22 | +#pragma clang diagnostic ignored "-Weverything" |
| 23 | +#include <PromQLLexer.h> |
| 24 | +#include <PromQLParser.h> |
| 25 | +#pragma clang diagnostic pop |
| 26 | +#endif |
| 27 | + |
| 28 | + |
| 29 | +namespace DB |
| 30 | +{ |
| 31 | + |
| 32 | +namespace ErrorCodes |
| 33 | +{ |
| 34 | + extern const int BAD_ARGUMENTS; |
| 35 | + extern const int LOGICAL_ERROR; |
| 36 | + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; |
| 37 | + extern const int CANNOT_PARSE_PROMQL_QUERY; |
| 38 | + extern const int SUPPORT_IS_DISABLED; |
| 39 | +} |
| 40 | + |
| 41 | + |
| 42 | +namespace |
| 43 | +{ |
| 44 | +#if USE_ANTLR4_GRAMMARS |
| 45 | + class PromQLErrorListener : public antlr4::BaseErrorListener |
| 46 | + { |
| 47 | + public: |
| 48 | + void syntaxError(antlr4::Recognizer * /*recognizer*/, antlr4::Token * /*offendingSymbol*/, |
| 49 | + size_t line, size_t charPositionInLine, const std::string &msg, std::exception_ptr /*e*/) override |
| 50 | + { |
| 51 | + throw Exception(ErrorCodes::CANNOT_PARSE_PROMQL_QUERY, |
| 52 | + "Syntax error: {} while parsing PromQL query (line {}, column {})", |
| 53 | + msg, line, charPositionInLine + 1); |
| 54 | + } |
| 55 | + }; |
| 56 | + |
| 57 | + void checkPromQLSyntax(const String & promql_query) |
| 58 | + { |
| 59 | + antlr4::ANTLRInputStream input{promql_query}; |
| 60 | + PromQLErrorListener error_listener; |
| 61 | + |
| 62 | + PromQLLexer lexer{&input}; |
| 63 | + lexer.removeErrorListeners(); |
| 64 | + lexer.addErrorListener(&error_listener); |
| 65 | + antlr4::CommonTokenStream tokens(&lexer); |
| 66 | + |
| 67 | + PromQLParser parser{&tokens}; |
| 68 | + parser.removeErrorListeners(); |
| 69 | + parser.addErrorListener(&error_listener); |
| 70 | + |
| 71 | + auto * expression = parser.expression(); |
| 72 | + chassert(expression); |
| 73 | + |
| 74 | + String info = expression->toStringTree(&parser, true); |
| 75 | + LOG_INFO(getLogger("TableFunctionPrometheusQuery"), "Parsed PromQL query: {}", info); |
| 76 | + } |
| 77 | +#endif |
| 78 | +} |
| 79 | + |
| 80 | + |
| 81 | +void TableFunctionPrometheusQuery::parseArguments(const ASTPtr & ast_function, ContextPtr context) |
| 82 | +{ |
| 83 | + const auto & args_func = ast_function->as<ASTFunction &>(); |
| 84 | + |
| 85 | + if (!args_func.arguments) |
| 86 | + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' must have arguments.", name); |
| 87 | + |
| 88 | + auto & args = args_func.arguments->children; |
| 89 | + |
| 90 | + if ((args.size() != 2) && (args.size() != 3)) |
| 91 | + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, |
| 92 | + "Table function '{}' requires one or two arguments: {}([database, ] time_series_table)", name, name); |
| 93 | + |
| 94 | + if (args.size() == 2) |
| 95 | + { |
| 96 | + /// prometheusQuery( [my_db.]my_time_series_table ) |
| 97 | + if (const auto * id = args[0]->as<ASTIdentifier>()) |
| 98 | + { |
| 99 | + if (auto table_id = id->createTable()) |
| 100 | + time_series_storage_id = table_id->getTableId(); |
| 101 | + } |
| 102 | + } |
| 103 | + |
| 104 | + if (time_series_storage_id.empty()) |
| 105 | + { |
| 106 | + for (size_t i = 0; i != args.size() - 1; ++i) |
| 107 | + { |
| 108 | + auto & arg = args[i]; |
| 109 | + arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); |
| 110 | + } |
| 111 | + |
| 112 | + if (args.size() == 2) |
| 113 | + { |
| 114 | + /// prometheusQuery( 'my_time_series_table', 'promql_query' ) |
| 115 | + time_series_storage_id.table_name = checkAndGetLiteralArgument<String>(args[0], "table_name"); |
| 116 | + } |
| 117 | + else |
| 118 | + { |
| 119 | + /// timeSeriesMetrics( 'mydb', 'my_time_series_table', 'promql_query' ) |
| 120 | + time_series_storage_id.database_name = checkAndGetLiteralArgument<String>(args[0], "database_name"); |
| 121 | + time_series_storage_id.table_name = checkAndGetLiteralArgument<String>(args[1], "table_name"); |
| 122 | + } |
| 123 | + } |
| 124 | + |
| 125 | + if (time_series_storage_id.empty()) |
| 126 | + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Couldn't get a table name from the arguments of the {} table function", name); |
| 127 | + |
| 128 | + time_series_storage_id = context->resolveStorageID(time_series_storage_id); |
| 129 | + |
| 130 | + auto & last_arg = args[args.size() - 1]; |
| 131 | + last_arg = evaluateConstantExpressionOrIdentifierAsLiteral(last_arg, context); |
| 132 | + promql_query = checkAndGetLiteralArgument<String>(last_arg, "promql_query"); |
| 133 | +} |
| 134 | + |
| 135 | + |
| 136 | +ColumnsDescription TableFunctionPrometheusQuery::getActualTableStructure(ContextPtr /* context */, bool /* is_insert_query */) const |
| 137 | +{ |
| 138 | + return ColumnsDescription(NamesAndTypesList({ |
| 139 | + {"metric_name", std::make_shared<DataTypeString>()}, |
| 140 | + {"tags", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, |
| 141 | + {"time_series", std::make_shared<DataTypeArray>(std::make_shared<DataTypeTuple>( |
| 142 | + DataTypes{std::make_shared<DataTypeDateTime64>(3), std::make_shared<DataTypeFloat64>()}, |
| 143 | + Strings{"timestamp", "value"}))}, |
| 144 | + })); |
| 145 | +} |
| 146 | + |
| 147 | + |
| 148 | +const char * TableFunctionPrometheusQuery::getStorageTypeName() const |
| 149 | +{ |
| 150 | + return "Null"; |
| 151 | +} |
| 152 | + |
| 153 | + |
| 154 | +StoragePtr TableFunctionPrometheusQuery::executeImpl( |
| 155 | + const ASTPtr & /* ast_function */, |
| 156 | + [[maybe_unused]] ContextPtr context, |
| 157 | + [[maybe_unused]] const String & table_name, |
| 158 | + ColumnsDescription /* cached_columns */, |
| 159 | + bool /* is_insert_query */) const |
| 160 | +{ |
| 161 | +#if USE_ANTLR4_GRAMMARS |
| 162 | + |
| 163 | + /// TODO |
| 164 | + checkPromQLSyntax(promql_query); |
| 165 | + |
| 166 | + ColumnsDescription columns = getActualTableStructure(context, /* is_insert_query = */ false); |
| 167 | + auto res = std::make_shared<StorageNull>(StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription(), String{}); |
| 168 | + res->startup(); |
| 169 | + return res; |
| 170 | + |
| 171 | +#else |
| 172 | + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ANTLR4 support is disabled"); |
| 173 | +#endif |
| 174 | +} |
| 175 | + |
| 176 | + |
| 177 | +void registerTableFunctionPrometheusQuery(TableFunctionFactory & factory) |
| 178 | +{ |
| 179 | + factory.registerFunction<TableFunctionPrometheusQuery>( |
| 180 | + {.documentation = { |
| 181 | + .description=R"(Executes a prometheus query on a TimeSeries table.)", |
| 182 | + .examples{{"prometheusQuery", "SELECT * from prometheusQuery('mydb', 'time_series_table', 'http_requests_total{job=\"prometheus\",group=\"canary\"}');", ""}}, |
| 183 | + .category{""}} |
| 184 | + }); |
| 185 | +} |
| 186 | + |
| 187 | +} |
0 commit comments