Skip to content

Commit 0d1ea5d

Browse files
joellubipitroualamb
authored
GH-38255: [Go][C++] Implement Flight SQL Bulk Ingestion (#38385)
### Rationale for this change It was suggested in the discussion around apache/arrow-adbc#1107 for the Flight SQL ADBC driver that an "Ingest" command would be a helpful addition to the Flight SQL specification. This command would enable a Flight SQL client to provide a FlightData stream to the server without needing to know its SQL syntax, and have that stream loaded into a target table by whichever means the server deems appropriate. ### What changes are included in this PR? - Format: - Add CommandStatementIngest message type to Flight SQL proto definition - Add FLIGHT_SQL_SERVER_BULK_INGESTION and FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED options for SqlInfo - Go: - Generate pb - Server-side implementation - Client-side implementation - Unit + integration tests - C++: - Server-side implementation - Client-side implementation - Integration tests ### Are these changes tested? Yes, see `server_test.go`, `scenario.go`, and `test_integration.cc`. ### Are there any user-facing changes? Yes, new Flight SQL client and server functionality is being added. Changes are not expected to break existing users. * Closes: #38255 Lead-authored-by: Joel Lubinitsky <joellubi@gmail.com> Co-authored-by: Joel Lubinitsky <33523178+joellubi@users.noreply.github.com> Co-authored-by: Antoine Pitrou <pitrou@free.fr> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> Signed-off-by: Matt Topol <zotthewizard@gmail.com>
1 parent 7003e90 commit 0d1ea5d

20 files changed

Lines changed: 1986 additions & 592 deletions

File tree

cpp/src/arrow/flight/integration_tests/flight_integration_test.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ TEST(FlightIntegration, FlightSqlExtension) {
8989
ASSERT_OK(RunScenario("flight_sql:extension"));
9090
}
9191

92+
TEST(FlightIntegration, FlightSqlIngestion) {
93+
ASSERT_OK(RunScenario("flight_sql:ingestion"));
94+
}
95+
9296
} // namespace integration_tests
9397
} // namespace flight
9498
} // namespace arrow

cpp/src/arrow/flight/integration_tests/test_integration.cc

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,6 +1069,7 @@ constexpr int64_t kUpdateStatementExpectedRows = 10000L;
10691069
constexpr int64_t kUpdateStatementWithTransactionExpectedRows = 15000L;
10701070
constexpr int64_t kUpdatePreparedStatementExpectedRows = 20000L;
10711071
constexpr int64_t kUpdatePreparedStatementWithTransactionExpectedRows = 25000L;
1072+
constexpr int64_t kIngestStatementExpectedRows = 3L;
10721073
constexpr char kSelectStatement[] = "SELECT STATEMENT";
10731074
constexpr char kSavepointId[] = "savepoint_id";
10741075
constexpr char kSavepointName[] = "savepoint_name";
@@ -2124,6 +2125,127 @@ class ReuseConnectionScenario : public Scenario {
21242125
return Status::OK();
21252126
}
21262127
};
2128+
2129+
std::shared_ptr<Schema> GetIngestSchema() {
2130+
return arrow::schema({arrow::field("test_field", arrow::int64(), true)});
2131+
}
2132+
2133+
arrow::Result<std::shared_ptr<RecordBatchReader>> GetIngestRecords() {
2134+
auto schema = GetIngestSchema();
2135+
auto array = arrow::ArrayFromJSON(arrow::int64(), "[null,null,null]");
2136+
auto record_batch = arrow::RecordBatch::Make(schema, 3, {array});
2137+
return RecordBatchReader::Make({record_batch});
2138+
}
2139+
2140+
/// \brief The server used for testing bulk ingestion
2141+
class FlightSqlIngestionServer : public sql::FlightSqlServerBase {
2142+
public:
2143+
FlightSqlIngestionServer() : sql::FlightSqlServerBase() {
2144+
RegisterSqlInfo(sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_BULK_INGESTION,
2145+
sql::SqlInfoResult(true));
2146+
RegisterSqlInfo(
2147+
sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED,
2148+
sql::SqlInfoResult(true));
2149+
}
2150+
2151+
arrow::Result<int64_t> DoPutCommandStatementIngest(
2152+
const ServerCallContext& context, const sql::StatementIngest& command,
2153+
FlightMessageReader* reader) override {
2154+
ARROW_RETURN_NOT_OK(AssertEq<bool>(
2155+
true,
2156+
sql::TableDefinitionOptionsTableNotExistOption::kCreate ==
2157+
command.table_definition_options.if_not_exist,
2158+
"Wrong TableDefinitionOptionsTableNotExistOption for ExecuteIngest"));
2159+
ARROW_RETURN_NOT_OK(AssertEq<bool>(
2160+
true,
2161+
sql::TableDefinitionOptionsTableExistsOption::kReplace ==
2162+
command.table_definition_options.if_exists,
2163+
"Wrong TableDefinitionOptionsTableExistsOption for ExecuteIngest"));
2164+
ARROW_RETURN_NOT_OK(AssertEq<std::string>("test_table", command.table,
2165+
"Wrong table for ExecuteIngest"));
2166+
ARROW_RETURN_NOT_OK(AssertEq<std::string>("test_schema", command.schema.value(),
2167+
"Wrong schema for ExecuteIngest"));
2168+
ARROW_RETURN_NOT_OK(AssertEq<std::string>("test_catalog", command.catalog.value(),
2169+
"Wrong catalog for ExecuteIngest"));
2170+
ARROW_RETURN_NOT_OK(AssertEq<bool>(true, command.temporary,
2171+
"Wrong temporary setting for ExecuteIngest"));
2172+
ARROW_RETURN_NOT_OK(AssertEq<std::string>("123", command.transaction_id.value(),
2173+
"Wrong transaction_id for ExecuteIngest"));
2174+
2175+
std::unordered_map<std::string, std::string> expected_options = {{"key1", "val1"},
2176+
{"key2", "val2"}};
2177+
ARROW_RETURN_NOT_OK(
2178+
AssertEq<std::size_t>(expected_options.size(), command.options.size(),
2179+
"Wrong number of options set for ExecuteIngest"));
2180+
for (auto it = expected_options.begin(); it != expected_options.end(); ++it) {
2181+
auto key = it->first;
2182+
auto expected_val = it->second;
2183+
ARROW_RETURN_NOT_OK(
2184+
AssertEq<std::string>(expected_val, command.options.at(key),
2185+
"Wrong option value set for ExecuteIngest"));
2186+
}
2187+
2188+
auto expected_schema = GetIngestSchema();
2189+
int64_t num_records = 0;
2190+
while (true) {
2191+
ARROW_ASSIGN_OR_RAISE(FlightStreamChunk chunk, reader->Next());
2192+
if (chunk.data == nullptr) break;
2193+
2194+
ARROW_RETURN_NOT_OK(
2195+
AssertEq(true, expected_schema->Equals(chunk.data->schema()),
2196+
"Chunk schema does not match expected schema for ExecuteIngest"));
2197+
num_records += chunk.data->num_rows();
2198+
}
2199+
2200+
return num_records;
2201+
}
2202+
};
2203+
2204+
/// \brief The FlightSqlIngestion scenario.
2205+
///
2206+
/// This tests that the client can execute bulk ingestion against the server.
2207+
///
2208+
/// The server implements DoPutCommandStatementIngest and validates that the arguments
2209+
/// it receives are the same as those supplied to the client, or have been successfully
2210+
/// mapped to the equivalent server-side representation. The size and schema of the sent
2211+
/// and received streams are also validated against eachother.
2212+
class FlightSqlIngestionScenario : public Scenario {
2213+
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
2214+
FlightServerOptions* options) override {
2215+
server->reset(new FlightSqlIngestionServer());
2216+
return Status::OK();
2217+
}
2218+
2219+
Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }
2220+
2221+
Status RunClient(std::unique_ptr<FlightClient> client) override {
2222+
sql::FlightSqlClient sql_client(std::move(client));
2223+
ARROW_RETURN_NOT_OK(ValidateIngestion(&sql_client));
2224+
return Status::OK();
2225+
}
2226+
2227+
Status ValidateIngestion(sql::FlightSqlClient* sql_client) {
2228+
ARROW_ASSIGN_OR_RAISE(auto record_batch_reader, GetIngestRecords());
2229+
2230+
sql::TableDefinitionOptions table_definition_options;
2231+
table_definition_options.if_not_exist =
2232+
sql::TableDefinitionOptionsTableNotExistOption::kCreate;
2233+
table_definition_options.if_exists =
2234+
sql::TableDefinitionOptionsTableExistsOption::kReplace;
2235+
bool temporary = true;
2236+
std::unordered_map<std::string, std::string> options = {{"key1", "val1"},
2237+
{"key2", "val2"}};
2238+
ARROW_ASSIGN_OR_RAISE(
2239+
auto updated_rows,
2240+
sql_client->ExecuteIngest({}, record_batch_reader, table_definition_options,
2241+
"test_table", "test_schema", "test_catalog", temporary,
2242+
sql::Transaction("123"), options));
2243+
ARROW_RETURN_NOT_OK(AssertEq(kIngestStatementExpectedRows, updated_rows,
2244+
"Wrong number of updated rows for ExecuteIngest"));
2245+
2246+
return Status::OK();
2247+
}
2248+
};
21272249
} // namespace
21282250

21292251
Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>* out) {
@@ -2166,6 +2288,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
21662288
} else if (scenario_name == "flight_sql:extension") {
21672289
*out = std::make_shared<FlightSqlExtensionScenario>();
21682290
return Status::OK();
2291+
} else if (scenario_name == "flight_sql:ingestion") {
2292+
*out = std::make_shared<FlightSqlIngestionScenario>();
2293+
return Status::OK();
21692294
}
21702295
return Status::KeyError("Scenario not found: ", scenario_name);
21712296
}

cpp/src/arrow/flight/sql/client.cc

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,114 @@ arrow::Result<int64_t> FlightSqlClient::ExecuteSubstraitUpdate(
256256
return update_result.record_count();
257257
}
258258

259+
arrow::Result<int64_t> FlightSqlClient::ExecuteIngest(
260+
const FlightCallOptions& options, const std::shared_ptr<RecordBatchReader>& reader,
261+
const TableDefinitionOptions& table_definition_options, const std::string& table,
262+
const std::optional<std::string>& schema, const std::optional<std::string>& catalog,
263+
const bool temporary, const Transaction& transaction,
264+
const std::unordered_map<std::string, std::string>& ingest_options) {
265+
flight_sql_pb::CommandStatementIngest command;
266+
267+
flight_sql_pb::CommandStatementIngest_TableDefinitionOptions*
268+
pb_table_definition_options =
269+
new flight_sql_pb::CommandStatementIngest_TableDefinitionOptions();
270+
switch (table_definition_options.if_not_exist) {
271+
case TableDefinitionOptionsTableNotExistOption::kUnspecified:
272+
pb_table_definition_options->set_if_not_exist(
273+
flight_sql_pb::
274+
CommandStatementIngest_TableDefinitionOptions_TableNotExistOption_TABLE_NOT_EXIST_OPTION_UNSPECIFIED); // NOLINT(whitespace/line_length)
275+
break;
276+
case TableDefinitionOptionsTableNotExistOption::kCreate:
277+
pb_table_definition_options->set_if_not_exist(
278+
flight_sql_pb::
279+
CommandStatementIngest_TableDefinitionOptions_TableNotExistOption_TABLE_NOT_EXIST_OPTION_CREATE); // NOLINT(whitespace/line_length)
280+
break;
281+
case TableDefinitionOptionsTableNotExistOption::kFail:
282+
pb_table_definition_options->set_if_not_exist(
283+
flight_sql_pb::
284+
CommandStatementIngest_TableDefinitionOptions_TableNotExistOption_TABLE_NOT_EXIST_OPTION_FAIL); // NOLINT(whitespace/line_length)
285+
break;
286+
287+
default:
288+
break;
289+
}
290+
291+
switch (table_definition_options.if_exists) {
292+
case TableDefinitionOptionsTableExistsOption::kUnspecified:
293+
pb_table_definition_options->set_if_exists(
294+
flight_sql_pb::
295+
CommandStatementIngest_TableDefinitionOptions_TableExistsOption_TABLE_EXISTS_OPTION_UNSPECIFIED); // NOLINT(whitespace/line_length)
296+
break;
297+
case TableDefinitionOptionsTableExistsOption::kFail:
298+
pb_table_definition_options->set_if_exists(
299+
flight_sql_pb::
300+
CommandStatementIngest_TableDefinitionOptions_TableExistsOption_TABLE_EXISTS_OPTION_FAIL); // NOLINT(whitespace/line_length)
301+
break;
302+
case TableDefinitionOptionsTableExistsOption::kAppend:
303+
pb_table_definition_options->set_if_exists(
304+
flight_sql_pb::
305+
CommandStatementIngest_TableDefinitionOptions_TableExistsOption_TABLE_EXISTS_OPTION_APPEND); // NOLINT(whitespace/line_length)
306+
break;
307+
case TableDefinitionOptionsTableExistsOption::kReplace:
308+
pb_table_definition_options->set_if_exists(
309+
flight_sql_pb::
310+
CommandStatementIngest_TableDefinitionOptions_TableExistsOption_TABLE_EXISTS_OPTION_REPLACE); // NOLINT(whitespace/line_length)
311+
break;
312+
313+
default:
314+
break;
315+
}
316+
317+
command.set_allocated_table_definition_options(pb_table_definition_options);
318+
command.set_table(table);
319+
320+
if (schema.has_value()) {
321+
command.set_schema(schema.value());
322+
}
323+
324+
if (catalog.has_value()) {
325+
command.set_catalog(catalog.value());
326+
}
327+
328+
command.set_temporary(temporary);
329+
330+
if (transaction.is_valid()) {
331+
command.set_transaction_id(transaction.transaction_id());
332+
}
333+
334+
auto command_options = command.mutable_options();
335+
for (const auto& [key, val] : ingest_options) {
336+
(*command_options)[key] = val;
337+
}
338+
339+
ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
340+
GetFlightDescriptorForCommand(command));
341+
342+
auto reader_ = reader.get();
343+
ARROW_ASSIGN_OR_RAISE(auto stream, DoPut(options, descriptor, reader_->schema()));
344+
345+
while (true) {
346+
ARROW_ASSIGN_OR_RAISE(auto batch, reader_->Next());
347+
if (!batch) break;
348+
ARROW_RETURN_NOT_OK(stream.writer->WriteRecordBatch(*batch));
349+
}
350+
351+
ARROW_RETURN_NOT_OK(stream.writer->DoneWriting());
352+
std::shared_ptr<Buffer> metadata;
353+
ARROW_RETURN_NOT_OK(stream.reader->ReadMetadata(&metadata));
354+
ARROW_RETURN_NOT_OK(stream.writer->Close());
355+
356+
if (!metadata) return Status::IOError("Server did not send a response");
357+
358+
flight_sql_pb::DoPutUpdateResult update_result;
359+
if (!update_result.ParseFromArray(metadata->data(),
360+
static_cast<int>(metadata->size()))) {
361+
return Status::Invalid("Unable to parse DoPutUpdateResult");
362+
}
363+
364+
return update_result.record_count();
365+
}
366+
259367
arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetCatalogs(
260368
const FlightCallOptions& options) {
261369
flight_sql_pb::CommandGetCatalogs command;

cpp/src/arrow/flight/sql/client.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,24 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
101101
const FlightCallOptions& options, const SubstraitPlan& plan,
102102
const Transaction& transaction = no_transaction());
103103

104+
/// \brief Execute a bulk ingestion to the server.
105+
/// \param[in] options RPC-layer hints for this call.
106+
/// \param[in] reader The records to ingest.
107+
/// \param[in] table_definition_options The behavior for handling the table definition.
108+
/// \param[in] table The destination table to load into.
109+
/// \param[in] schema The DB schema of the destination table.
110+
/// \param[in] catalog The catalog of the destination table.
111+
/// \param[in] temporary Use a temporary table.
112+
/// \param[in] transaction Ingest as part of this transaction.
113+
/// \param[in] ingest_options Additional, backend-specific options.
114+
/// \return The number of rows ingested to the server.
115+
arrow::Result<int64_t> ExecuteIngest(
116+
const FlightCallOptions& options, const std::shared_ptr<RecordBatchReader>& reader,
117+
const TableDefinitionOptions& table_definition_options, const std::string& table,
118+
const std::optional<std::string>& schema, const std::optional<std::string>& catalog,
119+
const bool temporary, const Transaction& transaction = no_transaction(),
120+
const std::unordered_map<std::string, std::string>& ingest_options = {});
121+
104122
/// \brief Request a list of catalogs.
105123
/// \param[in] options RPC-layer hints for this call.
106124
/// \return The FlightInfo describing where to access the dataset.

0 commit comments

Comments
 (0)