Skip to content

Commit f609bb1

Browse files
authored
GH-39910: [Go] Add func to load prepared statement from ActionCreatePreparedStatementResult (#39913)
Currently, in order to create a PreparedStatement a DoAction call will always be made via the client. I need to be able to make a PreparedStatement from persisted data that will not trigger the DoAction call to the server. * Closes: #39910 Authored-by: Alva Bandy <abandy@live.com> Signed-off-by: Matt Topol <zotthewizard@gmail.com>
1 parent e83295b commit f609bb1

3 files changed

Lines changed: 97 additions & 0 deletions

File tree

go/arrow/flight/flightsql/client.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,31 @@ func (c *Client) PrepareSubstrait(ctx context.Context, plan SubstraitPlan, opts
450450
return parsePreparedStatementResponse(c, c.Alloc, stream)
451451
}
452452

453+
func (c *Client) LoadPreparedStatementFromResult(result *CreatePreparedStatementResult) (*PreparedStatement, error) {
454+
var (
455+
err error
456+
dsSchema, paramSchema *arrow.Schema
457+
)
458+
if result.DatasetSchema != nil {
459+
dsSchema, err = flight.DeserializeSchema(result.DatasetSchema, c.Alloc)
460+
if err != nil {
461+
return nil, err
462+
}
463+
}
464+
if result.ParameterSchema != nil {
465+
paramSchema, err = flight.DeserializeSchema(result.ParameterSchema, c.Alloc)
466+
if err != nil {
467+
return nil, err
468+
}
469+
}
470+
return &PreparedStatement{
471+
client: c,
472+
handle: result.PreparedStatementHandle,
473+
datasetSchema: dsSchema,
474+
paramSchema: paramSchema,
475+
}, nil
476+
}
477+
453478
func parsePreparedStatementResponse(c *Client, mem memory.Allocator, results pb.FlightService_DoActionClient) (*PreparedStatement, error) {
454479
if err := results.CloseSend(); err != nil {
455480
return nil, err
@@ -1027,6 +1052,46 @@ func (p *PreparedStatement) Execute(ctx context.Context, opts ...grpc.CallOption
10271052
return p.client.getFlightInfo(ctx, desc, opts...)
10281053
}
10291054

1055+
// ExecutePut calls DoPut for the prepared statement on the server. If SetParameters
1056+
// has been called then the parameter bindings will be sent before execution.
1057+
//
1058+
// Will error if already closed.
1059+
func (p *PreparedStatement) ExecutePut(ctx context.Context, opts ...grpc.CallOption) error {
1060+
if p.closed {
1061+
return errors.New("arrow/flightsql: prepared statement already closed")
1062+
}
1063+
1064+
cmd := &pb.CommandPreparedStatementQuery{PreparedStatementHandle: p.handle}
1065+
1066+
desc, err := descForCommand(cmd)
1067+
if err != nil {
1068+
return err
1069+
}
1070+
1071+
if p.hasBindParameters() {
1072+
pstream, err := p.client.Client.DoPut(ctx, opts...)
1073+
if err != nil {
1074+
return err
1075+
}
1076+
1077+
wr, err := p.writeBindParameters(pstream, desc)
1078+
if err != nil {
1079+
return err
1080+
}
1081+
if err = wr.Close(); err != nil {
1082+
return err
1083+
}
1084+
pstream.CloseSend()
1085+
1086+
// wait for the server to ack the result
1087+
if _, err = pstream.Recv(); err != nil && err != io.EOF {
1088+
return err
1089+
}
1090+
}
1091+
1092+
return nil
1093+
}
1094+
10301095
// ExecutePoll executes the prepared statement on the server and returns a PollInfo
10311096
// indicating the progress of execution.
10321097
//

go/arrow/flight/flightsql/client_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,36 @@ func (s *FlightSqlClientSuite) TestRenewFlightEndpoint() {
665665
s.Equal(&mockedRenewedEndpoint, renewedEndpoint)
666666
}
667667

668+
func (s *FlightSqlClientSuite) TestPreparedStatementLoadFromResult() {
669+
const query = "query"
670+
671+
result := &pb.ActionCreatePreparedStatementResult{
672+
PreparedStatementHandle: []byte(query),
673+
}
674+
675+
parameterSchemaResult := arrow.NewSchema([]arrow.Field{{Name: "p_id", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil)
676+
result.ParameterSchema = flight.SerializeSchema(parameterSchemaResult, memory.DefaultAllocator)
677+
datasetSchemaResult := arrow.NewSchema([]arrow.Field{{Name: "ds_id", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil)
678+
result.DatasetSchema = flight.SerializeSchema(datasetSchemaResult, memory.DefaultAllocator)
679+
680+
prepared, err := s.sqlClient.LoadPreparedStatementFromResult(result)
681+
s.NoError(err)
682+
683+
s.Equal(string(prepared.Handle()), "query")
684+
685+
paramSchema := prepared.ParameterSchema()
686+
paramRec, _, err := array.RecordFromJSON(memory.DefaultAllocator, paramSchema, strings.NewReader(`[{"p_id": 1}]`))
687+
s.NoError(err)
688+
defer paramRec.Release()
689+
690+
datasetSchema := prepared.DatasetSchema()
691+
datasetRec, _, err := array.RecordFromJSON(memory.DefaultAllocator, datasetSchema, strings.NewReader(`[{"ds_id": 1}]`))
692+
s.NoError(err)
693+
defer datasetRec.Release()
694+
695+
s.Equal(string(prepared.Handle()), "query")
696+
}
697+
668698
func TestFlightSqlClient(t *testing.T) {
669699
suite.Run(t, new(FlightSqlClientSuite))
670700
}

go/arrow/flight/flightsql/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,3 +852,5 @@ const (
852852
// cancellation request.
853853
CancelResultNotCancellable = pb.ActionCancelQueryResult_CANCEL_RESULT_NOT_CANCELLABLE
854854
)
855+
856+
type CreatePreparedStatementResult = pb.ActionCreatePreparedStatementResult

0 commit comments

Comments
 (0)