Skip to content

Commit 06dea8b

Browse files
recrsnSteffen Siering
authored andcommitted
Log reconnect attempts (elastic#6404)
* Log reconnect attempts (elastic#5715) * Add identifiers to connections
1 parent 7e68c79 commit 06dea8b

13 files changed

Lines changed: 70 additions & 1 deletion

File tree

libbeat/monitoring/report/elasticsearch/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,3 +152,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
152152
func (c *publishClient) Test(d testing.Driver) {
153153
c.es.Test(d)
154154
}
155+
156+
func (c *publishClient) String() string {
157+
return "publish(" + c.es.String() + ")"
158+
}

libbeat/outputs/backoff.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,7 @@ func (b *backoffClient) Test(d testing.Driver) {
7777

7878
c.Test(d)
7979
}
80+
81+
func (b *backoffClient) String() string {
82+
return "backoff(" + b.client.String() + ")"
83+
}

libbeat/outputs/console/console.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,7 @@ func (c *console) writeBuffer(buf []byte) error {
160160
}
161161
return nil
162162
}
163+
164+
func (c *console) String() string {
165+
return "console"
166+
}

libbeat/outputs/elasticsearch/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,10 @@ func (client *Client) Test(d testing.Driver) {
664664
})
665665
}
666666

667+
func (client *Client) String() string {
668+
return "elasticsearch(" + client.Connection.URL + ")"
669+
}
670+
667671
// Connect connects the client.
668672
func (conn *Connection) Connect() error {
669673
var err error

libbeat/outputs/failover.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"math/rand"
24+
"strings"
2425

2526
"github.com/elastic/beats/libbeat/publisher"
2627
"github.com/elastic/beats/libbeat/testing"
@@ -109,3 +110,13 @@ func (f *failoverClient) Test(d testing.Driver) {
109110
})
110111
}
111112
}
113+
114+
func (f *failoverClient) String() string {
115+
names := make([]string, len(f.clients))
116+
117+
for i, client := range f.clients {
118+
names[i] = client.String()
119+
}
120+
121+
return "failover(" + strings.Join(names, ",") + ")"
122+
}

libbeat/outputs/fileout/file.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func init() {
3535
}
3636

3737
type fileOutput struct {
38+
filePath string
3839
beat beat.Info
3940
observer outputs.Observer
4041
rotator *file.Rotator
@@ -74,6 +75,8 @@ func (out *fileOutput) init(beat beat.Info, c config) error {
7475
path = filepath.Join(c.Path, out.beat.Beat)
7576
}
7677

78+
out.filePath = path
79+
7780
var err error
7881
out.rotator, err = file.NewFileRotator(
7982
path,
@@ -149,3 +152,7 @@ func (out *fileOutput) Publish(
149152

150153
return nil
151154
}
155+
156+
func (out *fileOutput) String() string {
157+
return "file(" + out.filePath + ")"
158+
}

libbeat/outputs/kafka/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package kafka
2020
import (
2121
"errors"
2222
"fmt"
23+
"strings"
2324
"sync"
2425
"sync/atomic"
2526

@@ -141,6 +142,10 @@ func (c *client) Publish(batch publisher.Batch) error {
141142
return nil
142143
}
143144

145+
func (c *client) String() string {
146+
return "kafka(" + strings.Join(c.hosts, ",") + ")"
147+
}
148+
144149
func (c *client) getEventMessage(data *publisher.Event) (*message, error) {
145150
event := &data.Content
146151
msg := &message{partition: -1, data: *data}

libbeat/outputs/logstash/async.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ func (c *asyncClient) Publish(batch publisher.Batch) error {
169169
return nil
170170
}
171171

172+
func (c *asyncClient) String() string {
173+
return "async(" + c.Client.String() + ")"
174+
}
175+
172176
func (c *asyncClient) publishWindowed(
173177
ref *msgRef,
174178
events []publisher.Event,

libbeat/outputs/outputs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type Client interface {
3535
// the publisher pipeline. The publisher pipeline (if configured by the output
3636
// factory) will take care of retrying/dropping events.
3737
Publish(publisher.Batch) error
38+
39+
String() string
3840
}
3941

4042
// NetworkClient defines the required client capabilities for network based

libbeat/outputs/redis/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ func (c *client) Publish(batch publisher.Batch) error {
152152
return err
153153
}
154154

155+
func (c *client) String() string {
156+
return "redis(" + c.Client.String() + ")"
157+
}
158+
155159
func (c *client) makePublish(
156160
conn redis.Conn,
157161
) (publishFn, error) {

0 commit comments

Comments
 (0)