pubsub: make pubsub.Message testable with AckWithResult #8480

@Keylor42

Is your feature request related to a problem? Please describe.
To transition to exactly-once delivery, I updated the wrapper used in my company's code base to use the AckWithResult and NackWithResult methods of pubsub.Message. We're using Go 1.21 and cloud.google.com/go/pubsub v1.33.0. A simplified version of the wrapper is:

package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
)

// Receiver wraps a pubsub.Subscription.
type Receiver struct {
	Subscription *pubsub.Subscription
}

// NewReceiver creates a new Receiver.
func NewReceiver(ctx context.Context, project, subscription string) (*Receiver, error) {
	c, err := pubsub.NewClient(ctx, project)
	if err != nil {
		return nil, err
	}
	s := c.Subscription(subscription)
	ok, err := s.Exists(ctx)
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, fmt.Errorf("subscription %q does not exist", subscription)
	}

	return &Receiver{
		Subscription: s,
	}, nil
}

// Receive receives pubsub messages.
func (r *Receiver) Receive(ctx context.Context, f func(context.Context, *pubsub.Message) error) error {
	return r.Subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		r.Process(ctx, m, f)
	})
}

// Process wraps the processing of a single pubsub message through the provided callback.
// Process automatically acks or nacks the message based on the value returned by the callback.
func (r *Receiver) Process(ctx context.Context, m *pubsub.Message, f func(context.Context, *pubsub.Message) error) {
	err := f(ctx, m)
	if err != nil {
		res := m.NackWithResult()
		status, err := res.Get(ctx)
		fmt.Println(status, err)
		return
	}

	res := m.AckWithResult()
	status, err := res.Get(ctx)
	fmt.Println(status, err)
}

We replaced Ack and Nack with AckWithResult and NackWithResult. These are also compatible with subscriptions that do not use exactly-once delivery, which is great for our use case.

Now let's say I have this test for the wrapper:

package main

import (
	"context"
	"fmt"
	"testing"

	"cloud.google.com/go/pubsub"
)

func TestProcess(t *testing.T) {
	r := &Receiver{}
	msg := &pubsub.Message{
		Data: []byte("foo"),
	}

	ctx := context.Background()
	r.Process(ctx, msg, func(ctx context.Context, m *pubsub.Message) error {
		fmt.Println(string(msg.Data))
		return nil
	})
}

Previously, with Ack or Nack, this test passed. Now it panics when I run it:

go test
foo
--- FAIL: TestProcess (0.00s)
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
	panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x5c98c1]

goroutine 7 [running]:
testing.tRunner.func1.2({0x966ee0, 0xf28840})
	/usr/local/go/src/testing/testing.go:1545 +0x238
testing.tRunner.func1()
	/usr/local/go/src/testing/testing.go:1548 +0x397
panic({0x966ee0?, 0xf28840?})
	/usr/local/go/src/runtime/panic.go:914 +0x21f
cloud.google.com/go/internal/pubsub.(*AckResult).Ready(...)
	/home/go/pkg/mod/cloud.google.com/go@v0.110.7/internal/pubsub/message.go:122
cloud.google.com/go/internal/pubsub.(*AckResult).Get(0x0, {0xb11ce8, 0xf721a0})
	/home/go/pkg/mod/cloud.google.com/go@v0.110.7/internal/pubsub/message.go:129 +0x21
pubsubmsgtest.(*Receiver).Process(0xdd2950?, {0xb11ce8, 0xf721a0}, 0x52691c?, 0x526f6e?)
	/home/go/src/pubsubmsgtest/main.go:54 +0x10d
pubsubmsgtest.TestProcess(0x0?)
	/home/go/src/pubsubmsgtest/main_test.go:18 +0xb2
testing.tRunner(0xc000494680, 0xa80450)
	/usr/local/go/src/testing/testing.go:1595 +0xff
created by testing.(*T).Run in goroutine 1
	/usr/local/go/src/testing/testing.go:1648 +0x3ad
exit status 2
FAIL	pubsubmsgtest	0.008s

My understanding is that the test panics because the ackHandler has not been initialized in the Message, so AckWithResult returns a nil *AckResult and calling Get on it (which calls Ready) dereferences a nil pointer. Since the ackHandler field is unexported in the pubsub.Message struct, I cannot access it or mock it to make the test pass.

Describe the solution you'd like
I'd like a way to initialize a pubsub.Message with an ackHandler implementation that I can use in tests. Frustratingly, such a function exists at https://github.com/googleapis/google-cloud-go/blob/main/internal/pubsub/message.go#L214, but since it lives in an internal package, I cannot import it. Could it be moved to the public pubsub package? Or is there an existing solution I missed?

Describe alternatives you've considered
We could update our tests to use the Pub/Sub emulator, but I'm wondering whether there is a solution that would let us keep our tests simple, without the emulator.

Labels

api: pubsub (Issues related to the Pub/Sub API.)
priority: p2 (Moderately-important priority. Fix may not be included in next release.)
type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)
