Skip to content

Commit ed7be6b

Browse files
Add WithCustomAttributes for PubSub
Enables to pre-fill PubSub Message Attributes independent of CloudEvent spec. Thus this allows for setting arbitrary Message Attributes (given that they are not later overwritten by CloudEvent Attributes). Signed-off-by: Andreas Bergmeier <andreas.bergmeier@otto.de>
1 parent 0836a52 commit ed7be6b

File tree

5 files changed

+100
-2
lines changed

5 files changed

+100
-2
lines changed

protocol/pubsub/v2/attributes.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
Copyright 2021 The CloudEvents Authors
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package pubsub
7+
8+
import (
9+
"context"
10+
11+
"github.com/cloudevents/sdk-go/v2/binding"
12+
)
13+
14+
type withCustomAttributes struct{}
15+
16+
func AttributesFrom(ctx context.Context) map[string]string {
17+
return binding.GetOrDefaultFromCtx(ctx, withCustomAttributes{}, make(map[string]string)).(map[string]string)
18+
}
19+
20+
// WithCustomAttributes sets Message Attributes without any CloudEvent logic.
21+
// Note that this function is not intended for CloudEvent Extensions or any `ce-`-prefixed Attributes.
22+
// For these please see `Event` and `Event.SetExtension`.
23+
func WithCustomAttributes(ctx context.Context, attrs map[string]string) context.Context {
24+
return context.WithValue(ctx, withCustomAttributes{}, attrs)
25+
}

protocol/pubsub/v2/doc.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,8 @@
55

66
/*
77
Package pubsub implements a Pub/Sub binding using google.cloud.com/go/pubsub module
8+
9+
PubSub Messages can be modified beyond what CloudEvents cover by using `WithOrderingKey`
10+
or `WithCustomAttributes`. See function docs for more details.
811
*/
912
package pubsub

protocol/pubsub/v2/protocol.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ func (t *Protocol) Send(ctx context.Context, in binding.Message, transformers ..
110110

111111
conn := t.getOrCreateConnection(ctx, topic, "", "")
112112

113-
msg := &pubsub.Message{}
113+
msg := &pubsub.Message{
114+
Attributes: AttributesFrom(ctx),
115+
}
114116

115117
if key, ok := ctx.Value(withOrderingKey{}).(string); ok {
116118
if !t.MessageOrdering {

protocol/pubsub/v2/protocol_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ package pubsub
33
import (
44
"context"
55
"fmt"
6+
"reflect"
67
"testing"
78

89
"cloud.google.com/go/pubsub"
10+
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
911
"cloud.google.com/go/pubsub/pstest"
1012
"github.com/stretchr/testify/require"
1113
"google.golang.org/api/option"
1214
pb "google.golang.org/genproto/googleapis/pubsub/v1"
1315
"google.golang.org/grpc"
16+
"google.golang.org/grpc/credentials/insecure"
1417

1518
"github.com/cloudevents/sdk-go/v2/test"
1619
)
@@ -20,6 +23,32 @@ type testPubsubClient struct {
2023
conn *grpc.ClientConn
2124
}
2225

26+
func (pc *testPubsubClient) NewWithAttributesInterceptor(ctx context.Context, projectID, orderingKey string) (*pubsub.Client, error) {
27+
pc.srv = pstest.NewServer()
28+
conn, err := grpc.Dial(pc.srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(customAttributesInterceptor(map[string]string{
29+
"Content-Type": "text/json",
30+
"ce-dataschema": "http://example.com/schema",
31+
"ce-exbinary": "AAECAw==",
32+
"ce-exbool": "true",
33+
"ce-exint": "42",
34+
"ce-exstring": "exstring",
35+
"ce-extime": "2020-03-21T12:34:56.78Z",
36+
"ce-exurl": "http://example.com/source",
37+
"ce-id": "full-event",
38+
"ce-source": "http://example.com/source",
39+
"ce-specversion": "1.0",
40+
"ce-subject": "topic",
41+
"ce-time": "2020-03-21T12:34:56.78Z",
42+
"ce-type": "com.example.FullEvent",
43+
"Proxy-Authorization": "YWxhZGRpbjpvcGVuc2VzYW1l",
44+
})))
45+
if err != nil {
46+
return nil, err
47+
}
48+
pc.conn = conn
49+
return pubsub.NewClient(ctx, projectID, option.WithGRPCConn(conn))
50+
}
51+
2352
func (pc *testPubsubClient) NewWithOrderInterceptor(ctx context.Context, projectID, orderingKey string) (*pubsub.Client, error) {
2453
pc.srv = pstest.NewServer()
2554
conn, err := grpc.Dial(pc.srv.Addr, grpc.WithInsecure(), grpc.WithUnaryInterceptor(orderingKeyInterceptor(orderingKey)))
@@ -35,6 +64,20 @@ func (pc *testPubsubClient) Close() {
3564
pc.conn.Close()
3665
}
3766

67+
func customAttributesInterceptor(attrs map[string]string) grpc.UnaryClientInterceptor {
68+
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
69+
if method == "/google.pubsub.v1.Publisher/Publish" {
70+
pr, _ := req.(*pubsubpb.PublishRequest)
71+
for _, m := range pr.Messages {
72+
if !reflect.DeepEqual(m.Attributes, attrs) {
73+
return fmt.Errorf("expecting Attributes %q, got %q", attrs, m.Attributes)
74+
}
75+
}
76+
}
77+
return invoker(ctx, method, req, reply, cc, opts...)
78+
}
79+
}
80+
3881
func orderingKeyInterceptor(orderingKey string) grpc.UnaryClientInterceptor {
3982
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
4083
if method == "/google.pubsub.v1.Publisher/Publish" {
@@ -49,6 +92,32 @@ func orderingKeyInterceptor(orderingKey string) grpc.UnaryClientInterceptor {
4992
}
5093
}
5194

95+
func TestPublishMessageHasCustomAttributes(t *testing.T) {
96+
require := require.New(t)
97+
ctx := context.Background()
98+
pc := &testPubsubClient{}
99+
defer pc.Close()
100+
101+
projectID, topicID, orderingKey := "test-project", "test-topic", "foobar"
102+
103+
client, err := pc.NewWithAttributesInterceptor(ctx, projectID, orderingKey)
104+
require.NoError(err, "create pubsub client")
105+
defer client.Close()
106+
107+
prot, err := New(ctx,
108+
WithClient(client),
109+
WithProjectID(projectID),
110+
WithTopicID(topicID),
111+
AllowCreateTopic(true),
112+
)
113+
require.NoError(err, "create protocol")
114+
115+
err = prot.Send(WithCustomAttributes(ctx, map[string]string{
116+
"Proxy-Authorization": "YWxhZGRpbjpvcGVuc2VzYW1l",
117+
}), test.FullMessage())
118+
require.NoError(err)
119+
}
120+
52121
func TestPublishMessageHasOrderingKey(t *testing.T) {
53122
require := require.New(t)
54123
ctx := context.Background()

protocol/pubsub/v2/write_pubsub_message.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ func (b *pubsubMessagePublisher) SetStructuredEvent(ctx context.Context, f forma
4646
}
4747

4848
func (b *pubsubMessagePublisher) Start(ctx context.Context) error {
49-
b.Attributes = make(map[string]string)
5049
return nil
5150
}
5251

0 commit comments

Comments
 (0)