Skip to content

Commit 3f19cc1

Browse files
committed
STOMP: handle declarations differently
RMQ 4.3+ doesn't allow transient non-exclusive queues by default so we had tests failing. I've settled on this: --queues arguments is honours, but declarations happen through STOMP headers, rather than AMQP 1.0 management if --queues=predeclared, --queue-durability=none and consumption from /topic - declare exclusive queues. For other /amq/queue/ destinations (which expect pre-declared queues), declare them over AMQP 1.0 management.
1 parent 3061268 commit 3f19cc1

2 files changed

Lines changed: 27 additions & 4 deletions

File tree

pkg/mgmt/mgmt.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,15 @@ func (m *Mgmt) DeclareQueues(cfg config.Config) {
103103
log.Info("Not declaring queues for AMQP consumers since the address doesn't start with /queues/")
104104
}
105105
}
106-
// declare queues for STOMP publishers
106+
// declare queues for STOMP publishers using /amq/queue/ destination
107107
if cfg.PublisherProto == config.STOMP && strings.HasPrefix(cfg.PublishTo, "/amq/queue/") {
108108
for i := 1; i <= cfg.Publishers; i++ {
109109
queueName := utils.ResolveTerminus(cfg.PublishToTemplate, i)
110110
queueName = strings.TrimPrefix(queueName, "/amq/queue/")
111111
m.DeclareAndBind(cfg, queueName, i)
112112
}
113113
}
114-
// declare queues for STOMP consumers
114+
// declare queues for STOMP consumers using /amq/queue/ destination
115115
if cfg.ConsumerProto == config.STOMP && strings.HasPrefix(cfg.ConsumeFrom, "/amq/queue/") {
116116
for i := 1; i <= cfg.Consumers; i++ {
117117
q := utils.ResolveTerminus(cfg.ConsumeFromTemplate, i)

pkg/stomp/consumer.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (c *StompConsumer) Subscribe() {
115115
var err error
116116

117117
if c.Connection != nil {
118-
sub, err = c.Connection.Subscribe(c.Topic, stomp.AckClient, buildSubscribeOpts(c.Config)...)
118+
sub, err = c.Connection.Subscribe(c.Topic, stomp.AckClient, buildSubscribeOpts(c.Config, c.Topic)...)
119119
if err != nil {
120120
log.Error("subscription failed", "id", c.Id, "queue", c.Topic, "error", err.Error())
121121
return
@@ -215,7 +215,7 @@ func (c *StompConsumer) Stop(reason string) {
215215
log.Debug("consumer stopped", "id", c.Id, "reason", reason)
216216
}
217217

218-
func buildSubscribeOpts(cfg config.Config) []func(*frame.Frame) error {
218+
func buildSubscribeOpts(cfg config.Config, destination string) []func(*frame.Frame) error {
219219
var subscribeOpts []func(*frame.Frame) error
220220

221221
subscribeOpts = append(subscribeOpts,
@@ -238,6 +238,29 @@ func buildSubscribeOpts(cfg config.Config) []func(*frame.Frame) error {
238238
subscribeOpts = append(subscribeOpts,
239239
stomp.SubscribeOpt.Header("x-stream-filter", cfg.StreamFilterValues))
240240
}
241+
242+
switch cfg.Queues {
243+
case config.Classic:
244+
subscribeOpts = append(subscribeOpts,
245+
stomp.SubscribeOpt.Header("x-queue-type", "classic"))
246+
case config.Quorum:
247+
subscribeOpts = append(subscribeOpts,
248+
stomp.SubscribeOpt.Header("x-queue-type", "quorum"))
249+
case config.Stream:
250+
subscribeOpts = append(subscribeOpts,
251+
stomp.SubscribeOpt.Header("x-queue-type", "stream"))
252+
case config.Exclusive:
253+
subscribeOpts = append(subscribeOpts,
254+
stomp.SubscribeOpt.Header("exclusive", "true"))
255+
case config.Predeclared:
256+
// For /topic/ destinations with transient queues (QueueDurability == None),
257+
// use exclusive to avoid deprecated transient non-exclusive queues
258+
if strings.HasPrefix(destination, "/topic/") && cfg.QueueDurability == config.None {
259+
subscribeOpts = append(subscribeOpts,
260+
stomp.SubscribeOpt.Header("exclusive", "true"))
261+
}
262+
}
263+
241264
return subscribeOpts
242265
}
243266

0 commit comments

Comments
 (0)