Skip to content

Commit 6d09457

Browse files
authored
[Heartbeat] One shot mode (#25972)
1 parent 58ff2a4 commit 6d09457

15 files changed

Lines changed: 282 additions & 16 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
764764

765765
- Add mime type detection for http responses. {pull}22976[22976]
766766
- Support JSON expressions / validation of JSON arrays. {pull}28073[28073]
767+
- Experimental 'run once' mode. {pull}25972[25972]
767768

768769
*Journalbeat*
769770

heartbeat/_meta/config/beat.yml.tmpl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,18 @@ heartbeat.monitors:
3737
# Name of corresponding APM service, if Elastic APM is in use for the monitored service.
3838
#service.name: my-apm-service-name
3939
40+
# Experimental: Configure monitors that run exactly once.
41+
# If enabled, heartbeat.monitors will be ignored
42+
# Heartbeat will run these monitors once then exit.
43+
#heartbeat.run_once:
44+
#- type: http
45+
#id: my-monitor
46+
#name: My Monitor
47+
#urls: ["http://localhost:9200"]
48+
# NOTE: you must still provide the schedule field! Heartbeat
49+
# Uses this to determine the contents of the monitor.timespan field
50+
#schedule: '@every 10s'
51+
4052
{{header "Elasticsearch template setting"}}
4153
4254
setup.template.settings:

heartbeat/beater/heartbeat.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@ package beater
2020
import (
2121
"errors"
2222
"fmt"
23+
"sync"
2324
"syscall"
2425
"time"
2526

2627
"github.com/elastic/beats/v7/heartbeat/config"
2728
"github.com/elastic/beats/v7/heartbeat/hbregistry"
2829
"github.com/elastic/beats/v7/heartbeat/monitors"
30+
"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
31+
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
2932
"github.com/elastic/beats/v7/heartbeat/scheduler"
3033
"github.com/elastic/beats/v7/libbeat/autodiscover"
3134
"github.com/elastic/beats/v7/libbeat/beat"
@@ -34,6 +37,7 @@ import (
3437
"github.com/elastic/beats/v7/libbeat/common/reload"
3538
"github.com/elastic/beats/v7/libbeat/logp"
3639
"github.com/elastic/beats/v7/libbeat/management"
40+
"github.com/elastic/beats/v7/x-pack/functionbeat/function/core"
3741

3842
_ "github.com/elastic/beats/v7/libbeat/processors/script"
3943
)
@@ -81,10 +85,17 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
8185
// Run executes the beat.
8286
func (bt *Heartbeat) Run(b *beat.Beat) error {
8387
logp.Info("heartbeat is running! Hit CTRL-C to stop it.")
84-
8588
groups, _ := syscall.Getgroups()
8689
logp.Info("Effective user/group ids: %d/%d, with groups: %v", syscall.Geteuid(), syscall.Getegid(), groups)
8790

91+
if bt.config.RunOnce != nil {
92+
err := bt.runRunOnce(b)
93+
if err != nil {
94+
return err
95+
}
96+
return nil
97+
}
98+
8899
stopStaticMonitors, err := bt.RunStaticMonitors(b)
89100
if err != nil {
90101
return err
@@ -126,6 +137,65 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
126137
return nil
127138
}
128139

140+
// runRunOnce runs the given config then exits immediately after any queued events have been sent to ES
141+
func (bt *Heartbeat) runRunOnce(b *beat.Beat) error {
142+
logp.Info("Starting run_once run. This is an experimental feature and may be changed or removed in the future!")
143+
cfgs := bt.config.RunOnce
144+
145+
publishClient, err := core.NewSyncClient(logp.NewLogger("run_once mode"), b.Publisher, beat.ClientConfig{})
146+
if err != nil {
147+
return fmt.Errorf("could not create sync client: %w", err)
148+
}
149+
defer publishClient.Close()
150+
151+
wg := &sync.WaitGroup{}
152+
for _, cfg := range cfgs {
153+
err := runRunOnceSingleConfig(cfg, publishClient, wg)
154+
if err != nil {
155+
logp.Warn("error running run_once config: %s", err)
156+
}
157+
}
158+
159+
wg.Wait()
160+
publishClient.Wait()
161+
162+
logp.Info("Ending run_once run")
163+
164+
return nil
165+
}
166+
167+
func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient, wg *sync.WaitGroup) (err error) {
168+
sf, err := stdfields.ConfigToStdMonitorFields(cfg)
169+
if err != nil {
170+
return fmt.Errorf("could not get stdmon fields: %w", err)
171+
}
172+
pluginFactory, exists := plugin.GlobalPluginsReg.Get(sf.Type)
173+
if !exists {
174+
return fmt.Errorf("no plugin for type: %s", sf.Type)
175+
}
176+
plugin, err := pluginFactory.Make(sf.Type, cfg)
177+
if err != nil {
178+
return err
179+
}
180+
181+
results := plugin.RunWrapped(sf)
182+
183+
wg.Add(1)
184+
go func() {
185+
defer wg.Done()
186+
defer plugin.Close()
187+
for {
188+
event := <-results
189+
if event == nil {
190+
break
191+
}
192+
publishClient.Publish(*event)
193+
}
194+
}()
195+
196+
return nil
197+
}
198+
129199
// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
130200
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
131201
factory := monitors.NewFactory(b.Info, bt.scheduler)

heartbeat/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727

2828
// Config defines the structure of heartbeat.yml.
2929
type Config struct {
30-
// Modules is a list of module specific configuration data.
30+
RunOnce []*common.Config `config:"run_once"`
3131
Monitors []*common.Config `config:"monitors"`
3232
ConfigMonitors *common.Config `config:"config.monitors"`
3333
Scheduler Scheduler `config:"scheduler"`

heartbeat/docs/heartbeat-options.asciidoc

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,37 @@ include::monitors/monitor-tcp.asciidoc[]
109109
include::monitors/monitor-http.asciidoc[]
110110

111111
include::monitors/monitor-browser.asciidoc[]
112+
113+
[float]
114+
[[run-once-mode]]
115+
=== Run Once Mode (Experimental)
116+
117+
You can configure {beatname_uc} run monitors exactly once then exit, bypassing the scheduler. This is referred to as running {beatname_uc} in "run once" mode. This is an experimental feature
118+
and is subject to change.
119+
120+
[source,yaml]
121+
----------------------------------------------------------------------
122+
# heartbeat.yml
123+
heartbeat.run_once:
124+
- type: icmp
125+
id: ping-myhost
126+
name: My Host Ping
127+
hosts: ["myhost"]
128+
# Note that schedule is still needed to inform heartbeat when the next
129+
# expected check is to be run. This is needed to populate the monitor.timespan field used by the Uptime app.
130+
schedule: '@every 5s'
131+
- type: tcp
132+
id: myhost-tcp-echo
133+
name: My Host TCP Echo
134+
hosts: ["myhost:777"] # default TCP Echo Protocol
135+
check.send: "Check"
136+
check.receive: "Check"
137+
schedule: '@every 5s'
138+
- type: http
139+
id: service-status
140+
name: Service Status
141+
service.name: my-apm-service-name
142+
hosts: ["http://localhost:80/service/status"]
143+
check.response.status: [200]
144+
schedule: '@every 5s'
145+
----------------------------------------------------------------------

heartbeat/heartbeat.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,18 @@ heartbeat.monitors:
3737
# Name of corresponding APM service, if Elastic APM is in use for the monitored service.
3838
#service.name: my-apm-service-name
3939

40+
# Experimental: Configure monitors that run exactly once.
41+
# If enabled, heartbeat.monitors will be ignored
42+
# Heartbeat will run these monitors once then exit.
43+
#heartbeat.run_once:
44+
#- type: http
45+
#id: my-monitor
46+
#name: My Monitor
47+
#urls: ["http://localhost:9200"]
48+
# NOTE: you must still provide the schedule field! Heartbeat
49+
# Uses this to determine the contents of the monitor.timespan field
50+
#schedule: '@every 10s'
51+
4052
# ======================= Elasticsearch template setting =======================
4153

4254
setup.template.settings:

heartbeat/monitors/active/http/http.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func create(
121121
js[i] = wrappers.WithURLField(u, job)
122122
}
123123

124-
return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(config.Hosts)}, nil
124+
return plugin.Plugin{Jobs: js, Endpoints: len(config.Hosts)}, nil
125125
}
126126

127127
func newRoundTripper(config *Config) (http.RoundTripper, error) {

heartbeat/monitors/active/icmp/icmp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) {
113113
j = append(j, wrappers.WithURLField(u, job))
114114
}
115115

116-
return plugin.Plugin{Jobs: j, Close: nil, Endpoints: len(jf.config.Hosts)}, nil
116+
return plugin.Plugin{Jobs: j, Endpoints: len(jf.config.Hosts)}, nil
117117
}
118118

119119
func (jf *jobFactory) pingIPFactory(config *Config) func(*net.IPAddr) jobs.Job {

heartbeat/monitors/active/tcp/tcp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func createWithResolver(
6868
return plugin.Plugin{}, err
6969
}
7070

71-
return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(jc.endpoints)}, nil
71+
return plugin.Plugin{Jobs: js, Endpoints: len(jc.endpoints)}, nil
7272
}
7373

7474
// jobFactory is where most of the logic here lives. It provides a common context around

heartbeat/monitors/mocks_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {
145145
return plugin.PluginFactory{
146146
Name: "test",
147147
Aliases: []string{"testAlias"},
148-
Builder: func(s string, config *common.Config) (plugin.Plugin, error) {
148+
Make: func(s string, config *common.Config) (plugin.Plugin, error) {
149149
built.Inc()
150150
// Declare a real config block with a required attr so we can see what happens when it doesn't work
151151
unpacked := struct {
@@ -160,7 +160,7 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {
160160
closed.Inc()
161161
return nil
162162
}
163-
return plugin.Plugin{Jobs: j, Close: closer, Endpoints: 1}, err
163+
return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, err
164164
},
165165
Stats: plugin.NewPluginCountersRecorder("test", reg)},
166166
built,

0 commit comments

Comments
 (0)