Skip to content

Commit 1cf600d

Browse files
feat: Adding Send and Ack Mutation Support for Cloud Spanner Queue (#13616)
feat: Adding Send and Ack Mutation Support for Cloud Spanner Queue Added new mutation types Send and Ack for Cloud Spanner Queue. Updated Mutation Struct and all the relevant usages. Added Unit Tests and integration test coverage.
1 parent ad99e3d commit 1cf600d

File tree

5 files changed

+558
-84
lines changed

5 files changed

+558
-84
lines changed

spanner/client_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6029,7 +6029,7 @@ func TestClient_BatchWrite(t *testing.T) {
60296029
defer teardown()
60306030
mutationGroups := []*MutationGroup{
60316031
{[]*Mutation{
6032-
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}, nil},
6032+
{op: opInsertOrUpdate, table: "t_test", columns: []string{"key", "val"}, values: []interface{}{"foo1", 1}},
60336033
}},
60346034
}
60356035
iter := client.BatchWrite(context.Background(), mutationGroups)
@@ -6064,7 +6064,7 @@ func TestClient_BatchWrite_SessionNotFound(t *testing.T) {
60646064
)
60656065
mutationGroups := []*MutationGroup{
60666066
{[]*Mutation{
6067-
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}, nil},
6067+
{op: opInsertOrUpdate, table: "t_test", columns: []string{"key", "val"}, values: []interface{}{"foo1", 1}},
60686068
}},
60696069
}
60706070
iter := client.BatchWrite(context.Background(), mutationGroups)
@@ -6102,7 +6102,7 @@ func TestClient_BatchWrite_Error(t *testing.T) {
61026102
)
61036103
mutationGroups := []*MutationGroup{
61046104
{[]*Mutation{
6105-
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}, nil},
6105+
{op: opInsertOrUpdate, table: "t_test", columns: []string{"key", "val"}, values: []interface{}{"foo1", 1}},
61066106
}},
61076107
}
61086108
iter := client.BatchWrite(context.Background(), mutationGroups)
@@ -6177,7 +6177,7 @@ func TestClient_BatchWrite_Options(t *testing.T) {
61776177

61786178
mutationGroups := []*MutationGroup{
61796179
{[]*Mutation{
6180-
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}, nil},
6180+
{op: opInsertOrUpdate, table: "t_test", columns: []string{"key", "val"}, values: []interface{}{"foo1", 1}},
61816181
}},
61826182
}
61836183
iter := client.BatchWriteWithOptions(context.Background(), mutationGroups, tt.write)
@@ -6223,7 +6223,7 @@ func checkBatchWriteSpan(t *testing.T, errors []error, code codes.Code) {
62236223
)
62246224
mutationGroups := []*MutationGroup{
62256225
{[]*Mutation{
6226-
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}, nil},
6226+
{op: opInsertOrUpdate, table: "t_test", columns: []string{"key", "val"}, values: []interface{}{"foo1", 1}},
62276227
}},
62286228
}
62296229
iter := client.BatchWrite(context.Background(), mutationGroups)
@@ -6781,7 +6781,7 @@ func TestClient_BatchWriteExcludeTxnFromChangeStreams(t *testing.T) {
67816781

67826782
mutationGroups := []*MutationGroup{
67836783
{[]*Mutation{
6784-
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}, nil},
6784+
{op: opInsertOrUpdate, table: "t_test", columns: []string{"key", "val"}, values: []interface{}{"foo1", 1}},
67856785
}},
67866786
}
67876787
iter := client.BatchWriteWithOptions(context.Background(), mutationGroups, BatchWriteOptions{ExcludeTxnFromChangeStreams: true})

spanner/integration_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const (
7373
fkdcDDLStatements = "FKDC_DDL_STATEMENTS"
7474
intervalDDLStatements = "INTERVAL_DDL_STATEMENTS"
7575
testTableBitReversedSeqStatements = "TEST_TABLE_BIT_REVERSED_SEQUENCE_STATEMENTS"
76+
queueDDLStatements = "QUEUE_DDL_STATEMENTS"
7677
)
7778

7879
var (
@@ -345,6 +346,19 @@ var (
345346
interval_array_len bigint GENERATED ALWAYS AS (ARRAY_LENGTH(ARRAY[INTERVAL '1-2 3 4:5:6'], 1)) STORED
346347
)`,
347348
}
349+
queueDBStatements = []string{
350+
`CREATE QUEUE TestQueue (
351+
Id INT64 NOT NULL,
352+
Payload BYTES(MAX) NOT NULL,
353+
) PRIMARY KEY (Id),
354+
OPTIONS(receive_mode="PULL")`,
355+
}
356+
queueDBPGStatements = []string{
357+
`CREATE QUEUE TestQueue (
358+
Id BIGINT PRIMARY KEY,
359+
"Payload" BYTEA NOT NULL
360+
) WITH (receive_mode = 'PULL')`,
361+
}
348362

349363
statements = map[adminpb.DatabaseDialect]map[string][]string{
350364
adminpb.DatabaseDialect_GOOGLE_STANDARD_SQL: {
@@ -356,6 +370,7 @@ var (
356370
fkdcDDLStatements: fkdcDBStatements,
357371
testTableBitReversedSeqStatements: bitReverseSeqDBStatments,
358372
intervalDDLStatements: intervalDBStatements,
373+
queueDDLStatements: queueDBStatements,
359374
},
360375
adminpb.DatabaseDialect_POSTGRESQL: {
361376
singerDDLStatements: singerDBPGStatements,
@@ -366,6 +381,7 @@ var (
366381
fkdcDDLStatements: fkdcDBPGStatements,
367382
testTableBitReversedSeqStatements: bitReverseSeqDBPGStatments,
368383
intervalDDLStatements: intervalDBPGStatements,
384+
queueDDLStatements: queueDBPGStatements,
369385
},
370386
}
371387

@@ -503,6 +519,8 @@ func initIntegrationTests() (cleanup func()) {
503519
Config: configName,
504520
DisplayName: testInstanceID,
505521
NodeCount: 1,
522+
// Set Edition to ENTERPRISE to enable Queue support.
523+
Edition: instancepb.Instance_ENTERPRISE_PLUS,
506524
},
507525
})
508526
if err != nil {
@@ -6021,6 +6039,70 @@ func TestIntegration_WithDirectedReadOptions_ReadWriteTransaction_ShouldThrowErr
60216039
}
60226040
}
60236041

6042+
func TestIntegration_QueueMutations(t *testing.T) {
6043+
// Run in cloud-devel only since this queue feature is not fully enabled yet.
6044+
onlyRunOnCloudDevel(t)
6045+
// DDL not fully enabled for PG yet.
6046+
skipUnsupportedPGTest(t)
6047+
// Queue not supported in emulator yet.
6048+
skipEmulatorTest(t)
6049+
t.Parallel()
6050+
6051+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
6052+
defer cancel()
6053+
6054+
// Set up testing environment with the queue DDL.
6055+
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][queueDDLStatements])
6056+
defer cleanup()
6057+
6058+
// 1. Test Send Mutation
6059+
payload := []byte("Hello Queue")
6060+
_, err := client.Apply(ctx, []*Mutation{
6061+
Send("TestQueue", Key{1}, payload, WithDeliveryTime(time.Now())),
6062+
})
6063+
if err != nil {
6064+
t.Fatalf("Failed to apply Send mutation: %v", err)
6065+
}
6066+
// Verify the message exists in the queue table.
6067+
// Spanner Queues can be queried like normal tables.
6068+
stmt := Statement{SQL: "SELECT Id, Payload FROM TestQueue WHERE Id = 1"}
6069+
row, err := client.Single().Query(ctx, stmt).Next()
6070+
if err != nil {
6071+
t.Fatalf("Failed to read sent message: %v", err)
6072+
}
6073+
var val []byte
6074+
err = row.ColumnByName("Payload", &val)
6075+
if err != nil {
6076+
t.Fatalf("Failed to read message payload bytes: %v", err)
6077+
}
6078+
if !testEqual(val, payload) {
6079+
t.Errorf("Send verification failed. Got payload: %v, want payload: %v", val, payload)
6080+
}
6081+
6082+
// 2. Test Ack Mutation
6083+
if _, err := client.Apply(ctx, []*Mutation{Ack("TestQueue", Key{1})}); err != nil {
6084+
t.Fatalf("Failed to apply Ack mutation: %v", err)
6085+
}
6086+
// Verify message is deleted.
6087+
row, err = client.Single().Query(ctx, stmt).Next()
6088+
if err == nil || err != iterator.Done {
6089+
t.Error("Ack failed: message still exists in queue")
6090+
}
6091+
6092+
// 3. Test Ack with IgnoreNotFound.
6093+
// Acknowledge a non-existent key (Id=99) with IgnoreNotFound.
6094+
if _, err := client.Apply(ctx, []*Mutation{Ack("TestQueue", Key{99}, WithIgnoreNotFound(true))}); err != nil {
6095+
t.Errorf("Ack with IgnoreNotFound should succeed for missing keys, but got: %v", err)
6096+
}
6097+
6098+
// 4. Test Ack without IgnoreNotFound (Should Fail).
6099+
// The spec says Ack returns an error if the message doesn't exist.
6100+
_, err = client.Apply(ctx, []*Mutation{Ack("TestQueue", Key{100})})
6101+
if err == nil {
6102+
t.Error("Ack without IgnoreNotFound should have failed for non-existent message")
6103+
}
6104+
}
6105+
60246106
// Prepare initializes Cloud Spanner testing DB and clients.
60256107
func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
60266108
return prepareDBAndClient(ctx, t, spc, statements, testDialect)

spanner/mutation.go

Lines changed: 128 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ import (
2020
"fmt"
2121
"math/rand"
2222
"reflect"
23+
"time"
2324

2425
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
2526
"google.golang.org/grpc/codes"
2627
proto3 "google.golang.org/protobuf/types/known/structpb"
28+
"google.golang.org/protobuf/types/known/timestamppb"
2729
)
2830

2931
// op is the mutation operation.
@@ -47,10 +49,19 @@ const (
4749
// opUpdate updates a row in a table. If the row does not already
4850
// exist, the write or transaction fails.
4951
opUpdate
52+
// opSend sends a message to a queue. Users need to specify the queue
53+
// name, a key, payload, and optionally delivery time and sending
54+
// a message.
55+
opSend
56+
// opAck acks a message in a queue, effectively remove it from the
57+
// queue. Users need to specify the queue name and the key, and optionally
58+
// a bool value to ignore error if the message does not exist.
59+
opAck
5060
)
5161

5262
// A Mutation describes a modification to one or more Cloud Spanner rows. The
53-
// mutation represents an insert, update, delete, etc on a table.
63+
// mutation represents an insert, update, delete, etc on a table, or send, ack
64+
// on a queue.
5465
//
5566
// Many mutations can be applied in a single atomic commit. For purposes of
5667
// constraint checking (such as foreign key constraints), the operations can be
@@ -142,6 +153,18 @@ type Mutation struct {
142153
// named by Columns.
143154
values []interface{}
144155

156+
// Queue related fields
157+
// Target queue name to be modified
158+
queue string
159+
// key is the primary key used in send or ack
160+
key Key
161+
// payload is the content of the message
162+
payload interface{}
163+
// deliverTime is optionally set for opSend
164+
deliverTime time.Time
165+
// ignoreNotFound is optionally set for opAck
166+
ignoreNotFound bool
167+
145168
// wrapped is the protobuf mutation that is the source for this Mutation.
146169
// This is only set if the [WrapMutation] function was used to create the Mutation.
147170
wrapped *sppb.Mutation
@@ -365,35 +388,88 @@ func Delete(table string, ks KeySet) *Mutation {
365388
}
366389
}
367390

391+
// SendOption specifies optional fields for Send mutation
392+
type SendOption func(*Mutation)
393+
394+
// AckOption specifies optional fields for Ack mutation
395+
type AckOption func(*Mutation)
396+
397+
// WithDeliveryTime returns an SendOption to set field `deliverTime`
398+
func WithDeliveryTime(t time.Time) SendOption {
399+
return func(m *Mutation) {
400+
m.deliverTime = t
401+
}
402+
}
403+
404+
// Send returns a Mutation to send a message to a queue.
405+
func Send(queue string, key Key, payload interface{}, opts ...SendOption) *Mutation {
406+
m := &Mutation{
407+
op: opSend,
408+
queue: queue,
409+
key: key,
410+
payload: payload,
411+
}
412+
for _, opt := range opts {
413+
opt(m)
414+
}
415+
return m
416+
}
417+
418+
// WithIgnoreNotFound returns an AckOption to set field `ignoreNotFound`
419+
func WithIgnoreNotFound(ignoreNotFound bool) AckOption {
420+
return func(m *Mutation) {
421+
m.ignoreNotFound = ignoreNotFound
422+
}
423+
}
424+
425+
// Ack returns a Mutation to acknowledge (and thus delete) a message from a queue.
426+
func Ack(queue string, key Key, opts ...AckOption) *Mutation {
427+
m := &Mutation{
428+
op: opAck,
429+
queue: queue,
430+
key: key,
431+
}
432+
for _, opt := range opts {
433+
opt(m)
434+
}
435+
return m
436+
}
437+
368438
// WrapMutation creates a mutation that wraps an existing protobuf mutation object.
369439
func WrapMutation(proto *sppb.Mutation) (*Mutation, error) {
370440
if proto == nil {
371441
return nil, fmt.Errorf("protobuf mutation may not be nil")
372442
}
373-
op, table, err := getTableAndSpannerOperation(proto)
443+
op, table, queue, err := getTableOrQueueAndSpannerOperation(proto)
374444
if err != nil {
375445
return nil, err
376446
}
377447
return &Mutation{
378448
op: op,
379449
table: table,
450+
queue: queue,
380451
wrapped: proto,
381452
}, nil
382453
}
383454

384-
func getTableAndSpannerOperation(proto *sppb.Mutation) (op, string, error) {
385-
if proto.GetInsert() != nil {
386-
return opInsert, proto.GetInsert().Table, nil
387-
} else if proto.GetUpdate() != nil {
388-
return opUpdate, proto.GetUpdate().Table, nil
389-
} else if proto.GetReplace() != nil {
390-
return opReplace, proto.GetReplace().Table, nil
391-
} else if proto.GetDelete() != nil {
392-
return opDelete, proto.GetDelete().Table, nil
393-
} else if proto.GetInsertOrUpdate() != nil {
394-
return opInsertOrUpdate, proto.GetInsertOrUpdate().Table, nil
395-
}
396-
return 0, "", spannerErrorf(codes.InvalidArgument, "unknown op type: %d", proto.Operation)
455+
func getTableOrQueueAndSpannerOperation(proto *sppb.Mutation) (op, string, string, error) {
456+
switch op := proto.Operation.(type) {
457+
case *sppb.Mutation_Insert:
458+
return opInsert, op.Insert.Table, "", nil
459+
case *sppb.Mutation_Update:
460+
return opUpdate, op.Update.Table, "", nil
461+
case *sppb.Mutation_Replace:
462+
return opReplace, op.Replace.Table, "", nil
463+
case *sppb.Mutation_Delete_:
464+
return opDelete, op.Delete.Table, "", nil
465+
case *sppb.Mutation_InsertOrUpdate:
466+
return opInsertOrUpdate, op.InsertOrUpdate.Table, "", nil
467+
case *sppb.Mutation_Send_:
468+
return opSend, "", op.Send.Queue, nil
469+
case *sppb.Mutation_Ack_:
470+
return opAck, "", op.Ack.Queue, nil
471+
}
472+
return 0, "", "", spannerErrorf(codes.InvalidArgument, "unknown op type: %T", proto.Operation)
397473
}
398474

399475
// prepareWrite generates sppb.Mutation_Write from table name, column names
@@ -465,6 +541,43 @@ func (m Mutation) proto() (*sppb.Mutation, error) {
465541
return nil, err
466542
}
467543
pb = &sppb.Mutation{Operation: &sppb.Mutation_Update{Update: w}}
544+
case opSend:
545+
k, err := encodeValueArray([]interface{}(m.key))
546+
if err != nil {
547+
return nil, err
548+
}
549+
p, _, err := encodeValue(m.payload)
550+
if err != nil {
551+
return nil, err
552+
}
553+
var dt *timestamppb.Timestamp
554+
if !m.deliverTime.IsZero() {
555+
dt = timestamppb.New(m.deliverTime)
556+
}
557+
pb = &sppb.Mutation{
558+
Operation: &sppb.Mutation_Send_{
559+
Send: &sppb.Mutation_Send{
560+
Queue: m.queue,
561+
Key: k,
562+
Payload: p,
563+
DeliverTime: dt,
564+
},
565+
},
566+
}
567+
case opAck:
568+
k, err := encodeValueArray([]interface{}(m.key))
569+
if err != nil {
570+
return nil, err
571+
}
572+
pb = &sppb.Mutation{
573+
Operation: &sppb.Mutation_Ack_{
574+
Ack: &sppb.Mutation_Ack{
575+
Queue: m.queue,
576+
Key: k,
577+
IgnoreNotFound: m.ignoreNotFound,
578+
},
579+
},
580+
}
468581
default:
469582
return nil, errInvdMutationOp(m)
470583
}

0 commit comments

Comments
 (0)