Skip to content

Commit 843b668

Browse files
aleksmausmergify-bot
authored andcommitted
Osquerybeat: Improve osquery client connect code (#28848)
(cherry picked from commit d2e3b99)
1 parent 693f41c commit 843b668

4 files changed

Lines changed: 219 additions & 30 deletions

File tree

x-pack/osquerybeat/internal/osqd/args.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,12 @@ var protectedFlags = Flags{
7979

8080
// The delimiter for a full query name that is concatenated as "pack_" + {{pack name}} + "_" + {{query name}} by default
8181
"pack_delimiter": "_",
82-
"config_refresh": 10,
82+
83+
// Refresh config every 60 seconds
84+
// The previous setting was 10 seconds which is unnecessary frequent.
85+
// Osquery does not expect that frequent policy/configuration changes
86+
// and can tolerate non real-time configuration change application.
87+
"config_refresh": 60,
8388
}
8489

8590
func init() {

x-pack/osquerybeat/internal/osqdcli/client.go

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"time"
1717

1818
"golang.org/x/sync/semaphore"
19+
"gotest.tools/gotestsum/log"
1920

2021
"github.com/osquery/osquery-go"
2122
genosquery "github.com/osquery/osquery-go/gen/osquery"
@@ -125,28 +126,21 @@ func (c *Client) Connect(ctx context.Context) error {
125126
func (c *Client) reconnect(ctx context.Context) error {
126127
c.close()
127128

128-
for i := 0; i < c.connectRetries; i++ {
129-
attempt := i + 1
130-
llog := c.log.With("attempt", attempt)
131-
llog.Debug("connecting")
129+
r := retry{
130+
maxRetry: c.connectRetries,
131+
retryWait: retryWait,
132+
log: c.log.With("context", "osquery client connect"),
133+
}
134+
135+
return r.Run(ctx, func(ctx context.Context) error {
132136
cli, err := osquery.NewClient(c.socketPath, c.timeout)
133137
if err != nil {
134-
llog.Errorf("failed to connect: %v", err)
135-
if i < c.connectRetries-1 {
136-
llog.Infof("wait before next connect attempt: retry_wait: %v", retryWait)
137-
if werr := waitWithContext(ctx, retryWait); werr != nil {
138-
err = werr
139-
break // Context cancelled, exit loop
140-
}
141-
} else {
142-
return err
143-
}
144-
continue
138+
log.Errorf("failed to connect: %v", err)
139+
return err
145140
}
146141
c.cli = cli
147-
break
148-
}
149-
return nil
142+
return nil
143+
})
150144
}
151145

152146
func (c *Client) Close() {
@@ -287,17 +281,6 @@ func (c *Client) queryColumnTypes(ctx context.Context, sql string) (map[string]s
287281
return colTypes, nil
288282
}
289283

290-
func waitWithContext(ctx context.Context, to time.Duration) error {
291-
t := time.NewTimer(to)
292-
defer t.Stop()
293-
select {
294-
case <-ctx.Done():
295-
return ctx.Err()
296-
case <-t.C:
297-
}
298-
return nil
299-
}
300-
301284
func resolveTypes(hits []map[string]string, colTypes map[string]string) []map[string]interface{} {
302285
resolved := make([]map[string]interface{}, 0, len(hits))
303286
for _, hit := range hits {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package osqdcli
6+
7+
import (
8+
"context"
9+
"time"
10+
11+
"github.com/elastic/beats/v7/libbeat/logp"
12+
)
13+
14+
type retry struct {
15+
maxRetry int
16+
retryWait time.Duration
17+
log *logp.Logger
18+
}
19+
20+
type tryFunc func(ctx context.Context) error
21+
22+
func (r *retry) Run(ctx context.Context, fn tryFunc) (err error) {
23+
maxAttempts := r.maxRetry + 1
24+
for i := 0; i < maxAttempts; i++ {
25+
attempt := i + 1
26+
r.log.Debugf("attempt %v out of %v", attempt, maxAttempts)
27+
28+
err = fn(ctx)
29+
30+
if err != nil {
31+
r.log.Debugf("attempt %v out of %v failed, err: %v", attempt, maxAttempts, err)
32+
if i != maxAttempts {
33+
if r.retryWait > 0 {
34+
r.log.Debugf("wait for %v before next retry", r.retryWait)
35+
err = waitWithContext(ctx, retryWait)
36+
if err != nil {
37+
r.log.Debugf("wait returned err: %v", err)
38+
return err
39+
}
40+
}
41+
} else {
42+
r.log.Debugf("no more attempts, return err: %v", err)
43+
return err
44+
}
45+
} else {
46+
r.log.Debugf("attempt %v out of %v succeeded", attempt, maxAttempts)
47+
return nil
48+
}
49+
}
50+
return err
51+
}
52+
53+
func waitWithContext(ctx context.Context, to time.Duration) error {
54+
t := time.NewTimer(to)
55+
defer t.Stop()
56+
select {
57+
case <-ctx.Done():
58+
return ctx.Err()
59+
case <-t.C:
60+
}
61+
return nil
62+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package osqdcli
6+
7+
import (
8+
"context"
9+
"testing"
10+
"time"
11+
12+
"github.com/elastic/beats/v7/libbeat/logp"
13+
"github.com/google/go-cmp/cmp"
14+
"github.com/google/go-cmp/cmp/cmpopts"
15+
)
16+
17+
func TestRetryRun(t *testing.T) {
18+
logp.Configure(logp.Config{
19+
Level: logp.DebugLevel,
20+
ToStderr: true,
21+
Selectors: []string{"*"},
22+
})
23+
24+
log := logp.NewLogger("retry_test").With("context", "osquery client connect")
25+
ctx := context.Background()
26+
27+
type fields struct {
28+
maxRetry int
29+
retryWait time.Duration
30+
log *logp.Logger
31+
}
32+
33+
type args struct {
34+
ctx context.Context
35+
fn tryFunc
36+
}
37+
38+
argsWithFunc := func(fn tryFunc) args {
39+
return args{
40+
ctx: ctx,
41+
fn: fn,
42+
}
43+
}
44+
45+
funcSucceedsOnNAttempt := func(attempt int) func(context.Context) error {
46+
curAttempt := 1
47+
return func(ctx context.Context) error {
48+
if curAttempt == attempt {
49+
return nil
50+
}
51+
curAttempt++
52+
return ErrAlreadyConnected
53+
}
54+
}
55+
56+
tests := []struct {
57+
name string
58+
fields fields
59+
args args
60+
wantErr error
61+
}{
62+
{
63+
name: "no retries, no wait, success",
64+
fields: fields{
65+
log: log,
66+
},
67+
args: argsWithFunc(func(ctx context.Context) error {
68+
return nil
69+
}),
70+
},
71+
{
72+
name: "no retries, no wait, error",
73+
fields: fields{
74+
log: log,
75+
},
76+
args: argsWithFunc(func(ctx context.Context) error {
77+
return ErrAlreadyConnected
78+
}),
79+
wantErr: ErrAlreadyConnected,
80+
},
81+
{
82+
name: "retries, no wait, no more retries fails",
83+
fields: fields{
84+
maxRetry: 3,
85+
log: log,
86+
},
87+
args: argsWithFunc(funcSucceedsOnNAttempt(8)),
88+
wantErr: ErrAlreadyConnected,
89+
},
90+
{
91+
name: "retries, no wait, success",
92+
fields: fields{
93+
maxRetry: 3,
94+
log: log,
95+
},
96+
args: argsWithFunc(funcSucceedsOnNAttempt(4)),
97+
},
98+
{
99+
name: "retries, with wait, success",
100+
fields: fields{
101+
maxRetry: 3,
102+
retryWait: 1 * time.Millisecond,
103+
log: log,
104+
},
105+
args: argsWithFunc(funcSucceedsOnNAttempt(4)),
106+
},
107+
{
108+
name: "retries, with wait, success sooner",
109+
fields: fields{
110+
maxRetry: 3,
111+
retryWait: 1 * time.Millisecond,
112+
log: log,
113+
},
114+
args: argsWithFunc(funcSucceedsOnNAttempt(2)),
115+
},
116+
}
117+
for _, tt := range tests {
118+
t.Run(tt.name, func(t *testing.T) {
119+
r := &retry{
120+
maxRetry: tt.fields.maxRetry,
121+
retryWait: tt.fields.retryWait,
122+
log: tt.fields.log,
123+
}
124+
err := r.Run(tt.args.ctx, tt.args.fn)
125+
if err != nil {
126+
if tt.wantErr != nil {
127+
diff := cmp.Diff(tt.wantErr, err, cmpopts.EquateErrors())
128+
if diff != "" {
129+
t.Error(diff)
130+
}
131+
} else {
132+
t.Errorf("got err: %v, wantErr: nil", err)
133+
}
134+
} else if tt.wantErr != nil {
135+
t.Errorf("got err: nil, wantErr: %v", tt.wantErr)
136+
}
137+
})
138+
}
139+
}

0 commit comments

Comments
 (0)