Skip to content

Commit 12eb45a

Browse files
authored
Support ODBC compatibility with ODFE SQL (#238)
* compatible with opensearch and all versions in odfe with sql plugin Signed-off-by: chloe-zh <chloezh1102@gmail.com> * address comments Signed-off-by: chloe-zh <chloezh1102@gmail.com>
1 parent 0bfae4e commit 12eb45a

2 files changed

Lines changed: 79 additions & 13 deletions

File tree

sql-odbc/src/sqlodbc/opensearch_communication.cpp

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,6 @@
4444
// clang-format on
4545

4646
static const std::string ctype = "application/json";
47-
static const std::string SQL_ENDPOINT_FORMAT_JDBC =
48-
"/_plugins/_sql?format=jdbc";
49-
static const std::string SQL_ENDPOINT_FORMAT_RAW =
50-
"/_plugins/_sql?format=raw";
51-
static const std::string SQL_ENDPOINT_CLOSE_CURSOR = "/_plugins/_sql/close";
52-
static const std::string PLUGIN_ENDPOINT_FORMAT_JSON =
53-
"/_cat/plugins?format=json";
5447
static const std::string ALLOCATION_TAG = "AWS_SIGV4_AUTH";
5548
static const std::string SERVICE_NAME = "es";
5649
static const std::string ESODBC_PROFILE_NAME = "opensearchodbc";
@@ -481,10 +474,10 @@ bool OpenSearchCommunication::IsSQLPluginEnabled(std::shared_ptr< ErrorDetails >
481474

482475
bool OpenSearchCommunication::CheckSQLPluginAvailability() {
483476
LogMsg(OPENSEARCH_ALL, "Checking for SQL plugin status.");
484-
std::string test_query = "SELECT 1";
477+
std::string test_query = "SHOW TABLES LIKE %";
485478
try {
486479
std::shared_ptr< Aws::Http::HttpResponse > response =
487-
IssueRequest(SQL_ENDPOINT_FORMAT_RAW,
480+
IssueRequest(sql_endpoint,
488481
Aws::Http::HttpMethod::HTTP_POST, ctype, test_query);
489482
if (response == nullptr) {
490483
m_error_message =
@@ -552,6 +545,11 @@ bool OpenSearchCommunication::EstablishConnection() {
552545
InitializeConnection();
553546
}
554547

548+
// check if the endpoint is initialized
549+
if (sql_endpoint.empty()) {
550+
SetSqlEndpoint();
551+
}
552+
555553
// Check whether SQL plugin has been installed and enabled in the
556554
// OpenSearch server since the SQL plugin is a prerequisite to
557555
// use this driver.
@@ -580,7 +578,7 @@ std::vector< std::string > OpenSearchCommunication::GetColumnsWithSelectQuery(
580578

581579
// Issue request
582580
std::shared_ptr< Aws::Http::HttpResponse > response =
583-
IssueRequest(SQL_ENDPOINT_FORMAT_JDBC, Aws::Http::HttpMethod::HTTP_POST,
581+
IssueRequest(sql_endpoint, Aws::Http::HttpMethod::HTTP_POST,
584582
ctype, query);
585583

586584
// Validate response
@@ -654,7 +652,7 @@ int OpenSearchCommunication::ExecDirect(const char* query, const char* fetch_siz
654652

655653
// Issue request
656654
std::shared_ptr< Aws::Http::HttpResponse > response =
657-
IssueRequest(SQL_ENDPOINT_FORMAT_JDBC, Aws::Http::HttpMethod::HTTP_POST,
655+
IssueRequest(sql_endpoint, Aws::Http::HttpMethod::HTTP_POST,
658656
ctype, statement, fetch_size);
659657

660658
// Validate response
@@ -733,7 +731,7 @@ void OpenSearchCommunication::SendCursorQueries(std::string cursor) {
733731
try {
734732
while (!cursor.empty() && m_is_retrieving) {
735733
std::shared_ptr< Aws::Http::HttpResponse > response = IssueRequest(
736-
SQL_ENDPOINT_FORMAT_JDBC, Aws::Http::HttpMethod::HTTP_POST,
734+
sql_endpoint, Aws::Http::HttpMethod::HTTP_POST,
737735
ctype, "", "", cursor);
738736
if (response == nullptr) {
739737
m_error_message =
@@ -782,7 +780,7 @@ void OpenSearchCommunication::SendCursorQueries(std::string cursor) {
782780

783781
void OpenSearchCommunication::SendCloseCursorRequest(const std::string& cursor) {
784782
std::shared_ptr< Aws::Http::HttpResponse > response =
785-
IssueRequest(SQL_ENDPOINT_CLOSE_CURSOR,
783+
IssueRequest(sql_endpoint + "/close",
786784
Aws::Http::HttpMethod::HTTP_POST, ctype, "", "", cursor);
787785
if (response == nullptr) {
788786
m_error_message =
@@ -919,6 +917,60 @@ std::string OpenSearchCommunication::GetServerVersion() {
919917
return "";
920918
}
921919

920+
std::string OpenSearchCommunication::GetServerDistribution() {
921+
if (!m_http_client) {
922+
InitializeConnection();
923+
}
924+
925+
std::shared_ptr< Aws::Http::HttpResponse > response =
926+
IssueRequest("", Aws::Http::HttpMethod::HTTP_GET, "", "", "");
927+
if (response == nullptr) {
928+
m_error_message =
929+
"Failed to receive response from server version query. "
930+
"Received NULL response.";
931+
SetErrorDetails("Connection error", m_error_message,
932+
ConnErrorType::CONN_ERROR_COMM_LINK_FAILURE);
933+
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
934+
return "";
935+
}
936+
937+
// Parse server version distribution
938+
if (response->GetResponseCode() == Aws::Http::HttpResponseCode::OK) {
939+
try {
940+
AwsHttpResponseToString(response, m_response_str);
941+
rabbit::document doc;
942+
doc.parse(m_response_str);
943+
if (doc.has("version") && doc["version"].has("distribution")) {
944+
return doc["version"]["distribution"].as_string();
945+
}
946+
} catch (const rabbit::type_mismatch& e) {
947+
m_error_message = "Error parsing main endpoint response: "
948+
+ std::string(e.what());
949+
SetErrorDetails("Connection error", m_error_message,
950+
ConnErrorType::CONN_ERROR_COMM_LINK_FAILURE);
951+
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
952+
} catch (const rabbit::parse_error& e) {
953+
m_error_message = "Error parsing main endpoint response: "
954+
+ std::string(e.what());
955+
SetErrorDetails("Connection error", m_error_message,
956+
ConnErrorType::CONN_ERROR_COMM_LINK_FAILURE);
957+
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
958+
} catch (const std::exception& e) {
959+
m_error_message = "Error parsing main endpoint response: "
960+
+ std::string(e.what());
961+
SetErrorDetails("Connection error", m_error_message,
962+
ConnErrorType::CONN_ERROR_COMM_LINK_FAILURE);
963+
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
964+
} catch (...) {
965+
LogMsg(OPENSEARCH_ERROR,
966+
"Unknown exception thrown when parsing main endpoint "
967+
"response.");
968+
}
969+
}
970+
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
971+
return "";
972+
}
973+
922974
std::string OpenSearchCommunication::GetClusterName() {
923975
if (!m_http_client) {
924976
InitializeConnection();
@@ -974,3 +1026,12 @@ std::string OpenSearchCommunication::GetClusterName() {
9741026
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
9751027
return "";
9761028
}
1029+
1030+
void OpenSearchCommunication::SetSqlEndpoint() {
1031+
std::string distribution = GetServerDistribution();
1032+
if (distribution.compare("opensearch") == 0) {
1033+
sql_endpoint = "/_plugins/_sql";
1034+
} else {
1035+
sql_endpoint = "/_opendistro/_sql";
1036+
}
1037+
}

sql-odbc/src/sqlodbc/opensearch_communication.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class OpenSearchCommunication {
7878
static bool IsSQLPluginEnabled(std::shared_ptr< ErrorDetails > error_details);
7979
bool CheckSQLPluginAvailability();
8080
std::string GetServerVersion();
81+
std::string GetServerDistribution();
8182
std::string GetClusterName();
8283
std::shared_ptr< Aws::Http::HttpResponse > IssueRequest(
8384
const std::string& endpoint, const Aws::Http::HttpMethod request_type,
@@ -90,6 +91,10 @@ class OpenSearchCommunication {
9091
void StopResultRetrieval();
9192
std::vector< std::string > GetColumnsWithSelectQuery(
9293
const std::string table_name);
94+
void SetSqlEndpoint();
95+
96+
// the endpoint is set according to distribution (ES/OpenSearch)
97+
std::string sql_endpoint;
9398

9499
private:
95100
void InitializeConnection();

0 commit comments

Comments
 (0)