@@ -3,14 +3,17 @@ package pubsub
33import (
44 "context"
55 "fmt"
6+ "reflect"
67 "testing"
78
89 "cloud.google.com/go/pubsub"
10+ "cloud.google.com/go/pubsub/apiv1/pubsubpb"
911 "cloud.google.com/go/pubsub/pstest"
1012 "github.com/stretchr/testify/require"
1113 "google.golang.org/api/option"
1214 pb "google.golang.org/genproto/googleapis/pubsub/v1"
1315 "google.golang.org/grpc"
16+ "google.golang.org/grpc/credentials/insecure"
1417
1518 "github.com/cloudevents/sdk-go/v2/test"
1619)
@@ -20,6 +23,32 @@ type testPubsubClient struct {
2023 conn * grpc.ClientConn
2124}
2225
26+ func (pc * testPubsubClient ) NewWithAttributesInterceptor (ctx context.Context , projectID , orderingKey string ) (* pubsub.Client , error ) {
27+ pc .srv = pstest .NewServer ()
28+ conn , err := grpc .Dial (pc .srv .Addr , grpc .WithTransportCredentials (insecure .NewCredentials ()), grpc .WithUnaryInterceptor (customAttributesInterceptor (map [string ]string {
29+ "Content-Type" : "text/json" ,
30+ "ce-dataschema" : "http://example.com/schema" ,
31+ "ce-exbinary" : "AAECAw==" ,
32+ "ce-exbool" : "true" ,
33+ "ce-exint" : "42" ,
34+ "ce-exstring" : "exstring" ,
35+ "ce-extime" : "2020-03-21T12:34:56.78Z" ,
36+ "ce-exurl" : "http://example.com/source" ,
37+ "ce-id" : "full-event" ,
38+ "ce-source" : "http://example.com/source" ,
39+ "ce-specversion" : "1.0" ,
40+ "ce-subject" : "topic" ,
41+ "ce-time" : "2020-03-21T12:34:56.78Z" ,
42+ "ce-type" : "com.example.FullEvent" ,
43+ "Proxy-Authorization" : "YWxhZGRpbjpvcGVuc2VzYW1l" ,
44+ })))
45+ if err != nil {
46+ return nil , err
47+ }
48+ pc .conn = conn
49+ return pubsub .NewClient (ctx , projectID , option .WithGRPCConn (conn ))
50+ }
51+
2352func (pc * testPubsubClient ) NewWithOrderInterceptor (ctx context.Context , projectID , orderingKey string ) (* pubsub.Client , error ) {
2453 pc .srv = pstest .NewServer ()
2554 conn , err := grpc .Dial (pc .srv .Addr , grpc .WithInsecure (), grpc .WithUnaryInterceptor (orderingKeyInterceptor (orderingKey )))
@@ -35,6 +64,20 @@ func (pc *testPubsubClient) Close() {
3564 pc .conn .Close ()
3665}
3766
67+ func customAttributesInterceptor (attrs map [string ]string ) grpc.UnaryClientInterceptor {
68+ return func (ctx context.Context , method string , req , reply interface {}, cc * grpc.ClientConn , invoker grpc.UnaryInvoker , opts ... grpc.CallOption ) error {
69+ if method == "/google.pubsub.v1.Publisher/Publish" {
70+ pr , _ := req .(* pubsubpb.PublishRequest )
71+ for _ , m := range pr .Messages {
72+ if ! reflect .DeepEqual (m .Attributes , attrs ) {
73+ return fmt .Errorf ("expecting Attributes %q, got %q" , attrs , m .Attributes )
74+ }
75+ }
76+ }
77+ return invoker (ctx , method , req , reply , cc , opts ... )
78+ }
79+ }
80+
3881func orderingKeyInterceptor (orderingKey string ) grpc.UnaryClientInterceptor {
3982 return func (ctx context.Context , method string , req , reply interface {}, cc * grpc.ClientConn , invoker grpc.UnaryInvoker , opts ... grpc.CallOption ) error {
4083 if method == "/google.pubsub.v1.Publisher/Publish" {
@@ -49,6 +92,32 @@ func orderingKeyInterceptor(orderingKey string) grpc.UnaryClientInterceptor {
4992 }
5093}
5194
95+ func TestPublishMessageHasCustomAttributes (t * testing.T ) {
96+ require := require .New (t )
97+ ctx := context .Background ()
98+ pc := & testPubsubClient {}
99+ defer pc .Close ()
100+
101+ projectID , topicID , orderingKey := "test-project" , "test-topic" , "foobar"
102+
103+ client , err := pc .NewWithAttributesInterceptor (ctx , projectID , orderingKey )
104+ require .NoError (err , "create pubsub client" )
105+ defer client .Close ()
106+
107+ prot , err := New (ctx ,
108+ WithClient (client ),
109+ WithProjectID (projectID ),
110+ WithTopicID (topicID ),
111+ AllowCreateTopic (true ),
112+ )
113+ require .NoError (err , "create protocol" )
114+
115+ err = prot .Send (WithCustomAttributes (ctx , map [string ]string {
116+ "Proxy-Authorization" : "YWxhZGRpbjpvcGVuc2VzYW1l" ,
117+ }), test .FullMessage ())
118+ require .NoError (err )
119+ }
120+
52121func TestPublishMessageHasOrderingKey (t * testing.T ) {
53122 require := require .New (t )
54123 ctx := context .Background ()
0 commit comments