@@ -82,6 +82,17 @@ func flightInfoForCommand(ctx context.Context, cl *Client, cmd proto.Message, op
8282 return cl .getFlightInfo (ctx , desc , opts ... )
8383}
8484
85+ func pollInfoForCommand (ctx context.Context , cl * Client , cmd proto.Message , retryDescriptor * flight.FlightDescriptor , opts ... grpc.CallOption ) (* flight.PollInfo , error ) {
86+ if retryDescriptor != nil {
87+ return cl .Client .PollFlightInfo (ctx , retryDescriptor , opts ... )
88+ }
89+ desc , err := descForCommand (cmd )
90+ if err != nil {
91+ return nil , err
92+ }
93+ return cl .Client .PollFlightInfo (ctx , desc , opts ... )
94+ }
95+
8596func schemaForCommand (ctx context.Context , cl * Client , cmd proto.Message , opts ... grpc.CallOption ) (* flight.SchemaResult , error ) {
8697 desc , err := descForCommand (cmd )
8798 if err != nil {
@@ -123,6 +134,14 @@ func (c *Client) Execute(ctx context.Context, query string, opts ...grpc.CallOpt
123134 return flightInfoForCommand (ctx , c , & cmd , opts ... )
124135}
125136
137+ // ExecutePoll idempotently starts execution of a query/checks for completion.
138+ // To check for completion, pass the FlightDescriptor from the previous call
139+ // to ExecutePoll as the retryDescriptor.
140+ func (c * Client ) ExecutePoll (ctx context.Context , query string , retryDescriptor * flight.FlightDescriptor , opts ... grpc.CallOption ) (* flight.PollInfo , error ) {
141+ cmd := pb.CommandStatementQuery {Query : query }
142+ return pollInfoForCommand (ctx , c , & cmd , retryDescriptor , opts ... )
143+ }
144+
126145// GetExecuteSchema gets the schema of the result set of a query without
127146// executing the query itself.
128147func (c * Client ) GetExecuteSchema (ctx context.Context , query string , opts ... grpc.CallOption ) (* flight.SchemaResult , error ) {
@@ -136,6 +155,12 @@ func (c *Client) ExecuteSubstrait(ctx context.Context, plan SubstraitPlan, opts
136155 return flightInfoForCommand (ctx , c , & cmd , opts ... )
137156}
138157
158+ func (c * Client ) ExecuteSubstraitPoll (ctx context.Context , plan SubstraitPlan , retryDescriptor * flight.FlightDescriptor , opts ... grpc.CallOption ) (* flight.PollInfo , error ) {
159+ cmd := pb.CommandStatementSubstraitPlan {
160+ Plan : & pb.SubstraitPlan {Plan : plan .Plan , Version : plan .Version }}
161+ return pollInfoForCommand (ctx , c , & cmd , retryDescriptor , opts ... )
162+ }
163+
139164func (c * Client ) GetExecuteSubstraitSchema (ctx context.Context , plan SubstraitPlan , opts ... grpc.CallOption ) (* flight.SchemaResult , error ) {
140165 cmd := pb.CommandStatementSubstraitPlan {
141166 Plan : & pb.SubstraitPlan {Plan : plan .Plan , Version : plan .Version }}
@@ -606,6 +631,15 @@ func (tx *Txn) Execute(ctx context.Context, query string, opts ...grpc.CallOptio
606631 return flightInfoForCommand (ctx , tx .c , cmd , opts ... )
607632}
608633
634+ func (tx * Txn ) ExecutePoll (ctx context.Context , query string , retryDescriptor * flight.FlightDescriptor , opts ... grpc.CallOption ) (* flight.PollInfo , error ) {
635+ if ! tx .txn .IsValid () {
636+ return nil , ErrInvalidTxn
637+ }
638+ // The server should encode the transaction into the retry descriptor
639+ cmd := & pb.CommandStatementQuery {Query : query , TransactionId : tx .txn }
640+ return pollInfoForCommand (ctx , tx .c , cmd , retryDescriptor , opts ... )
641+ }
642+
609643func (tx * Txn ) ExecuteSubstrait (ctx context.Context , plan SubstraitPlan , opts ... grpc.CallOption ) (* flight.FlightInfo , error ) {
610644 if ! tx .txn .IsValid () {
611645 return nil , ErrInvalidTxn
@@ -616,6 +650,18 @@ func (tx *Txn) ExecuteSubstrait(ctx context.Context, plan SubstraitPlan, opts ..
616650 return flightInfoForCommand (ctx , tx .c , cmd , opts ... )
617651}
618652
653+ func (tx * Txn ) ExecuteSubstraitPoll (ctx context.Context , plan SubstraitPlan , retryDescriptor * flight.FlightDescriptor , opts ... grpc.CallOption ) (* flight.PollInfo , error ) {
654+ if ! tx .txn .IsValid () {
655+ return nil , ErrInvalidTxn
656+ }
657+ // The server should encode the transaction into the retry descriptor
658+ cmd := & pb.CommandStatementSubstraitPlan {
659+ Plan : & pb.SubstraitPlan {Plan : plan .Plan , Version : plan .Version },
660+ TransactionId : tx .txn ,
661+ }
662+ return pollInfoForCommand (ctx , tx .c , cmd , retryDescriptor , opts ... )
663+ }
664+
619665func (tx * Txn ) GetExecuteSchema (ctx context.Context , query string , opts ... grpc.CallOption ) (* flight.SchemaResult , error ) {
620666 if ! tx .txn .IsValid () {
621667 return nil , ErrInvalidTxn
@@ -981,6 +1027,52 @@ func (p *PreparedStatement) Execute(ctx context.Context, opts ...grpc.CallOption
9811027 return p .client .getFlightInfo (ctx , desc , opts ... )
9821028}
9831029
1030+ // ExecutePoll executes the prepared statement on the server and returns a PollInfo
1031+ // indicating the progress of execution.
1032+ //
1033+ // Will error if already closed.
1034+ func (p * PreparedStatement ) ExecutePoll (ctx context.Context , retryDescriptor * flight.FlightDescriptor , opts ... grpc.CallOption ) (* flight.PollInfo , error ) {
1035+ if p .closed {
1036+ return nil , errors .New ("arrow/flightsql: prepared statement already closed" )
1037+ }
1038+
1039+ cmd := & pb.CommandPreparedStatementQuery {PreparedStatementHandle : p .handle }
1040+
1041+ desc := retryDescriptor
1042+ var err error
1043+
1044+ if desc == nil {
1045+ desc , err = descForCommand (cmd )
1046+ if err != nil {
1047+ return nil , err
1048+ }
1049+ }
1050+
1051+ if retryDescriptor == nil {
1052+ if p .hasBindParameters () {
1053+ pstream , err := p .client .Client .DoPut (ctx , opts ... )
1054+ if err != nil {
1055+ return nil , err
1056+ }
1057+
1058+ wr , err := p .writeBindParameters (pstream , desc )
1059+ if err != nil {
1060+ return nil , err
1061+ }
1062+ if err = wr .Close (); err != nil {
1063+ return nil , err
1064+ }
1065+ pstream .CloseSend ()
1066+
1067+ // wait for the server to ack the result
1068+ if _ , err = pstream .Recv (); err != nil && err != io .EOF {
1069+ return nil , err
1070+ }
1071+ }
1072+ }
1073+ return p .client .Client .PollFlightInfo (ctx , desc , opts ... )
1074+ }
1075+
9841076// ExecuteUpdate executes the prepared statement update query on the server
9851077// and returns the number of rows affected. If SetParameters was called,
9861078// the parameter bindings will be sent with the request to execute.
0 commit comments