|
44 | 44 | // clang-format on |
45 | 45 |
|
46 | 46 | static const std::string ctype = "application/json"; |
47 | | -static const std::string SQL_ENDPOINT_FORMAT_JDBC = |
48 | | - "/_plugins/_sql?format=jdbc"; |
49 | | -static const std::string SQL_ENDPOINT_FORMAT_RAW = |
50 | | - "/_plugins/_sql?format=raw"; |
51 | | -static const std::string SQL_ENDPOINT_CLOSE_CURSOR = "/_plugins/_sql/close"; |
52 | | -static const std::string PLUGIN_ENDPOINT_FORMAT_JSON = |
53 | | - "/_cat/plugins?format=json"; |
54 | 47 | static const std::string ALLOCATION_TAG = "AWS_SIGV4_AUTH"; |
55 | 48 | static const std::string SERVICE_NAME = "es"; |
56 | 49 | static const std::string ESODBC_PROFILE_NAME = "opensearchodbc"; |
@@ -481,10 +474,10 @@ bool OpenSearchCommunication::IsSQLPluginEnabled(std::shared_ptr< ErrorDetails > |
481 | 474 |
|
482 | 475 | bool OpenSearchCommunication::CheckSQLPluginAvailability() { |
483 | 476 | LogMsg(OPENSEARCH_ALL, "Checking for SQL plugin status."); |
484 | | - std::string test_query = "SELECT 1"; |
| 477 | + std::string test_query = "SHOW TABLES LIKE %"; |
485 | 478 | try { |
486 | 479 | std::shared_ptr< Aws::Http::HttpResponse > response = |
487 | | - IssueRequest(SQL_ENDPOINT_FORMAT_RAW, |
| 480 | + IssueRequest(sql_endpoint, |
488 | 481 | Aws::Http::HttpMethod::HTTP_POST, ctype, test_query); |
489 | 482 | if (response == nullptr) { |
490 | 483 | m_error_message = |
@@ -552,6 +545,11 @@ bool OpenSearchCommunication::EstablishConnection() { |
552 | 545 | InitializeConnection(); |
553 | 546 | } |
554 | 547 |
|
| 548 | + // check if the endpoint is initialized |
| 549 | + if (sql_endpoint.empty()) { |
| 550 | + SetSqlEndpoint(); |
| 551 | + } |
| 552 | + |
555 | 553 | // Check whether SQL plugin has been installed and enabled in the |
556 | 554 | // OpenSearch server since the SQL plugin is a prerequisite to |
557 | 555 | // use this driver. |
@@ -580,7 +578,7 @@ std::vector< std::string > OpenSearchCommunication::GetColumnsWithSelectQuery( |
580 | 578 |
|
581 | 579 | // Issue request |
582 | 580 | std::shared_ptr< Aws::Http::HttpResponse > response = |
583 | | - IssueRequest(SQL_ENDPOINT_FORMAT_JDBC, Aws::Http::HttpMethod::HTTP_POST, |
| 581 | + IssueRequest(sql_endpoint, Aws::Http::HttpMethod::HTTP_POST, |
584 | 582 | ctype, query); |
585 | 583 |
|
586 | 584 | // Validate response |
@@ -654,7 +652,7 @@ int OpenSearchCommunication::ExecDirect(const char* query, const char* fetch_siz |
654 | 652 |
|
655 | 653 | // Issue request |
656 | 654 | std::shared_ptr< Aws::Http::HttpResponse > response = |
657 | | - IssueRequest(SQL_ENDPOINT_FORMAT_JDBC, Aws::Http::HttpMethod::HTTP_POST, |
| 655 | + IssueRequest(sql_endpoint, Aws::Http::HttpMethod::HTTP_POST, |
658 | 656 | ctype, statement, fetch_size); |
659 | 657 |
|
660 | 658 | // Validate response |
@@ -733,7 +731,7 @@ void OpenSearchCommunication::SendCursorQueries(std::string cursor) { |
733 | 731 | try { |
734 | 732 | while (!cursor.empty() && m_is_retrieving) { |
735 | 733 | std::shared_ptr< Aws::Http::HttpResponse > response = IssueRequest( |
736 | | - SQL_ENDPOINT_FORMAT_JDBC, Aws::Http::HttpMethod::HTTP_POST, |
| 734 | + sql_endpoint, Aws::Http::HttpMethod::HTTP_POST, |
737 | 735 | ctype, "", "", cursor); |
738 | 736 | if (response == nullptr) { |
739 | 737 | m_error_message = |
@@ -782,7 +780,7 @@ void OpenSearchCommunication::SendCursorQueries(std::string cursor) { |
782 | 780 |
|
783 | 781 | void OpenSearchCommunication::SendCloseCursorRequest(const std::string& cursor) { |
784 | 782 | std::shared_ptr< Aws::Http::HttpResponse > response = |
785 | | - IssueRequest(SQL_ENDPOINT_CLOSE_CURSOR, |
| 783 | + IssueRequest(sql_endpoint + "/close", |
786 | 784 | Aws::Http::HttpMethod::HTTP_POST, ctype, "", "", cursor); |
787 | 785 | if (response == nullptr) { |
788 | 786 | m_error_message = |
@@ -919,6 +917,60 @@ std::string OpenSearchCommunication::GetServerVersion() { |
919 | 917 | return ""; |
920 | 918 | } |
921 | 919 |
|
| 920 | +std::string OpenSearchCommunication::GetServerDistribution() { |
| 921 | + if (!m_http_client) { |
| 922 | + InitializeConnection(); |
| 923 | + } |
| 924 | + |
| 925 | + std::shared_ptr< Aws::Http::HttpResponse > response = |
| 926 | + IssueRequest("", Aws::Http::HttpMethod::HTTP_GET, "", "", ""); |
| 927 | + if (response == nullptr) { |
| 928 | + m_error_message = |
| 929 | + "Failed to receive response from server version query. " |
| 930 | + "Received NULL response."; |
| 931 | + SetErrorDetails("Connection error", m_error_message, |
| 932 | + ConnErrorType::CONN_ERROR_COMM_LINK_FAILURE); |
| 933 | + LogMsg(OPENSEARCH_ERROR, m_error_message.c_str()); |
| 934 | + return ""; |
| 935 | + } |
| 936 | + |
| 937 | + // Parse server version distribution |
| 938 | + if (response->GetResponseCode() == Aws::Http::HttpResponseCode::OK) { |
| 939 | + try { |
| 940 | + AwsHttpResponseToString(response, m_response_str); |
| 941 | + rabbit::document doc; |
| 942 | + doc.parse(m_response_str); |
| 943 | + if (doc.has("version") && doc["version"].has("distribution")) { |
| 944 | + return doc["version"]["distribution"].as_string(); |
| 945 | + } |
| 946 | + } catch (const rabbit::type_mismatch& e) { |
| 947 | + m_error_message = "Error parsing main endpoint response: " |
| 948 | + + std::string(e.what()); |
| 949 | + SetErrorDetails("Connection error", m_error_message, |
| 950 | + ConnErrorType::CONN_ERROR_COMM_LINK_FAILURE); |
| 951 | + LogMsg(OPENSEARCH_ERROR, m_error_message.c_str()); |
| 952 | + } catch (const rabbit::parse_error& e) { |
| 953 | + m_error_message = "Error parsing main endpoint response: " |
| 954 | + + std::string(e.what()); |
| 955 | + SetErrorDetails("Connection error", m_error_message, |
| 956 | + ConnErrorType::CONN_ERROR_COMM_LINK_FAILURE); |
| 957 | + LogMsg(OPENSEARCH_ERROR, m_error_message.c_str()); |
| 958 | + } catch (const std::exception& e) { |
| 959 | + m_error_message = "Error parsing main endpoint response: " |
| 960 | + + std::string(e.what()); |
| 961 | + SetErrorDetails("Connection error", m_error_message, |
| 962 | + ConnErrorType::CONN_ERROR_COMM_LINK_FAILURE); |
| 963 | + LogMsg(OPENSEARCH_ERROR, m_error_message.c_str()); |
| 964 | + } catch (...) { |
| 965 | + LogMsg(OPENSEARCH_ERROR, |
| 966 | + "Unknown exception thrown when parsing main endpoint " |
| 967 | + "response."); |
| 968 | + } |
| 969 | + } |
| 970 | + LogMsg(OPENSEARCH_ERROR, m_error_message.c_str()); |
| 971 | + return ""; |
| 972 | +} |
| 973 | + |
922 | 974 | std::string OpenSearchCommunication::GetClusterName() { |
923 | 975 | if (!m_http_client) { |
924 | 976 | InitializeConnection(); |
@@ -974,3 +1026,12 @@ std::string OpenSearchCommunication::GetClusterName() { |
974 | 1026 | LogMsg(OPENSEARCH_ERROR, m_error_message.c_str()); |
975 | 1027 | return ""; |
976 | 1028 | } |
| 1029 | + |
| 1030 | +void OpenSearchCommunication::SetSqlEndpoint() { |
| 1031 | + std::string distribution = GetServerDistribution(); |
| 1032 | + if (distribution.compare("opensearch") == 0) { |
| 1033 | + sql_endpoint = "/_plugins/_sql"; |
| 1034 | + } else { |
| 1035 | + sql_endpoint = "/_opendistro/_sql"; |
| 1036 | + } |
| 1037 | +} |
0 commit comments