Skip to content

Commit 48e293d

Browse files
committed
Add JMS queue type and JMS selector filter support
Add --amqp-jms-selector flag for consuming from JMS queues with message selectors, and add JMS as a queue type for declaration via the management API (with x-selector-fields list conversion).
1 parent 43f9050 commit 48e293d

5 files changed

Lines changed: 231 additions & 19 deletions

File tree

cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ func RootCmd() *cobra.Command {
133133
"AMQP property filters, eg. key1=&p:prefix")
134134
amqpConsumerFlags.StringVar(&cfg.Amqp.SQLFilter, "amqp-sql-filter", "",
135135
"AMQP SQL Filter expression eg 'proprties.subject LIKE 'foo-%'")
136+
amqpConsumerFlags.StringVar(&cfg.Amqp.JMSSelectorFilter, "amqp-jms-selector", "",
137+
"JMS message selector expression for JMS queues, eg. \"color = 'red' AND price > 10\"")
136138
amqpConsumerFlags.BoolVar(&cfg.Amqp.ConsumeSettled, "amqp-consume-settled", false,
137139
"Request the broker to send messages pre-settled (no consumer acknowledgment)")
138140
amqpConsumerFlags.IntVar(&cfg.Amqp.ModifyRate, "amqp-modify-rate", 0,

docs/jms-queue-testing.md

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
# Testing JMS Queues with omq
2+
3+
## Prerequisites
4+
5+
- Tanzu RabbitMQ 4.3+ with the `rabbitmq_jms` plugin enabled
6+
- `omq` 0.45.0 or newer
7+
8+
## Declaring JMS Queues
9+
10+
Use `--queues jms` to declare a JMS queue. The `x-selector-fields` argument controls
11+
which message properties are held in memory for selector evaluation. Use `*` for all
12+
application properties.
13+
14+
```bash
15+
# Declare a JMS queue with all application properties indexed
16+
omq amqp --queues jms \
17+
--queue-args 'x-selector-fields=*' \
18+
-t /queues/my-jms-queue -T /queues/my-jms-queue \
19+
-x 1 -C 10 -y 0
20+
21+
# Declare with specific fields indexed (comma-separated)
22+
omq amqp --queues jms \
23+
--queue-args 'x-selector-fields=color,region,priority' \
24+
-t /queues/my-jms-queue -T /queues/my-jms-queue \
25+
-x 1 -C 10 -y 0
26+
27+
# Declare with selector field max bytes limit
28+
omq amqp --queues jms \
29+
--queue-args 'x-selector-fields=*,x-selector-field-max-bytes=128' \
30+
-t /queues/my-jms-queue -T /queues/my-jms-queue \
31+
-x 1 -C 10 -y 0
32+
```
33+
34+
**Important**: `x-selector-fields` must be a list. When using `--queues jms`, omq
35+
automatically converts the comma-separated string to a list. If you declare the queue
36+
via other means, ensure this argument is a proper list (e.g. `["*"]`), not a plain string.
37+
38+
## Publishing Messages with Application Properties
39+
40+
Use `--amqp-app-property` to set application properties on messages.
41+
Values are cycled through comma-separated entries.
42+
43+
```bash
44+
# Publish messages with different color property values
45+
omq amqp --queues predeclared \
46+
-t /queues/my-jms-queue \
47+
-x 1 -C 10 -y 0 \
48+
--amqp-app-property 'color=red,blue,green,yellow'
49+
50+
# Publish with multiple properties
51+
omq amqp --queues predeclared \
52+
-t /queues/my-jms-queue \
53+
-x 1 -C 10 -y 0 \
54+
--amqp-app-property 'color=red,blue,green' \
55+
--amqp-app-property 'size=small,large'
56+
```
57+
58+
## Consuming with JMS Message Selectors
59+
60+
Use `--amqp-jms-selector` to filter messages using selector expressions.
61+
62+
### Basic Equality
63+
64+
```bash
65+
# Consume only red messages
66+
omq amqp --queues predeclared \
67+
-T /queues/my-jms-queue \
68+
-x 0 -y 1 -D 100 -z 10s \
69+
--amqp-jms-selector "color = 'red'"
70+
```
71+
72+
### IS NULL / IS NOT NULL
73+
74+
```bash
75+
# Consume messages where a property is set
76+
omq amqp --queues predeclared \
77+
-T /queues/my-jms-queue \
78+
-x 0 -y 1 -D 100 -z 10s \
79+
--amqp-jms-selector "color IS NOT NULL"
80+
```
81+
82+
## Message Priority
83+
84+
JMS queues support two priority levels: normal (0-4) and expedited (5-9).
85+
Higher priority messages are delivered first.
86+
87+
```bash
88+
# Publish with mixed priorities
89+
omq amqp --queues jms \
90+
--queue-args 'x-selector-fields=*' \
91+
-t /queues/jms-priority -T /queues/jms-priority \
92+
-C 100 -y 0 --detect-out-of-order-messages \
93+
--message-priority '{{ randInt 0 10 }}'
94+
95+
# Consume and observe priority ordering
96+
omq amqp --queues predeclared \
97+
-T /queues/jms-priority \
98+
-x 0 -D 100 -z 10s --log-level debug --detect-out-of-order-messages
99+
```
100+
101+
Note: if you want to use `--detect-out-of-order-messages` when consuming, it also
102+
has to be specified when publishing, since it sets annotations used for detecting
103+
out-of-sequence messages.
104+
105+
Alternatively, you can skip this flag altogether and just look at the output - with
106+
`--log-level debug` omq prints the priority of messages it received.
107+
108+
## Consumer Priority
109+
110+
JMS queues support three consumer priority levels: -1, 0, and 1.
111+
Higher priority consumers receive messages first.
112+
113+
```bash
114+
# Start priority 0 consumer
115+
omq amqp --queues jms -x 0 \
116+
-T /queues/jms-consumer-priority \
117+
--consumer-priority 0
118+
119+
# Start a publisher
120+
omq amqp --queues jms -y 0 \
121+
-t /queues/jms-consumer-priority --rate 1
122+
123+
# Start a higher priority consumer - you should see that the previous consumer
124+
# no longer receives messages, while this higher-priority consumer takes over
125+
omq amqp --queues jms -x 0 \
126+
-T /queues/jms-consumer-priority \
127+
--consumer-priority 1
128+
```
129+
130+
## Queue Overflow (reject-publish)
131+
132+
JMS queues support `x-max-length` with `reject-publish` overflow strategy.
133+
134+
```bash
135+
# Declare queue with max length 5 and reject-publish
136+
omq amqp --queues jms \
137+
--queue-args 'x-selector-fields=*,x-max-length=5,x-overflow=reject-publish' \
138+
-t /queues/jms-overflow -T /queues/jms-overflow \
139+
-x 1 -C 20 -y 0
140+
```
141+
142+
## AMQP 0-9-1 Interoperability
143+
144+
Messages published via AMQP 0-9-1 with headers can be consumed via AMQP 1.0
145+
with JMS selectors. AMQP 0-9-1 headers are exposed as application properties
146+
to AMQP 1.0 consumers.
147+
148+
```bash
149+
# Publish with AMQP 0-9-1, consume with AMQP 1.0 JMS selector
150+
omq amqp091-amqp --queues jms \
151+
--queue-args 'x-selector-fields=*' \
152+
-t /queues/jms-interop -T /queues/jms-interop \
153+
-x 1 --amqp091-header color=red,blue,green,yellow \
154+
-y 1 --amqp-jms-selector "color = 'red'"
155+
```
156+
157+
The consumption rate should be roughly 25% of the publishing rate,
158+
since the consumer only consumes messages with one of the 4 color values.
159+
160+
## Delayed Messages
161+
162+
Publish messages with a 30 second delay:
163+
```
164+
omq amqp --queues jms \
165+
--queue-args 'x-selector-fields=*' \
166+
-t /queues/jms-delay -T /queues/jms-delay \
167+
-x 1 -C 100 -y 0 \
168+
--amqp-msg-annotation 'x-opt-delivery-time={{ now | date_modify "+30s" | unixEpoch | mul 1000 }}'
169+
```
170+
171+
The `x-opt-delivery-time` value must be in **milliseconds** since the Unix epoch.
172+
`unixEpoch` sprig function returns seconds, so `| mul 1000` is required.
173+
Alternative expression to delay by 30s would be `{{ now | unixEpoch | add 30 | mul 1000 }}`.
174+

pkg/amqp10/consumer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,5 +465,9 @@ func buildLinkFilters(cfg config.Config) []amqp.LinkFilter {
465465
filters = append(filters, amqp.NewLinkFilter("sql-filter", 0x120,
466466
cfg.Amqp.SQLFilter))
467467
}
468+
if cfg.Amqp.JMSSelectorFilter != "" {
469+
filters = append(filters, amqp.NewLinkFilter("jms-selector", 0x0000468C00000004,
470+
cfg.Amqp.JMSSelectorFilter))
471+
}
468472
return filters
469473
}

pkg/config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const (
3737
Quorum
3838
Stream
3939
Exclusive
40+
JMS
4041
)
4142

4243
var AmqpDurabilityModes = map[AmqpDurabilityMode][]string{
@@ -51,6 +52,7 @@ var QueueTypes = map[QueueType][]string{
5152
Quorum: {"quorum"},
5253
Stream: {"stream"},
5354
Exclusive: {"exclusive"},
55+
JMS: {"jms"},
5456
}
5557

5658
type AnnotationTemplate struct {
@@ -74,6 +76,7 @@ type AmqpOptions struct {
7476
MsgAnnotationTemplates map[string]*template.Template
7577
AppPropertyFilters map[string]string
7678
SQLFilter string
79+
JMSSelectorFilter string
7780
ModifyOptions AmqpModifyOptions
7881
ModifyRate int
7982
}

pkg/mgmt/mgmt.go

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/tls"
66
"errors"
7+
"fmt"
78
"net/url"
89
"os"
910
"strings"
@@ -126,46 +127,74 @@ func (m *Mgmt) DeclareAndBind(cfg config.Config, queueName string, id int) *rmq.
126127
return nil
127128
}
128129

129-
var queueSpec rmq.IQueueSpecification
130-
switch cfg.Queues {
131-
case config.Classic:
132-
queueSpec = &rmq.ClassicQueueSpecification{Name: queueName, Arguments: cfg.QueueArgs}
133-
case config.Quorum:
134-
queueSpec = &rmq.QuorumQueueSpecification{Name: queueName, Arguments: cfg.QueueArgs}
135-
case config.Stream:
136-
queueSpec = &rmq.StreamQueueSpecification{Name: queueName, Arguments: cfg.QueueArgs}
137-
}
138-
139130
conn := instance.connection()
140131
if conn == nil {
141132
return nil
142133
}
143134

144-
qi, err := conn.Management().DeclareQueue(context.Background(), queueSpec)
145-
if err != nil {
146-
log.Error("Failed to declare queue", "name", queueName, "error", err)
147-
os.Exit(1)
135+
var queueInfo *rmq.AmqpQueueInfo
136+
137+
if cfg.Queues == config.JMS {
138+
args := map[string]any{"x-queue-type": "jms"}
139+
for k, v := range cfg.QueueArgs {
140+
if k == "x-selector-fields" {
141+
if s, ok := v.(string); ok {
142+
fields := strings.Split(s, ",")
143+
args[k] = fields
144+
continue
145+
}
146+
}
147+
args[k] = v
148+
}
149+
body := map[string]any{
150+
"durable": true,
151+
"auto_delete": false,
152+
"exclusive": false,
153+
"arguments": args,
154+
}
155+
path := fmt.Sprintf("/queues/%s", queueName)
156+
_, err := conn.Management().Request(context.Background(), body, path, "PUT", []int{200, 409})
157+
if err != nil {
158+
log.Error("Failed to declare JMS queue", "name", queueName, "error", err)
159+
os.Exit(1)
160+
}
161+
log.Debug("JMS queue declared", "name", queueName)
162+
} else {
163+
var queueSpec rmq.IQueueSpecification
164+
switch cfg.Queues {
165+
case config.Classic:
166+
queueSpec = &rmq.ClassicQueueSpecification{Name: queueName, Arguments: cfg.QueueArgs}
167+
case config.Quorum:
168+
queueSpec = &rmq.QuorumQueueSpecification{Name: queueName, Arguments: cfg.QueueArgs}
169+
case config.Stream:
170+
queueSpec = &rmq.StreamQueueSpecification{Name: queueName, Arguments: cfg.QueueArgs}
171+
}
172+
173+
qi, err := conn.Management().DeclareQueue(context.Background(), queueSpec)
174+
if err != nil {
175+
log.Error("Failed to declare queue", "name", queueName, "error", err)
176+
os.Exit(1)
177+
}
178+
log.Debug("queue declared", "name", qi.Name(), "type", qi.Type())
179+
queueInfo = qi
148180
}
149-
log.Debug("queue declared", "name", qi.Name(), "type", qi.Type())
150181

151182
if m.cleanupQueues {
152183
m.declaredQueues[queueName] = true
153184
}
154185

155186
exchangeName, routingKey := parsePublishTo(cfg.PublisherProto, cfg.PublishToTemplate, id)
156187

157-
// explicitly set routing key overrides everything else
158188
if cfg.BindingKey != "" {
159189
routingKey = utils.InjectId(cfg.BindingKey, id)
160190
}
161191

162-
// explicitly set exchange overrides everything else
163192
if cfg.Exchange != "" {
164193
exchangeName = cfg.Exchange
165194
}
166195

167196
if exchangeName != "amq.default" {
168-
_, err = instance.connection().Management().Bind(context.Background(), &rmq.ExchangeToQueueBindingSpecification{
197+
_, err := instance.connection().Management().Bind(context.Background(), &rmq.ExchangeToQueueBindingSpecification{
169198
SourceExchange: exchangeName,
170199
DestinationQueue: queueName,
171200
BindingKey: routingKey,
@@ -177,7 +206,7 @@ func (m *Mgmt) DeclareAndBind(cfg config.Config, queueName string, id int) *rmq.
177206
log.Debug("binding declared", "exchange", exchangeName, "queue", queueName, "key", routingKey)
178207
}
179208

180-
return qi
209+
return queueInfo
181210
}
182211

183212
func parsePublishTo(proto config.Protocol, publishToTemplate *template.Template, id int) (string, string) {

0 commit comments

Comments
 (0)