-
Notifications
You must be signed in to change notification settings - Fork 1.5k
pubsub: make pubsub.Message testable with AckWithResult #8480
Description
Is your feature request related to a problem? Please describe.
In order to transition to exactly-once delivery, I updated the wrapper used in my company's code base to use AckWithResult methods of the pubsub.Message. We're using go 1.21 and cloud.google.com/go/pubsub v1.33.0. Simplified version of the wrapper is:
package main
import (
"context"
"fmt"
"cloud.google.com/go/pubsub"
)
// Receiver wraps a pubsub.Receiver.
type Receiver struct {
Subscription *pubsub.Subscription
}
// NewReceiver creates a new Receiver.
func NewReceiver(ctx context.Context, project, subscription string) (*Receiver, error) {
c, err := pubsub.NewClient(ctx, project)
if err != nil {
return nil, err
}
s := c.Subscription(subscription)
ok, err := s.Exists(ctx)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("subscription %q does not exist", subscription)
}
return &Receiver{
Subscription: s,
}, nil
}
// Receive receives pubsub messages.
func (r *Receiver) Receive(ctx context.Context, f func(context.Context, *pubsub.Message) error) error {
return r.Subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
r.Process(ctx, m, f)
})
}
// Process wraps the processing of a single pubsub message through the provided callback.
// Process automatically Ack or Nack the message based on the value returned by the callback.
func (r *Receiver) Process(ctx context.Context, m *pubsub.Message, f func(context.Context, *pubsub.Message) error) {
err := f(ctx, m)
if err != nil {
res := m.NackWithResult()
status, err := res.Get(ctx)
fmt.Println(status, err)
return
}
res := m.AckWithResult()
status, err := res.Get(ctx)
fmt.Println(status, err)
}So we replaced the Ack and Nack by AckWithResult and NackWithResult, that are also compatible with non-exactly once delivery subscriptions which is great for our use case.
Now let's say I have this test for the wrapper:
func TestProcess(t *testing.T) {
r := &Receiver{}
msg := &pubsub.Message{
Data: []byte("foo"),
}
ctx := context.Background()
r.Process(ctx, msg, func(ctx context.Context, m *pubsub.Message) error {
fmt.Println(string(msg.Data))
return nil
})
}Before, when using Ack or Nack the test would pass. Now if I run this test I have an error:
go test
foo
--- FAIL: TestProcess (0.00s)
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x5c98c1]
goroutine 7 [running]:
testing.tRunner.func1.2({0x966ee0, 0xf28840})
/usr/local/go/src/testing/testing.go:1545 +0x238
testing.tRunner.func1()
/usr/local/go/src/testing/testing.go:1548 +0x397
panic({0x966ee0?, 0xf28840?})
/usr/local/go/src/runtime/panic.go:914 +0x21f
cloud.google.com/go/internal/pubsub.(*AckResult).Ready(...)
/home/go/pkg/mod/cloud.google.com/go@v0.110.7/internal/pubsub/message.go:122
cloud.google.com/go/internal/pubsub.(*AckResult).Get(0x0, {0xb11ce8, 0xf721a0})
/home/go/pkg/mod/cloud.google.com/go@v0.110.7/internal/pubsub/message.go:129 +0x21
pubsubmsgtest.(*Receiver).Process(0xdd2950?, {0xb11ce8, 0xf721a0}, 0x52691c?, 0x526f6e?)
/home/go/src/pubsubmsgtest/main.go:54 +0x10d
pubsubmsgtest.TestProcess(0x0?)
/home/go/src/pubsubmsgtest/main_test.go:18 +0xb2
testing.tRunner(0xc000494680, 0xa80450)
/usr/local/go/src/testing/testing.go:1595 +0xff
created by testing.(*T).Run in goroutine 1
/usr/local/go/src/testing/testing.go:1648 +0x3ad
exit status 2
FAIL pubsubmsgtest 0.008s
My understanding is that the test panics because we're calling the Ready method of a nil ackHandler since it has not been initialized in the Message. Since the ackHandler field is unexported in the pubsub.Message struct, I cannot access it or mock it to make the test pass.
Describe the solution you'd like
I'd like a way to initialize a pubsub.Message with an implementation of an ackHandler that I could use in tests. What's frustrating is that there is a function like that in https://github.com/googleapis/google-cloud-go/blob/main/internal/pubsub/message.go#L214 but since it's an internal package, I cannot import it and use it. Could it be moved to the not-internal pubusub package ? Or is there an existing solution I did not see ?
Describe alternatives you've considered
We could update our tests to use the pubsub emulator, but just wondering if there would be a solution that would allow to keep our tests simple without any use of the emulator.