Skip to content

Commit 107b215

Browse files
koulidavidmalamb
authored
GH-36155: [C++][Go][Java][FlightRPC] Add support for long-running queries (#36946)
### Rationale for this change In Flight RPC, FlightInfo includes addresses of workers alongside result partition info. This lets clients fetch data directly from workers, in parallel or even distributed across multiple machines. But this also comes with tradeoffs. Queries generally don't complete instantly (as much as we would like them to). So where can we put the 'query evaluation time'? * In `GetFlightInfo`: block and wait for the query to complete. * Con: this is a long-running blocking call, which may fail or time out. Then when the client retries, the server has to redo all the work. * Con: parts of the result may be ready before others, but the client can't do anything until everything is ready. * In `DoGet`: return a fixed number of partitions * Con: this makes handling worker failures hard. Systems like Trino support fault-tolerant execution by replacing workers at runtime. But GetFlightInfo has already passed, so we can't notify the client of new workers. * Con: we have to know or fix the partitioning up front. Neither solution is optimal. ### What changes are included in this PR? We can address this by adding a retryable version of `GetFlightInfo`: `PollFlightInfo(FlightDescriptor)` `PollFlightInfo` returns `PollInfo`: ```proto message PollInfo { // The currently available results so far. FlightInfo info = 1; // The descriptor the client should use on the next try. // If unset, the query is complete. FlightDescriptor flight_descriptor = 2; // Query progress. Must be in [0.0, 1.0] but need not be // monotonic or nondecreasing. If unknown, do not set. optional double progress = 3; // Expiration time for this request. After this passes, the server // might not accept the retry descriptor anymore (and the query may // be cancelled). This may be updated on a call to PollFlightInfo. google.protobuf.Timestamp expiration_time = 4; } ``` See the documentation changes for details of them: http://crossbow.voltrondata.com/pr_docs/36946/format/Flight.html#downloading-data-by-running-a-heavy-query ### Are these changes tested? Yes. This has C++, Go and Java implementations and an integration test with them. ### Are there any user-facing changes? Yes. * Closes: #36155 Lead-authored-by: Sutou Kouhei <kou@clear-code.com> Co-authored-by: Sutou Kouhei <kou@cozmixng.org> Co-authored-by: David Li <li.davidm96@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> Signed-off-by: Sutou Kouhei <kou@clear-code.com>
1 parent 95db0df commit 107b215

41 files changed

Lines changed: 1762 additions & 593 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cpp/src/arrow/flight/CMakeLists.txt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,15 @@ set(FLIGHT_GENERATED_PROTO_FILES
8383

8484
set(PROTO_DEPENDS ${FLIGHT_PROTO} ${ARROW_PROTOBUF_LIBPROTOBUF} gRPC::grpc_cpp_plugin)
8585

86+
set(FLIGHT_PROTOC_COMMAND ${ARROW_PROTOBUF_PROTOC} "-I${FLIGHT_PROTO_PATH}")
87+
if(Protobuf_VERSION VERSION_LESS 3.15)
88+
list(APPEND FLIGHT_PROTOC_COMMAND "--experimental_allow_proto3_optional")
89+
endif()
8690
add_custom_command(OUTPUT ${FLIGHT_GENERATED_PROTO_FILES}
87-
COMMAND ${ARROW_PROTOBUF_PROTOC} "-I${FLIGHT_PROTO_PATH}"
88-
"--cpp_out=${CMAKE_CURRENT_BINARY_DIR}" "${FLIGHT_PROTO}"
8991
DEPENDS ${PROTO_DEPENDS} ARGS
90-
COMMAND ${ARROW_PROTOBUF_PROTOC} "-I${FLIGHT_PROTO_PATH}"
92+
COMMAND ${FLIGHT_PROTOC_COMMAND}
93+
"--cpp_out=${CMAKE_CURRENT_BINARY_DIR}" "${FLIGHT_PROTO}"
94+
COMMAND ${FLIGHT_PROTOC_COMMAND}
9195
"--grpc_out=${CMAKE_CURRENT_BINARY_DIR}"
9296
"--plugin=protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>"
9397
"${FLIGHT_PROTO}")

cpp/src/arrow/flight/client.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,14 @@ arrow::Future<FlightInfo> FlightClient::GetFlightInfoAsync(
640640
return future;
641641
}
642642

643+
arrow::Result<std::unique_ptr<PollInfo>> FlightClient::PollFlightInfo(
644+
const FlightCallOptions& options, const FlightDescriptor& descriptor) {
645+
std::unique_ptr<PollInfo> info;
646+
RETURN_NOT_OK(CheckOpen());
647+
RETURN_NOT_OK(transport_->PollFlightInfo(options, descriptor, &info));
648+
return info;
649+
}
650+
643651
arrow::Result<std::unique_ptr<SchemaResult>> FlightClient::GetSchema(
644652
const FlightCallOptions& options, const FlightDescriptor& descriptor) {
645653
RETURN_NOT_OK(CheckOpen());

cpp/src/arrow/flight/client.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,19 @@ class ARROW_FLIGHT_EXPORT FlightClient {
296296
return GetFlightInfoAsync({}, descriptor);
297297
}
298298

299+
/// \brief Request and poll a long running query
300+
/// \param[in] options Per-RPC options
301+
/// \param[in] descriptor the dataset request or a descriptor returned by a
302+
/// prioir PollFlightInfo call
303+
/// \return Arrow result with the PollInfo describing the status of
304+
/// the requested query
305+
arrow::Result<std::unique_ptr<PollInfo>> PollFlightInfo(
306+
const FlightCallOptions& options, const FlightDescriptor& descriptor);
307+
arrow::Result<std::unique_ptr<PollInfo>> PollFlightInfo(
308+
const FlightDescriptor& descriptor) {
309+
return PollFlightInfo({}, descriptor);
310+
}
311+
299312
/// \brief Request schema for a single flight, which may be an existing
300313
/// dataset or a command to be executed
301314
/// \param[in] options Per-RPC options

cpp/src/arrow/flight/flight_internals_test.cc

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,12 @@ void TestRoundtrip(const std::vector<FlightType>& values,
6464

6565
ASSERT_OK_AND_ASSIGN(std::string serialized, values[i].SerializeToString());
6666
ASSERT_OK_AND_ASSIGN(auto deserialized, FlightType::Deserialize(serialized));
67-
if constexpr (std::is_same_v<FlightType, FlightInfo>) {
67+
if constexpr (std::is_same_v<FlightType, FlightInfo> ||
68+
std::is_same_v<FlightType, PollInfo>) {
69+
ARROW_SCOPED_TRACE("Deserialized = ", deserialized->ToString());
6870
EXPECT_EQ(values[i], *deserialized);
6971
} else {
72+
ARROW_SCOPED_TRACE("Deserialized = ", deserialized.ToString());
7073
EXPECT_EQ(values[i], deserialized);
7174
}
7275

@@ -255,6 +258,38 @@ TEST(FlightTypes, FlightInfo) {
255258
ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightInfo>(values, reprs));
256259
}
257260

261+
TEST(FlightTypes, PollInfo) {
262+
ASSERT_OK_AND_ASSIGN(auto location, Location::ForGrpcTcp("localhost", 1234));
263+
Schema schema({field("ints", int64())});
264+
auto desc = FlightDescriptor::Command("foo");
265+
auto endpoint = FlightEndpoint{Ticket{"foo"}, {}, std::nullopt};
266+
auto info = MakeFlightInfo(schema, desc, {endpoint}, -1, 42, true);
267+
// 2023-06-19 03:14:06.004330100
268+
// We must use microsecond resolution here for portability.
269+
// std::chrono::system_clock::time_point may not provide nanosecond
270+
// resolution on some platforms such as Windows.
271+
const auto expiration_time_duration =
272+
std::chrono::seconds{1687144446} + std::chrono::nanoseconds{4339000};
273+
Timestamp expiration_time(
274+
std::chrono::duration_cast<Timestamp::duration>(expiration_time_duration));
275+
std::vector<PollInfo> values = {
276+
PollInfo{std::make_unique<FlightInfo>(info), std::nullopt, std::nullopt,
277+
std::nullopt},
278+
PollInfo{std::make_unique<FlightInfo>(info), FlightDescriptor::Command("poll"), 0.1,
279+
expiration_time},
280+
};
281+
std::vector<std::string> reprs = {
282+
"<PollInfo info=" + info.ToString() +
283+
" descriptor=null "
284+
"progress=null expiration_time=null>",
285+
"<PollInfo info=" + info.ToString() +
286+
" descriptor=<FlightDescriptor cmd='poll'> "
287+
"progress=0.1 expiration_time=2023-06-19 03:14:06.004339000>",
288+
};
289+
290+
ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::PollInfo>(values, reprs));
291+
}
292+
258293
TEST(FlightTypes, Result) {
259294
std::vector<Result> values = {
260295
{Buffer::FromString("")},

cpp/src/arrow/flight/integration_tests/flight_integration_test.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) {
7171
ASSERT_OK(RunScenario("expiration_time:renew_flight_endpoint"));
7272
}
7373

74+
TEST(FlightIntegration, PollFlightInfo) { ASSERT_OK(RunScenario("poll_flight_info")); }
75+
7476
TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); }
7577

7678
TEST(FlightIntegration, FlightSqlExtension) {

cpp/src/arrow/flight/integration_tests/test_integration.cc

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -708,9 +708,7 @@ class ExpirationTimeCancelFlightInfoScenario : public Scenario {
708708

709709
/// \brief The expiration time scenario - RenewFlightEndpoint.
710710
///
711-
/// This tests that the client can renew a FlightEndpoint and read
712-
/// data in renewed expiration time even when the original
713-
/// expiration time is over.
711+
/// This tests that the client can renew a FlightEndpoint.
714712
class ExpirationTimeRenewFlightEndpointScenario : public Scenario {
715713
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
716714
FlightServerOptions* options) override {
@@ -746,6 +744,77 @@ class ExpirationTimeRenewFlightEndpointScenario : public Scenario {
746744
}
747745
};
748746

747+
/// \brief The server used for testing PollFlightInfo().
748+
class PollFlightInfoServer : public FlightServerBase {
749+
public:
750+
PollFlightInfoServer() : FlightServerBase() {}
751+
752+
Status PollFlightInfo(const ServerCallContext& context,
753+
const FlightDescriptor& descriptor,
754+
std::unique_ptr<PollInfo>* result) override {
755+
auto schema = arrow::schema({arrow::field("number", arrow::uint32(), false)});
756+
std::vector<FlightEndpoint> endpoints = {
757+
FlightEndpoint{{"long-running query"}, {}, std::nullopt}};
758+
ARROW_ASSIGN_OR_RAISE(
759+
auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
760+
if (descriptor == FlightDescriptor::Command("poll")) {
761+
*result = std::make_unique<PollInfo>(std::make_unique<FlightInfo>(std::move(info)),
762+
std::nullopt, 1.0, std::nullopt);
763+
} else {
764+
*result =
765+
std::make_unique<PollInfo>(std::make_unique<FlightInfo>(std::move(info)),
766+
FlightDescriptor::Command("poll"), 0.1,
767+
Timestamp::clock::now() + std::chrono::seconds{10});
768+
}
769+
return Status::OK();
770+
}
771+
};
772+
773+
/// \brief The PollFlightInfo scenario.
774+
///
775+
/// This tests that the client can poll a long-running query.
776+
class PollFlightInfoScenario : public Scenario {
777+
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
778+
FlightServerOptions* options) override {
779+
*server = std::make_unique<PollFlightInfoServer>();
780+
return Status::OK();
781+
}
782+
783+
Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }
784+
785+
Status RunClient(std::unique_ptr<FlightClient> client) override {
786+
ARROW_ASSIGN_OR_RAISE(
787+
auto info, client->PollFlightInfo(FlightDescriptor::Command("heavy query")));
788+
if (!info->descriptor.has_value()) {
789+
return Status::Invalid("Description is missing: ", info->ToString());
790+
}
791+
if (!info->progress.has_value()) {
792+
return Status::Invalid("Progress is missing: ", info->ToString());
793+
}
794+
if (!(0.0 <= *info->progress && *info->progress <= 1.0)) {
795+
return Status::Invalid("Invalid progress: ", info->ToString());
796+
}
797+
if (!info->expiration_time.has_value()) {
798+
return Status::Invalid("Expiration time is missing: ", info->ToString());
799+
}
800+
ARROW_ASSIGN_OR_RAISE(info, client->PollFlightInfo(*info->descriptor));
801+
if (info->descriptor.has_value()) {
802+
return Status::Invalid("Retried but not finished yet: ", info->ToString());
803+
}
804+
if (!info->progress.has_value()) {
805+
return Status::Invalid("Progress is missing in finished query: ", info->ToString());
806+
}
807+
if (fabs(*info->progress - 1.0) > arrow::kDefaultAbsoluteTolerance) {
808+
return Status::Invalid("Progress for finished query isn't 1.0: ", info->ToString());
809+
}
810+
if (info->expiration_time.has_value()) {
811+
return Status::Invalid("Expiration time must not be set for finished query: ",
812+
info->ToString());
813+
}
814+
return Status::OK();
815+
}
816+
};
817+
749818
/// \brief Schema to be returned for mocking the statement/prepared statement results.
750819
///
751820
/// Must be the same across all languages.
@@ -1825,6 +1894,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
18251894
} else if (scenario_name == "expiration_time:renew_flight_endpoint") {
18261895
*out = std::make_shared<ExpirationTimeRenewFlightEndpointScenario>();
18271896
return Status::OK();
1897+
} else if (scenario_name == "poll_flight_info") {
1898+
*out = std::make_shared<PollFlightInfoScenario>();
1899+
return Status::OK();
18281900
} else if (scenario_name == "flight_sql") {
18291901
*out = std::make_shared<FlightSqlScenario>();
18301902
return Status::OK();

cpp/src/arrow/flight/middleware.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ enum class FlightMethod : char {
5757
DoAction = 7,
5858
ListActions = 8,
5959
DoExchange = 9,
60+
PollFlightInfo = 10,
6061
};
6162

6263
/// \brief Get a human-readable name for a Flight method.

cpp/src/arrow/flight/serialization_internal.cc

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,26 @@ namespace arrow {
3131
namespace flight {
3232
namespace internal {
3333

34+
// Timestamp
35+
36+
Status FromProto(const google::protobuf::Timestamp& pb_timestamp, Timestamp* timestamp) {
37+
const auto seconds = std::chrono::seconds{pb_timestamp.seconds()};
38+
const auto nanoseconds = std::chrono::nanoseconds{pb_timestamp.nanos()};
39+
const auto duration =
40+
std::chrono::duration_cast<Timestamp::duration>(seconds + nanoseconds);
41+
*timestamp = Timestamp(duration);
42+
return Status::OK();
43+
}
44+
45+
Status ToProto(const Timestamp& timestamp, google::protobuf::Timestamp* pb_timestamp) {
46+
const auto since_epoch = timestamp.time_since_epoch();
47+
const auto since_epoch_ns =
48+
std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch).count();
49+
pb_timestamp->set_seconds(since_epoch_ns / std::nano::den);
50+
pb_timestamp->set_nanos(since_epoch_ns % std::nano::den);
51+
return Status::OK();
52+
}
53+
3454
// ActionType
3555

3656
Status FromProto(const pb::ActionType& pb_type, ActionType* type) {
@@ -153,13 +173,9 @@ Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint
153173
RETURN_NOT_OK(FromProto(pb_endpoint.location(i), &endpoint->locations[i]));
154174
}
155175
if (pb_endpoint.has_expiration_time()) {
156-
const auto& pb_expiration_time = pb_endpoint.expiration_time();
157-
const auto seconds = std::chrono::seconds{pb_expiration_time.seconds()};
158-
const auto nanoseconds = std::chrono::nanoseconds{pb_expiration_time.nanos()};
159-
const auto duration =
160-
std::chrono::duration_cast<Timestamp::duration>(seconds + nanoseconds);
161-
const Timestamp expiration_time(duration);
162-
endpoint->expiration_time = expiration_time;
176+
Timestamp expiration_time;
177+
RETURN_NOT_OK(FromProto(pb_endpoint.expiration_time(), &expiration_time));
178+
endpoint->expiration_time = std::move(expiration_time);
163179
}
164180
return Status::OK();
165181
}
@@ -171,13 +187,8 @@ Status ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint)
171187
RETURN_NOT_OK(ToProto(location, pb_endpoint->add_location()));
172188
}
173189
if (endpoint.expiration_time) {
174-
const auto expiration_time = endpoint.expiration_time.value();
175-
const auto since_epoch = expiration_time.time_since_epoch();
176-
const auto since_epoch_ns =
177-
std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch).count();
178-
auto pb_expiration_time = pb_endpoint->mutable_expiration_time();
179-
pb_expiration_time->set_seconds(since_epoch_ns / std::nano::den);
180-
pb_expiration_time->set_nanos(since_epoch_ns % std::nano::den);
190+
RETURN_NOT_OK(ToProto(endpoint.expiration_time.value(),
191+
pb_endpoint->mutable_expiration_time()));
181192
}
182193
return Status::OK();
183194
}
@@ -288,6 +299,47 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) {
288299
return Status::OK();
289300
}
290301

302+
// PollInfo
303+
304+
Status FromProto(const pb::PollInfo& pb_info, PollInfo* info) {
305+
ARROW_ASSIGN_OR_RAISE(auto flight_info, FromProto(pb_info.info()));
306+
info->info = std::make_unique<FlightInfo>(std::move(flight_info));
307+
if (pb_info.has_flight_descriptor()) {
308+
FlightDescriptor descriptor;
309+
RETURN_NOT_OK(FromProto(pb_info.flight_descriptor(), &descriptor));
310+
info->descriptor = std::move(descriptor);
311+
} else {
312+
info->descriptor = std::nullopt;
313+
}
314+
if (pb_info.has_progress()) {
315+
info->progress = pb_info.progress();
316+
} else {
317+
info->progress = std::nullopt;
318+
}
319+
if (pb_info.has_expiration_time()) {
320+
Timestamp expiration_time;
321+
RETURN_NOT_OK(FromProto(pb_info.expiration_time(), &expiration_time));
322+
info->expiration_time = std::move(expiration_time);
323+
} else {
324+
info->expiration_time = std::nullopt;
325+
}
326+
return Status::OK();
327+
}
328+
329+
Status ToProto(const PollInfo& info, pb::PollInfo* pb_info) {
330+
RETURN_NOT_OK(ToProto(*info.info, pb_info->mutable_info()));
331+
if (info.descriptor) {
332+
RETURN_NOT_OK(ToProto(*info.descriptor, pb_info->mutable_flight_descriptor()));
333+
}
334+
if (info.progress) {
335+
pb_info->set_progress(info.progress.value());
336+
}
337+
if (info.expiration_time) {
338+
RETURN_NOT_OK(ToProto(*info.expiration_time, pb_info->mutable_expiration_time()));
339+
}
340+
return Status::OK();
341+
}
342+
291343
// CancelFlightInfoRequest
292344

293345
Status FromProto(const pb::CancelFlightInfoRequest& pb_request,

cpp/src/arrow/flight/serialization_internal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ Status SchemaToString(const Schema& schema, std::string* out);
4545

4646
// These functions depend on protobuf types which are not exported in the Flight DLL.
4747

48+
Status FromProto(const google::protobuf::Timestamp& pb_timestamp, Timestamp* timestamp);
4849
Status FromProto(const pb::ActionType& pb_type, ActionType* type);
4950
Status FromProto(const pb::Action& pb_action, Action* action);
5051
Status FromProto(const pb::Result& pb_result, Result* result);
@@ -60,16 +61,19 @@ Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint
6061
Status FromProto(const pb::RenewFlightEndpointRequest& pb_request,
6162
RenewFlightEndpointRequest* request);
6263
arrow::Result<FlightInfo> FromProto(const pb::FlightInfo& pb_info);
64+
Status FromProto(const pb::PollInfo& pb_info, PollInfo* info);
6365
Status FromProto(const pb::CancelFlightInfoRequest& pb_request,
6466
CancelFlightInfoRequest* request);
6567
Status FromProto(const pb::SchemaResult& pb_result, std::string* result);
6668
Status FromProto(const pb::BasicAuth& pb_basic_auth, BasicAuth* info);
6769

70+
Status ToProto(const Timestamp& timestamp, google::protobuf::Timestamp* pb_timestamp);
6871
Status ToProto(const FlightDescriptor& descr, pb::FlightDescriptor* pb_descr);
6972
Status ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint);
7073
Status ToProto(const RenewFlightEndpointRequest& request,
7174
pb::RenewFlightEndpointRequest* pb_request);
7275
Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info);
76+
Status ToProto(const PollInfo& info, pb::PollInfo* pb_info);
7377
Status ToProto(const CancelFlightInfoRequest& request,
7478
pb::CancelFlightInfoRequest* pb_request);
7579
Status ToProto(const ActionType& type, pb::ActionType* pb_type);

cpp/src/arrow/flight/server.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,12 @@ Status FlightServerBase::GetFlightInfo(const ServerCallContext& context,
231231
return Status::NotImplemented("NYI");
232232
}
233233

234+
Status FlightServerBase::PollFlightInfo(const ServerCallContext& context,
235+
const FlightDescriptor& request,
236+
std::unique_ptr<PollInfo>* info) {
237+
return Status::NotImplemented("NYI");
238+
}
239+
234240
Status FlightServerBase::DoGet(const ServerCallContext& context, const Ticket& request,
235241
std::unique_ptr<FlightDataStream>* data_stream) {
236242
return Status::NotImplemented("NYI");

0 commit comments

Comments
 (0)