Skip to content

Commit af5d23e

Browse files
authored
[azure-eventhub] Support for AMQP-over-WebSocket transport in the processor v2 (#47956)
Support for AMQP-over-WebSocket transport in the azure-eventhub processor v2. Enterprise users often need to comply with network restrictions, which means using AMQP may not be an option. In addition to AMQP-over-WebSocket support, this change allows users to run the azure-eventhub input behind an HTTPS proxy.
1 parent 45d5eaf commit af5d23e

7 files changed

Lines changed: 137 additions & 32 deletions

File tree

NOTICE.txt

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8162,6 +8162,29 @@ Contents of probable licence file $GOMODCACHE/github.com/cloudfoundry/sonde-go@v
81628162
END OF TERMS AND CONDITIONS
81638163

81648164

8165+
--------------------------------------------------------------------------------
8166+
Dependency : github.com/coder/websocket
8167+
Version: v1.8.14
8168+
Licence type (autodetected): ISC
8169+
--------------------------------------------------------------------------------
8170+
8171+
Contents of probable licence file $GOMODCACHE/github.com/coder/websocket@v1.8.14/LICENSE.txt:
8172+
8173+
Copyright (c) 2025 Coder
8174+
8175+
Permission to use, copy, modify, and distribute this software for any
8176+
purpose with or without fee is hereby granted, provided that the above
8177+
copyright notice and this permission notice appear in all copies.
8178+
8179+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
8180+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
8181+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
8182+
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
8183+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
8184+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
8185+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
8186+
8187+
81658188
--------------------------------------------------------------------------------
81668189
Dependency : github.com/containerd/fifo
81678190
Version: v1.1.0
@@ -36851,11 +36874,11 @@ Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go
3685136874

3685236875
--------------------------------------------------------------------------------
3685336876
Dependency : github.com/Azure/go-amqp
36854-
Version: v1.3.0
36877+
Version: v1.4.0
3685536878
Licence type (autodetected): MIT
3685636879
--------------------------------------------------------------------------------
3685736880

36858-
Contents of probable licence file $GOMODCACHE/github.com/!azure/go-amqp@v1.3.0/LICENSE:
36881+
Contents of probable licence file $GOMODCACHE/github.com/!azure/go-amqp@v1.4.0/LICENSE:
3685936882

3686036883
MIT License
3686136884

@@ -45264,29 +45287,6 @@ IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
4526445287
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
4526545288

4526645289

45267-
--------------------------------------------------------------------------------
45268-
Dependency : github.com/coder/websocket
45269-
Version: v1.8.12
45270-
Licence type (autodetected): ISC
45271-
--------------------------------------------------------------------------------
45272-
45273-
Contents of probable licence file $GOMODCACHE/github.com/coder/websocket@v1.8.12/LICENSE.txt:
45274-
45275-
Copyright (c) 2023 Anmol Sethi <hi@nhooyr.io>
45276-
45277-
Permission to use, copy, modify, and distribute this software for any
45278-
purpose with or without fee is hereby granted, provided that the above
45279-
copyright notice and this permission notice appear in all copies.
45280-
45281-
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
45282-
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
45283-
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
45284-
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
45285-
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
45286-
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
45287-
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
45288-
45289-
4529045290
--------------------------------------------------------------------------------
4529145291
Dependency : github.com/containerd/containerd/v2
4529245292
Version: v2.1.0
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: azure-eventhub-v2-add-support-for-amqp-over-websocket-and-https-proxy
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: filebeat
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: https://github.com/elastic/beats/pull/47956
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
issue: https://github.com/elastic/beats/issues/47823

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ require (
233233
require (
234234
github.com/apache/arrow-go/v18 v18.4.1
235235
github.com/cilium/ebpf v0.19.0
236+
github.com/coder/websocket v1.8.14
236237
github.com/elastic/gokrb5/v8 v8.0.0-20251105095404-23cc45e6a102
237238
github.com/forensicanalysis/fslib v0.15.2
238239
github.com/mattn/go-sqlite3 v1.14.32
@@ -280,7 +281,7 @@ require (
280281
github.com/Azure/azure-amqp-common-go/v4 v4.2.0 // indirect
281282
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
282283
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect
283-
github.com/Azure/go-amqp v1.3.0 // indirect
284+
github.com/Azure/go-amqp v1.4.0 // indirect
284285
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
285286
github.com/Azure/go-autorest/autorest/adal v0.9.24 // indirect
286287
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0 h1:UXT0o77lXQrikd1kg
9494
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0/go.mod h1:cTvi54pg19DoT07ekoeMgE/taAwNtCShVeZqA+Iv2xI=
9595
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
9696
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
97-
github.com/Azure/go-amqp v1.3.0 h1://1rikYhoIQNXJFXyoO/Rlb4+4EkHYfJceNtLlys2/4=
98-
github.com/Azure/go-amqp v1.3.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
97+
github.com/Azure/go-amqp v1.4.0 h1:Xj3caqi4comOF/L1Uc5iuBxR/pB6KumejC01YQOqOR4=
98+
github.com/Azure/go-amqp v1.4.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
9999
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
100100
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
101101
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
@@ -291,8 +291,8 @@ github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv
291291
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
292292
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0 h1:sDMmm+q/3+BukdIpxwO365v/Rbspp2Nt5XntgQRXq8Q=
293293
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM=
294-
github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo=
295-
github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
294+
github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g=
295+
github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg=
296296
github.com/containerd/containerd/v2 v2.1.0 h1:lS6iJ/CwZrxYxKd6zWBz5LR7xOlMVQC78z68YtizUAM=
297297
github.com/containerd/containerd/v2 v2.1.0/go.mod h1:t2VqM0zSiEdi33qgtsMwUKrYyVg4oq2FPe+cs3LBt7w=
298298
github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=

x-pack/filebeat/input/azureeventhub/auth.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,23 @@ func createCredential(cfg *azureInputConfig, log *logp.Logger) (azcore.TokenCred
4444
// CreateEventHubConsumerClient creates an Event Hub consumer client
4545
// using the configured authentication method from the provided config.
4646
func CreateEventHubConsumerClient(cfg *azureInputConfig, log *logp.Logger) (*azeventhubs.ConsumerClient, error) {
47+
// Create the consumer client options
48+
options := azeventhubs.ConsumerClientOptions{}
49+
50+
// Set up the transport
51+
switch cfg.Transport {
52+
case transportWebsocket:
53+
// Enable WebSocket transport if configured.
54+
// This allows connectivity through HTTP proxies and firewalls
55+
// that block AMQP port 5671 but allow HTTPS on port 443.
56+
log.Infow("using AMQP-over-WebSocket transport for Event Hub connection")
57+
options.NewWebSocketConn = newWebSocketConn
58+
default:
59+
// Default transport, nothing to do.
60+
log.Infow("using AMQP transport for Event Hub connection")
61+
}
62+
63+
// Set up the consumer client based on the authentication type
4764
switch cfg.AuthType {
4865
case AuthTypeConnectionString:
4966
// Use connection string authentication for Event Hub
@@ -79,7 +96,7 @@ func CreateEventHubConsumerClient(cfg *azureInputConfig, log *logp.Logger) (*aze
7996
cfg.ConnectionString,
8097
eventHubName,
8198
cfg.ConsumerGroup,
82-
nil,
99+
&options,
83100
)
84101
if err != nil {
85102
return nil, fmt.Errorf("failed to create consumer client from connection string: %w", err)
@@ -100,7 +117,7 @@ func CreateEventHubConsumerClient(cfg *azureInputConfig, log *logp.Logger) (*aze
100117
cfg.EventHubName,
101118
cfg.ConsumerGroup,
102119
credential,
103-
nil,
120+
&options,
104121
)
105122
if err != nil {
106123
return nil, fmt.Errorf("failed to create consumer client with credential: %w", err)

x-pack/filebeat/input/azureeventhub/config.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,10 @@ type azureInputConfig struct {
125125
// for at least `PartitionReceiveCount` events, then it returns
126126
// the events it has received.
127127
PartitionReceiveCount int `config:"partition_receive_count"`
128+
// Transport controls the transport type to use for the event hub connection.
129+
// Possible values are "amqp" (default) and "websocket".
130+
// Use "websocket" when connecting through HTTP proxies or when port 5671 is blocked.
131+
Transport string `config:"transport"`
128132
}
129133

130134
func defaultConfig() azureInputConfig {
@@ -147,6 +151,8 @@ func defaultConfig() azureInputConfig {
147151
// Default is true to avoid reprocessing data from the start of the retention
148152
// when v2 replaces v1.
149153
MigrateCheckpoint: true,
154+
// Default transport is AMQP for backward compatibility.
155+
Transport: transportAmqp,
150156
}
151157
}
152158

@@ -159,6 +165,11 @@ func (conf *azureInputConfig) Validate() error {
159165
conf.AuthType = AuthTypeConnectionString
160166
}
161167

168+
// Validate event hub transport option
169+
if err := conf.validateEventHubTransport(); err != nil {
170+
return err
171+
}
172+
162173
// Validate the processor version first to ensure it's valid
163174
if err := conf.validateProcessorVersion(); err != nil {
164175
return err
@@ -192,6 +203,20 @@ func (conf *azureInputConfig) Validate() error {
192203
return nil
193204
}
194205

206+
// validateEventHubTransport validates the event hub transport option.
207+
func (conf *azureInputConfig) validateEventHubTransport() error {
208+
// Validate transport option
209+
if conf.Transport != transportAmqp && conf.Transport != transportWebsocket {
210+
return fmt.Errorf(
211+
"invalid event hub transport: %s (available transports: %s, %s)",
212+
conf.Transport,
213+
transportAmqp,
214+
transportWebsocket,
215+
)
216+
}
217+
return nil
218+
}
219+
195220
// validateProcessorVersion validates that the processor version is valid.
196221
func (conf *azureInputConfig) validateProcessorVersion() error {
197222
if conf.ProcessorVersion != processorV1 && conf.ProcessorVersion != processorV2 {

x-pack/filebeat/input/azureeventhub/v2_input.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ import (
1010
"context"
1111
"errors"
1212
"fmt"
13+
"net"
1314
"strings"
1415
"time"
1516

17+
"github.com/coder/websocket"
18+
1619
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
1720
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
1821
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
@@ -43,6 +46,10 @@ const (
4346
// processorRestartMaxBackoff is the maximum backoff time before
4447
// restarting the processor.
4548
processorRestartMaxBackoff = 120 * time.Second
49+
// AMQP transport for Event Hub connection.
50+
transportAmqp = "amqp"
51+
// WebSocket transport for Event Hub connection.
52+
transportWebsocket = "websocket"
4653
)
4754

4855
// azureInputConfig the Azure Event Hub input v2,
@@ -619,3 +626,26 @@ func shutdownPartitionResources(ctx context.Context, partitionClient *azeventhub
619626
// processing events for this partition.
620627
defer pipelineClient.Close()
621628
}
629+
630+
// newWebSocketConn creates a WebSocket connection for AMQP-over-WebSocket transport.
631+
//
632+
// This function is used when the transport configuration is set to "websocket".
633+
// It enables connectivity through HTTP proxies and firewalls that block the
634+
// standard AMQP port (5671) but allow HTTPS traffic on port 443.
635+
//
636+
// HTTP proxy configuration is automatically detected from environment variables:
637+
// - HTTP_PROXY / http_proxy
638+
// - HTTPS_PROXY / https_proxy
639+
// - NO_PROXY / no_proxy
640+
func newWebSocketConn(ctx context.Context, args azeventhubs.WebSocketConnParams) (net.Conn, error) {
641+
opts := &websocket.DialOptions{
642+
Subprotocols: []string{"amqp"},
643+
}
644+
645+
wssConn, _, err := websocket.Dial(ctx, args.Host, opts)
646+
if err != nil {
647+
return nil, err
648+
}
649+
650+
return websocket.NetConn(ctx, wssConn, websocket.MessageBinary), nil
651+
}

0 commit comments

Comments
 (0)