Skip to content

Commit 3a1d1ae

Browse files
authored
Add support for reading from UNIX datagram sockets (#22699)
## What does this PR do? This PR adds support for reading from UNIX datagram sockets both from the `unix` input and the `syslog` input. A new option is added to select the type of the socket named `socket_type`. Available options are: `stream` and `datagram`. ## Why is it important? A few applications which send logs over Unix sockets, use datagrams not streams. From now on, Filebeat can accept input from these applications as well. Closes #18632
1 parent 50c1745 commit 3a1d1ae

27 files changed

Lines changed: 880 additions & 392 deletions

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,6 +731,7 @@ from being added to events by default. {pull}18159[18159]
731731
- Added `event.ingested` field to data from the Netflow module. {pull}22412[22412]
732732
- Improve panw ECS url fields mapping. {pull}22481[22481]
733733
- Improve Nats filebeat dashboard. {pull}22726[22726]
734+
- Add support for UNIX datagram sockets in `unix` input. {issues}18632[18632] {pull}22699[22699]
734735

735736
*Heartbeat*
736737

filebeat/docs/inputs/input-common-unix-options.asciidoc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,14 @@ The maximum size of the message received over the socket. The default is `20MiB`
1414
[id="{beatname_lc}-input-{type}-unix-path"]
1515
==== `path`
1616

17-
The path to the Unix socket that will receive event streams.
17+
The path to the Unix socket that will receive events.
18+
19+
[float]
20+
[id="{beatname_lc}-input-{type}-unix-socket-type"]
21+
==== `socket_type`
22+
23+
The type to of the Unix socket that will receive events. Valid values
24+
are `stream` and `datagram`. The default is `stream`.
1825

1926
[float]
2027
[id="{beatname_lc}-input-{type}-unix-group"]

filebeat/input/syslog/config.go

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525

2626
"github.com/elastic/beats/v7/filebeat/harvester"
2727
"github.com/elastic/beats/v7/filebeat/inputsource"
28-
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
28+
"github.com/elastic/beats/v7/filebeat/inputsource/common/streaming"
2929
"github.com/elastic/beats/v7/filebeat/inputsource/tcp"
3030
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
3131
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
@@ -59,16 +59,17 @@ var defaultTCP = syslogTCP{
5959
}
6060

6161
type syslogUnix struct {
62-
unix.Config `config:",inline"`
63-
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
62+
unix.Config `config:",inline"`
6463
}
6564

66-
var defaultUnix = syslogUnix{
67-
Config: unix.Config{
68-
Timeout: time.Minute * 5,
69-
MaxMessageSize: 20 * humanize.MiByte,
70-
},
71-
LineDelimiter: "\n",
65+
func defaultUnix() syslogUnix {
66+
return syslogUnix{
67+
Config: unix.Config{
68+
Timeout: time.Minute * 5,
69+
MaxMessageSize: 20 * humanize.MiByte,
70+
LineDelimiter: "\n",
71+
},
72+
}
7273
}
7374

7475
var defaultUDP = udp.Config{
@@ -89,32 +90,26 @@ func factory(
8990
return nil, err
9091
}
9192

92-
splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
93+
splitFunc := streaming.SplitFunc([]byte(config.LineDelimiter))
9394
if splitFunc == nil {
9495
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
9596
}
9697

9798
logger := logp.NewLogger("input.syslog.tcp").With("address", config.Config.Host)
98-
factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logger, tcp.MetadataCallback, nf, splitFunc)
99+
factory := streaming.SplitHandlerFactory(inputsource.FamilyTCP, logger, tcp.MetadataCallback, nf, splitFunc)
99100

100101
return tcp.New(&config.Config, factory)
101102
case unix.Name:
102103
cfgwarn.Beta("Syslog Unix socket support is beta.")
103104

104-
config := defaultUnix
105+
config := defaultUnix()
105106
if err := cfg.Unpack(&config); err != nil {
106107
return nil, err
107108
}
108109

109-
splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
110-
if splitFunc == nil {
111-
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
112-
}
113-
114110
logger := logp.NewLogger("input.syslog.unix").With("path", config.Config.Path)
115-
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logger, unix.MetadataCallback, nf, splitFunc)
116111

117-
return unix.New(&config.Config, factory)
112+
return unix.New(logger, &config.Config, nf)
118113

119114
case udp.Name:
120115
config := defaultUDP

filebeat/input/tcp/input.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"github.com/elastic/beats/v7/filebeat/harvester"
2727
"github.com/elastic/beats/v7/filebeat/input"
2828
"github.com/elastic/beats/v7/filebeat/inputsource"
29-
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
29+
"github.com/elastic/beats/v7/filebeat/inputsource/common/streaming"
3030
"github.com/elastic/beats/v7/filebeat/inputsource/tcp"
3131
"github.com/elastic/beats/v7/libbeat/beat"
3232
"github.com/elastic/beats/v7/libbeat/common"
@@ -75,13 +75,13 @@ func NewInput(
7575
forwarder.Send(event)
7676
}
7777

78-
splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
78+
splitFunc := streaming.SplitFunc([]byte(config.LineDelimiter))
7979
if splitFunc == nil {
8080
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
8181
}
8282

8383
logger := logp.NewLogger("input.tcp").With("address", config.Config.Host)
84-
factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logger, tcp.MetadataCallback, cb, splitFunc)
84+
factory := streaming.SplitHandlerFactory(inputsource.FamilyTCP, logger, tcp.MetadataCallback, cb, splitFunc)
8585

8686
server, err := tcp.New(&config.Config, factory)
8787
if err != nil {

filebeat/input/unix/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,16 @@ import (
2626
)
2727

2828
type config struct {
29-
unix.Config `config:",inline"`
30-
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
29+
unix.Config `config:",inline"`
3130
}
3231

3332
func defaultConfig() config {
3433
return config{
3534
Config: unix.Config{
3635
Timeout: time.Minute * 5,
3736
MaxMessageSize: 20 * humanize.MiByte,
37+
SocketType: unix.StreamSocket,
38+
LineDelimiter: "\n",
3839
},
39-
LineDelimiter: "\n",
4040
}
4141
}

filebeat/input/unix/input.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,12 @@
1818
package unix
1919

2020
import (
21-
"bufio"
22-
"fmt"
2321
"net"
2422
"time"
2523

2624
input "github.com/elastic/beats/v7/filebeat/input/v2"
2725
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
2826
"github.com/elastic/beats/v7/filebeat/inputsource"
29-
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
3027
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
3128
"github.com/elastic/beats/v7/libbeat/beat"
3229
"github.com/elastic/beats/v7/libbeat/common"
@@ -35,8 +32,8 @@ import (
3532
)
3633

3734
type server struct {
35+
unix.Server
3836
config
39-
splitFunc bufio.SplitFunc
4037
}
4138

4239
func Plugin() input.Plugin {
@@ -59,12 +56,7 @@ func configure(cfg *common.Config) (stateless.Input, error) {
5956
}
6057

6158
func newServer(config config) (*server, error) {
62-
splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
63-
if splitFunc == nil {
64-
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
65-
}
66-
67-
return &server{config: config, splitFunc: splitFunc}, nil
59+
return &server{config: config}, nil
6860
}
6961

7062
func (s *server) Name() string { return "unix" }
@@ -83,17 +75,17 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
8375
log.Info("Starting Unix socket input")
8476
defer log.Info("Unix socket input stopped")
8577

86-
cb := func(data []byte, metadata inputsource.NetworkMetadata) {
78+
cb := inputsource.NetworkFunc(func(data []byte, metadata inputsource.NetworkMetadata) {
8779
event := createEvent(data, metadata)
8880
publisher.Publish(event)
89-
}
90-
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, log, unix.MetadataCallback, cb, s.splitFunc)
91-
server, err := unix.New(&s.config.Config, factory)
81+
})
82+
83+
server, err := unix.New(log, &s.config.Config, cb)
9284
if err != nil {
9385
return err
9486
}
9587

96-
log.Debugf("TCP Input '%v' initialized", ctx.ID)
88+
log.Debugf("%s Input '%v' initialized", s.config.Config.SocketType, ctx.ID)
9789

9890
err = server.Run(ctxtool.FromCanceller(ctx.Cancelation))
9991

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package dgram
19+
20+
import (
21+
"context"
22+
"fmt"
23+
"net"
24+
"runtime"
25+
"strings"
26+
27+
"github.com/elastic/beats/v7/filebeat/inputsource"
28+
"github.com/elastic/beats/v7/libbeat/logp"
29+
)
30+
31+
// HandlerFactory returns a ConnectionHandler func
32+
type HandlerFactory func(config ListenerConfig) ConnectionHandler
33+
34+
// ConnectionHandler is able to read from incoming connections.
35+
type ConnectionHandler func(context.Context, net.PacketConn) error
36+
37+
// MetadataFunc defines callback executed when a line is read from the split handler.
38+
type MetadataFunc func(net.Conn) inputsource.NetworkMetadata
39+
40+
// DatagramReaderFactory allows creation of a handler which can read packets from connections.
41+
func DatagramReaderFactory(
42+
family inputsource.Family,
43+
logger *logp.Logger,
44+
callback inputsource.NetworkFunc,
45+
) HandlerFactory {
46+
return func(config ListenerConfig) ConnectionHandler {
47+
return ConnectionHandler(func(ctx context.Context, conn net.PacketConn) error {
48+
for ctx.Err() == nil {
49+
50+
buffer := make([]byte, config.MaxMessageSize)
51+
//conn.SetDeadline(time.Now().Add(config.Timeout))
52+
53+
// If you are using Windows and you are using a fixed buffer and you get a datagram which
54+
// is bigger than the specified size of the buffer, it will return an `err` and the buffer will
55+
// contains a subset of the data.
56+
//
57+
// On Unix based system, the buffer will be truncated but no error will be returned.
58+
length, addr, err := conn.ReadFrom(buffer)
59+
if err != nil {
60+
if family == inputsource.FamilyUnix {
61+
fmt.Println("connection handler error", err)
62+
}
63+
// don't log any deadline events.
64+
e, ok := err.(net.Error)
65+
if ok && e.Timeout() {
66+
continue
67+
}
68+
69+
// Closed network error string will never change in Go 1.X
70+
// https://github.com/golang/go/issues/4373
71+
opErr, ok := err.(*net.OpError)
72+
if ok && strings.Contains(opErr.Err.Error(), "use of closed network connection") {
73+
logger.Info("Connection has been closed")
74+
return nil
75+
}
76+
77+
logger.Errorf("Error reading from the socket %s", err)
78+
79+
// On Windows send the current buffer and mark it as truncated.
80+
// The buffer will have content but length will return 0, addr will be nil.
81+
if family == inputsource.FamilyUDP && isLargerThanBuffer(err) {
82+
callback(buffer, inputsource.NetworkMetadata{RemoteAddr: addr, Truncated: true})
83+
continue
84+
}
85+
}
86+
87+
if length > 0 {
88+
callback(buffer[:length], inputsource.NetworkMetadata{RemoteAddr: addr})
89+
}
90+
}
91+
fmt.Println("end of connection handling")
92+
return nil
93+
})
94+
}
95+
}
96+
97+
func isLargerThanBuffer(err error) bool {
98+
if runtime.GOOS != "windows" {
99+
return false
100+
}
101+
return strings.Contains(err.Error(), windowErrBuffer)
102+
}

0 commit comments

Comments
 (0)