Skip to content

Commit 808bf38

Browse files
committed
provide the qos and retain configuration for mqtt protocol
Signed-off-by: myan <myan@redhat.com> to review Signed-off-by: myan <myan@redhat.com> avoid all-or-nothing Signed-off-by: myan <myan@redhat.com> rollback all-or-nothing Signed-off-by: myan <myan@redhat.com> convert tparamter to pointers Signed-off-by: myan <myan@redhat.com> fixed doc string Signed-off-by: myan <myan@redhat.com> remove the protocolContext Signed-off-by: myan <myan@redhat.com> add context for protocol Signed-off-by: myan <myan@redhat.com> initialize mqtt protocol with publish/subscribe options Signed-off-by: myan <myan@redhat.com> remove the sender options from context Signed-off-by: myan <myan@redhat.com> using functional options in new() Signed-off-by: myan <myan@redhat.com> add ctx to the new() function Signed-off-by: myan <myan@redhat.com>
1 parent 3dfc033 commit 808bf38

File tree

6 files changed

+157
-84
lines changed

6 files changed

+157
-84
lines changed

protocol/mqtt_paho/v2/option.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
Copyright 2023 The CloudEvents Authors
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package mqtt_paho
7+
8+
import (
9+
"fmt"
10+
11+
"github.com/eclipse/paho.golang/paho"
12+
)
13+
14+
// Option is the function signature required to be considered an mqtt_paho.Option.
15+
type Option func(*Protocol) error
16+
17+
// WithConnect sets the paho.Connect configuration for the client. This option is not required.
18+
func WithConnect(connOpt *paho.Connect) Option {
19+
return func(p *Protocol) error {
20+
if connOpt == nil {
21+
return fmt.Errorf("the paho.Connect option must not be nil")
22+
}
23+
p.connOption = connOpt
24+
return nil
25+
}
26+
}
27+
28+
// WithPublish sets the paho.Publish configuration for the client. This option is required if you want to send messages.
29+
func WithPublish(publishOpt *paho.Publish) Option {
30+
return func(p *Protocol) error {
31+
if publishOpt == nil {
32+
return fmt.Errorf("the paho.Publish option must not be nil")
33+
}
34+
p.publishOption = publishOpt
35+
return nil
36+
}
37+
}
38+
39+
// WithSubscribe sets the paho.Subscribe configuration for the client. This option is required if you want to receive messages.
40+
func WithSubscribe(subscribeOpt *paho.Subscribe) Option {
41+
return func(p *Protocol) error {
42+
if subscribeOpt == nil {
43+
return fmt.Errorf("the paho.Subscribe option must not be nil")
44+
}
45+
p.subscribeOption = subscribeOpt
46+
return nil
47+
}
48+
}

protocol/mqtt_paho/v2/protocol.go

Lines changed: 64 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@ import (
1919
)
2020

2121
type Protocol struct {
22-
client *paho.Client
23-
connConfig *paho.Connect
24-
senderTopic string
25-
receiverTopics []string
26-
qos byte
27-
retained bool
22+
client *paho.Client
23+
config *paho.ClientConfig
24+
connOption *paho.Connect
25+
publishOption *paho.Publish
26+
subscribeOption *paho.Subscribe
2827

2928
// receiver
3029
incoming chan *paho.Publish
@@ -41,96 +40,105 @@ var (
4140
_ protocol.Closer = (*Protocol)(nil)
4241
)
4342

44-
func New(ctx context.Context, clientConfig *paho.ClientConfig, connConfig *paho.Connect, SenderTopic string,
45-
ReceiverTopics []string, qos byte, retained bool,
46-
) (*Protocol, error) {
47-
client := paho.NewClient(*clientConfig)
48-
ca, err := client.Connect(ctx, connConfig)
43+
func New(ctx context.Context, config *paho.ClientConfig, opts ...Option) (*Protocol, error) {
44+
if config == nil {
45+
return nil, fmt.Errorf("the paho.ClientConfig must not be nil")
46+
}
47+
48+
p := &Protocol{
49+
client: paho.NewClient(*config),
50+
// default connect option
51+
connOption: &paho.Connect{
52+
KeepAlive: 30,
53+
CleanStart: true,
54+
},
55+
incoming: make(chan *paho.Publish),
56+
closeChan: make(chan struct{}),
57+
}
58+
if err := p.applyOptions(opts...); err != nil {
59+
return nil, err
60+
}
61+
62+
// Connect to the MQTT broker
63+
connAck, err := p.client.Connect(ctx, p.connOption)
4964
if err != nil {
5065
return nil, err
5166
}
52-
if ca.ReasonCode != 0 {
53-
return nil, fmt.Errorf("failed to connect to %s : %d - %s", client.Conn.RemoteAddr(), ca.ReasonCode,
54-
ca.Properties.ReasonString)
67+
if connAck.ReasonCode != 0 {
68+
return nil, fmt.Errorf("failed to connect to %q : %d - %q", p.client.Conn.RemoteAddr(), connAck.ReasonCode,
69+
connAck.Properties.ReasonString)
5570
}
5671

57-
return &Protocol{
58-
client: client,
59-
connConfig: connConfig,
60-
senderTopic: SenderTopic,
61-
receiverTopics: ReceiverTopics,
62-
qos: qos,
63-
retained: retained,
64-
incoming: make(chan *paho.Publish),
65-
openerMutex: sync.Mutex{},
66-
closeChan: make(chan struct{}),
67-
}, nil
72+
return p, nil
73+
}
74+
75+
func (p *Protocol) applyOptions(opts ...Option) error {
76+
for _, fn := range opts {
77+
if err := fn(p); err != nil {
78+
return err
79+
}
80+
}
81+
return nil
6882
}
6983

70-
func (t *Protocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error {
84+
func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error {
85+
if p.publishOption == nil {
86+
return fmt.Errorf("the paho.Publish option must not be nil")
87+
}
88+
7189
var err error
7290
defer m.Finish(err)
7391

74-
topic := cecontext.TopicFrom(ctx)
75-
if topic == "" {
76-
topic = t.senderTopic
92+
msg := p.publishOption
93+
if cecontext.TopicFrom(ctx) != "" {
94+
msg.Topic = cecontext.TopicFrom(ctx)
95+
cecontext.WithTopic(ctx, "")
7796
}
7897

79-
msg := &paho.Publish{
80-
QoS: t.qos,
81-
Retain: t.retained,
82-
Topic: topic,
83-
}
8498
err = WritePubMessage(ctx, m, msg, transformers...)
8599
if err != nil {
86100
return err
87101
}
88102

89-
_, err = t.client.Publish(ctx, msg)
103+
_, err = p.client.Publish(ctx, msg)
90104
if err != nil {
91105
return err
92106
}
93107
return err
94108
}
95109

96-
func (t *Protocol) OpenInbound(ctx context.Context) error {
97-
t.openerMutex.Lock()
98-
defer t.openerMutex.Unlock()
110+
func (p *Protocol) OpenInbound(ctx context.Context) error {
111+
if p.subscribeOption == nil {
112+
return fmt.Errorf("the paho.Subscribe option must not be nil")
113+
}
114+
115+
p.openerMutex.Lock()
116+
defer p.openerMutex.Unlock()
99117

100118
logger := cecontext.LoggerFrom(ctx)
101119

102-
t.client.Router = paho.NewSingleHandlerRouter(func(m *paho.Publish) {
103-
t.incoming <- m
120+
p.client.Router = paho.NewSingleHandlerRouter(func(m *paho.Publish) {
121+
p.incoming <- m
104122
})
105123

106-
subs := make(map[string]paho.SubscribeOptions)
107-
for _, topic := range t.receiverTopics {
108-
subs[topic] = paho.SubscribeOptions{
109-
QoS: t.qos,
110-
RetainAsPublished: t.retained,
111-
}
112-
}
113-
114-
logger.Infof("subscribe to topics: %v", t.receiverTopics)
115-
_, err := t.client.Subscribe(ctx, &paho.Subscribe{
116-
Subscriptions: subs,
117-
})
124+
logger.Infof("subscribing to topics: %v", p.subscribeOption.Subscriptions)
125+
_, err := p.client.Subscribe(ctx, p.subscribeOption)
118126
if err != nil {
119127
return err
120128
}
121129

122130
// Wait until external or internal context done
123131
select {
124132
case <-ctx.Done():
125-
case <-t.closeChan:
133+
case <-p.closeChan:
126134
}
127-
return t.client.Disconnect(&paho.Disconnect{ReasonCode: 0})
135+
return p.client.Disconnect(&paho.Disconnect{ReasonCode: 0})
128136
}
129137

130138
// Receive implements Receiver.Receive
131-
func (t *Protocol) Receive(ctx context.Context) (binding.Message, error) {
139+
func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) {
132140
select {
133-
case m, ok := <-t.incoming:
141+
case m, ok := <-p.incoming:
134142
if !ok {
135143
return nil, io.EOF
136144
}

samples/mqtt/receiver/main.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"log"
1212
"net"
1313

14-
cemqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
14+
mqtt_paho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
1515
cloudevents "github.com/cloudevents/sdk-go/v2"
1616
"github.com/eclipse/paho.golang/paho"
1717
)
@@ -22,16 +22,16 @@ func main() {
2222
if err != nil {
2323
log.Fatalf("failed to connect to mqtt broker: %s", err.Error())
2424
}
25-
clientConfig := &paho.ClientConfig{
25+
config := &paho.ClientConfig{
2626
ClientID: "receiver-client-id",
2727
Conn: conn,
2828
}
29-
cp := &paho.Connect{
30-
KeepAlive: 30,
31-
CleanStart: true,
29+
subscribeOpt := &paho.Subscribe{
30+
Subscriptions: map[string]paho.SubscribeOptions{
31+
"test-topic": {QoS: 0},
32+
},
3233
}
33-
34-
p, err := cemqtt.New(ctx, clientConfig, cp, "", []string{"test-topic"}, 0, false)
34+
p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithSubscribe(subscribeOpt))
3535
if err != nil {
3636
log.Fatalf("failed to create protocol: %s", err.Error())
3737
}

samples/mqtt/sender/main.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
"github.com/eclipse/paho.golang/paho"
1616
"github.com/google/uuid"
1717

18-
cemqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
18+
mqtt_paho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
1919
cecontext "github.com/cloudevents/sdk-go/v2/context"
2020
)
2121

@@ -29,18 +29,19 @@ func main() {
2929
if err != nil {
3030
log.Fatalf("failed to connect to mqtt broker: %s", err.Error())
3131
}
32-
clientConfig := &paho.ClientConfig{
32+
config := &paho.ClientConfig{
3333
ClientID: "sender-client-id",
3434
Conn: conn,
3535
}
36-
cp := &paho.Connect{
36+
// optional connect option
37+
connOpt := &paho.Connect{
3738
KeepAlive: 30,
3839
CleanStart: true,
3940
}
4041
// set a default topic with test-topic1
41-
p, err := cemqtt.New(ctx, clientConfig, cp, "test-topic1", nil, 0, false)
42+
p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithPublish(&paho.Publish{Topic: "test-topic1"}), mqtt_paho.WithConnect(connOpt))
4243
if err != nil {
43-
log.Fatalf("failed to create protocol: %s", err.Error())
44+
log.Fatalf("failed to create protocol: %v", err)
4445
}
4546
defer p.Close(ctx)
4647

test/integration/mqtt_paho/mqtt_test.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
mqtt_paho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
1919
cloudevents "github.com/cloudevents/sdk-go/v2"
20+
cecontext "github.com/cloudevents/sdk-go/v2/context"
2021
"github.com/cloudevents/sdk-go/v2/event"
2122
"github.com/cloudevents/sdk-go/v2/test"
2223
)
@@ -37,7 +38,7 @@ func TestSendEvent(t *testing.T) {
3738
eventChan := make(chan receiveEvent)
3839
defer close(eventChan)
3940
go func() {
40-
client, err := cloudevents.NewClient(protocolFactory(t, topicName))
41+
client, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName))
4142
if err != nil {
4243
eventChan <- receiveEvent{err: err}
4344
return
@@ -51,8 +52,8 @@ func TestSendEvent(t *testing.T) {
5152
}
5253
}()
5354

54-
// start a cloudevents sender client go to send the event
55-
client, err := cloudevents.NewClient(protocolFactory(t, topicName))
55+
// start a cloudevents sender client go to send the event, set the topic on context
56+
client, err := cloudevents.NewClient(protocolFactory(ctx, t, ""))
5657
require.NoError(t, err)
5758

5859
timer := time.NewTimer(5 * time.Millisecond)
@@ -67,7 +68,10 @@ func TestSendEvent(t *testing.T) {
6768
test.AssertEventEquals(t, inEvent, test.ConvertEventExtensionsToString(t, eventOut.event))
6869
return
6970
case <-timer.C:
70-
result := client.Send(ctx, inEvent)
71+
result := client.Send(
72+
cecontext.WithTopic(ctx, topicName),
73+
inEvent,
74+
)
7175
require.NoError(t, result)
7276
// the receiver mightn't be ready before the sender send the message, so wait and we retry
7377
continue
@@ -79,20 +83,27 @@ func TestSendEvent(t *testing.T) {
7983
// To start a local environment for testing:
8084
// docker run -it --rm --name mosquitto -p 1883:1883 eclipse-mosquitto:2.0 mosquitto -c /mosquitto-no-auth.conf
8185
// the protocolFactory will generate a unique connection clientId when it be invoked
82-
func protocolFactory(t testing.TB, topicName string) *mqtt_paho.Protocol {
83-
ctx := context.Background()
84-
86+
func protocolFactory(ctx context.Context, t testing.TB, topicName string) *mqtt_paho.Protocol {
8587
broker := "127.0.0.1:1883"
8688
conn, err := net.Dial("tcp", broker)
8789
require.NoError(t, err)
88-
clientConfig := &paho.ClientConfig{
90+
config := &paho.ClientConfig{
8991
Conn: conn,
9092
}
91-
cp := &paho.Connect{
93+
connOpt := &paho.Connect{
9294
KeepAlive: 30,
9395
CleanStart: true,
9496
}
95-
p, err := mqtt_paho.New(ctx, clientConfig, cp, topicName, []string{topicName}, 0, false)
97+
publishOpt := &paho.Publish{
98+
Topic: topicName, QoS: 0,
99+
}
100+
subscribeOpt := &paho.Subscribe{
101+
Subscriptions: map[string]paho.SubscribeOptions{
102+
topicName: {QoS: 0},
103+
},
104+
}
105+
106+
p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithConnect(connOpt), mqtt_paho.WithPublish(publishOpt), mqtt_paho.WithSubscribe(subscribeOpt))
96107
require.NoError(t, err)
97108

98109
return p

test/integration/mqtt_paho_binding/mqtt_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,18 @@ func getProtocol(ctx context.Context, topic string) (*mqtt_paho.Protocol, error)
126126
if err != nil {
127127
return nil, err
128128
}
129-
cp := &paho.Connect{
130-
KeepAlive: 30,
131-
CleanStart: true,
129+
config := &paho.ClientConfig{
130+
Conn: conn,
131+
}
132+
publishOpt := &paho.Publish{
133+
Topic: topic, QoS: 0,
134+
}
135+
subscribeOpt := &paho.Subscribe{
136+
Subscriptions: map[string]paho.SubscribeOptions{
137+
topic: {QoS: 0},
138+
},
132139
}
133140

134-
p, err := mqtt_paho.New(ctx, &paho.ClientConfig{
135-
Conn: conn,
136-
}, cp, topic, []string{topic}, 0, false)
141+
p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithPublish(publishOpt), mqtt_paho.WithSubscribe(subscribeOpt))
137142
return p, err
138143
}

0 commit comments

Comments
 (0)