@@ -20,10 +20,12 @@ import (
2020 "fmt"
2121 "math/rand"
2222 "reflect"
23+ "time"
2324
2425 sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
2526 "google.golang.org/grpc/codes"
2627 proto3 "google.golang.org/protobuf/types/known/structpb"
28+ "google.golang.org/protobuf/types/known/timestamppb"
2729)
2830
2931// op is the mutation operation.
@@ -47,10 +49,19 @@ const (
4749 // opUpdate updates a row in a table. If the row does not already
4850 // exist, the write or transaction fails.
4951 opUpdate
52+ // opSend sends a message to a queue. Users need to specify the queue
53+ // name, a key, payload, and optionally delivery time and sending
54+ // a message.
55+ opSend
56+ // opAck acks a message in a queue, effectively remove it from the
57+ // queue. Users need to specify the queue name and the key, and optionally
58+ // a bool value to ignore error if the message does not exist.
59+ opAck
5060)
5161
5262// A Mutation describes a modification to one or more Cloud Spanner rows. The
53- // mutation represents an insert, update, delete, etc on a table.
63+ // mutation represents an insert, update, delete, etc on a table, or send, ack
64+ // on a queue.
5465//
5566// Many mutations can be applied in a single atomic commit. For purposes of
5667// constraint checking (such as foreign key constraints), the operations can be
@@ -142,6 +153,18 @@ type Mutation struct {
142153 // named by Columns.
143154 values []interface {}
144155
156+ // Queue related fields
157+ // Target queue name to be modified
158+ queue string
159+ // key is the primary key used in send or ack
160+ key Key
161+ // payload is the content of the message
162+ payload interface {}
163+ // deliverTime is optionally set for opSend
164+ deliverTime time.Time
165+ // ignoreNotFound is optionally set for opAck
166+ ignoreNotFound bool
167+
145168 // wrapped is the protobuf mutation that is the source for this Mutation.
146169 // This is only set if the [WrapMutation] function was used to create the Mutation.
147170 wrapped * sppb.Mutation
@@ -365,35 +388,88 @@ func Delete(table string, ks KeySet) *Mutation {
365388 }
366389}
367390
391+ // SendOption specifies optional fields for Send mutation
392+ type SendOption func (* Mutation )
393+
394+ // AckOption specifies optional fields for Ack mutation
395+ type AckOption func (* Mutation )
396+
397+ // WithDeliveryTime returns an SendOption to set field `deliverTime`
398+ func WithDeliveryTime (t time.Time ) SendOption {
399+ return func (m * Mutation ) {
400+ m .deliverTime = t
401+ }
402+ }
403+
404+ // Send returns a Mutation to send a message to a queue.
405+ func Send (queue string , key Key , payload interface {}, opts ... SendOption ) * Mutation {
406+ m := & Mutation {
407+ op : opSend ,
408+ queue : queue ,
409+ key : key ,
410+ payload : payload ,
411+ }
412+ for _ , opt := range opts {
413+ opt (m )
414+ }
415+ return m
416+ }
417+
418+ // WithIgnoreNotFound returns an AckOption to set field `ignoreNotFound`
419+ func WithIgnoreNotFound (ignoreNotFound bool ) AckOption {
420+ return func (m * Mutation ) {
421+ m .ignoreNotFound = ignoreNotFound
422+ }
423+ }
424+
425+ // Ack returns a Mutation to acknowledge (and thus delete) a message from a queue.
426+ func Ack (queue string , key Key , opts ... AckOption ) * Mutation {
427+ m := & Mutation {
428+ op : opAck ,
429+ queue : queue ,
430+ key : key ,
431+ }
432+ for _ , opt := range opts {
433+ opt (m )
434+ }
435+ return m
436+ }
437+
368438// WrapMutation creates a mutation that wraps an existing protobuf mutation object.
369439func WrapMutation (proto * sppb.Mutation ) (* Mutation , error ) {
370440 if proto == nil {
371441 return nil , fmt .Errorf ("protobuf mutation may not be nil" )
372442 }
373- op , table , err := getTableAndSpannerOperation (proto )
443+ op , table , queue , err := getTableOrQueueAndSpannerOperation (proto )
374444 if err != nil {
375445 return nil , err
376446 }
377447 return & Mutation {
378448 op : op ,
379449 table : table ,
450+ queue : queue ,
380451 wrapped : proto ,
381452 }, nil
382453}
383454
384- func getTableAndSpannerOperation (proto * sppb.Mutation ) (op , string , error ) {
385- if proto .GetInsert () != nil {
386- return opInsert , proto .GetInsert ().Table , nil
387- } else if proto .GetUpdate () != nil {
388- return opUpdate , proto .GetUpdate ().Table , nil
389- } else if proto .GetReplace () != nil {
390- return opReplace , proto .GetReplace ().Table , nil
391- } else if proto .GetDelete () != nil {
392- return opDelete , proto .GetDelete ().Table , nil
393- } else if proto .GetInsertOrUpdate () != nil {
394- return opInsertOrUpdate , proto .GetInsertOrUpdate ().Table , nil
395- }
396- return 0 , "" , spannerErrorf (codes .InvalidArgument , "unknown op type: %d" , proto .Operation )
455+ func getTableOrQueueAndSpannerOperation (proto * sppb.Mutation ) (op , string , string , error ) {
456+ switch op := proto .Operation .(type ) {
457+ case * sppb.Mutation_Insert :
458+ return opInsert , op .Insert .Table , "" , nil
459+ case * sppb.Mutation_Update :
460+ return opUpdate , op .Update .Table , "" , nil
461+ case * sppb.Mutation_Replace :
462+ return opReplace , op .Replace .Table , "" , nil
463+ case * sppb.Mutation_Delete_ :
464+ return opDelete , op .Delete .Table , "" , nil
465+ case * sppb.Mutation_InsertOrUpdate :
466+ return opInsertOrUpdate , op .InsertOrUpdate .Table , "" , nil
467+ case * sppb.Mutation_Send_ :
468+ return opSend , "" , op .Send .Queue , nil
469+ case * sppb.Mutation_Ack_ :
470+ return opAck , "" , op .Ack .Queue , nil
471+ }
472+ return 0 , "" , "" , spannerErrorf (codes .InvalidArgument , "unknown op type: %T" , proto .Operation )
397473}
398474
399475// prepareWrite generates sppb.Mutation_Write from table name, column names
@@ -465,6 +541,43 @@ func (m Mutation) proto() (*sppb.Mutation, error) {
465541 return nil , err
466542 }
467543 pb = & sppb.Mutation {Operation : & sppb.Mutation_Update {Update : w }}
544+ case opSend :
545+ k , err := encodeValueArray ([]interface {}(m .key ))
546+ if err != nil {
547+ return nil , err
548+ }
549+ p , _ , err := encodeValue (m .payload )
550+ if err != nil {
551+ return nil , err
552+ }
553+ var dt * timestamppb.Timestamp
554+ if ! m .deliverTime .IsZero () {
555+ dt = timestamppb .New (m .deliverTime )
556+ }
557+ pb = & sppb.Mutation {
558+ Operation : & sppb.Mutation_Send_ {
559+ Send : & sppb.Mutation_Send {
560+ Queue : m .queue ,
561+ Key : k ,
562+ Payload : p ,
563+ DeliverTime : dt ,
564+ },
565+ },
566+ }
567+ case opAck :
568+ k , err := encodeValueArray ([]interface {}(m .key ))
569+ if err != nil {
570+ return nil , err
571+ }
572+ pb = & sppb.Mutation {
573+ Operation : & sppb.Mutation_Ack_ {
574+ Ack : & sppb.Mutation_Ack {
575+ Queue : m .queue ,
576+ Key : k ,
577+ IgnoreNotFound : m .ignoreNotFound ,
578+ },
579+ },
580+ }
468581 default :
469582 return nil , errInvdMutationOp (m )
470583 }
0 commit comments