@@ -20,12 +20,15 @@ package beater
2020import (
2121 "errors"
2222 "fmt"
23+ "sync"
2324 "syscall"
2425 "time"
2526
2627 "github.com/elastic/beats/v7/heartbeat/config"
2728 "github.com/elastic/beats/v7/heartbeat/hbregistry"
2829 "github.com/elastic/beats/v7/heartbeat/monitors"
30+ "github.com/elastic/beats/v7/heartbeat/monitors/plugin"
31+ "github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
2932 "github.com/elastic/beats/v7/heartbeat/scheduler"
3033 "github.com/elastic/beats/v7/libbeat/autodiscover"
3134 "github.com/elastic/beats/v7/libbeat/beat"
@@ -34,6 +37,7 @@ import (
3437 "github.com/elastic/beats/v7/libbeat/common/reload"
3538 "github.com/elastic/beats/v7/libbeat/logp"
3639 "github.com/elastic/beats/v7/libbeat/management"
40+ "github.com/elastic/beats/v7/x-pack/functionbeat/function/core"
3741
3842 _ "github.com/elastic/beats/v7/libbeat/processors/script"
3943)
@@ -81,10 +85,17 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
8185// Run executes the beat.
8286func (bt * Heartbeat ) Run (b * beat.Beat ) error {
8387 logp .Info ("heartbeat is running! Hit CTRL-C to stop it." )
84-
8588 groups , _ := syscall .Getgroups ()
8689 logp .Info ("Effective user/group ids: %d/%d, with groups: %v" , syscall .Geteuid (), syscall .Getegid (), groups )
8790
91+ if bt .config .RunOnce != nil {
92+ err := bt .runRunOnce (b )
93+ if err != nil {
94+ return err
95+ }
96+ return nil
97+ }
98+
8899 stopStaticMonitors , err := bt .RunStaticMonitors (b )
89100 if err != nil {
90101 return err
@@ -126,6 +137,65 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
126137 return nil
127138}
128139
140+ // runRunOnce runs the given config then exits immediately after any queued events have been sent to ES
141+ func (bt * Heartbeat ) runRunOnce (b * beat.Beat ) error {
142+ logp .Info ("Starting run_once run. This is an experimental feature and may be changed or removed in the future!" )
143+ cfgs := bt .config .RunOnce
144+
145+ publishClient , err := core .NewSyncClient (logp .NewLogger ("run_once mode" ), b .Publisher , beat.ClientConfig {})
146+ if err != nil {
147+ return fmt .Errorf ("could not create sync client: %w" , err )
148+ }
149+ defer publishClient .Close ()
150+
151+ wg := & sync.WaitGroup {}
152+ for _ , cfg := range cfgs {
153+ err := runRunOnceSingleConfig (cfg , publishClient , wg )
154+ if err != nil {
155+ logp .Warn ("error running run_once config: %s" , err )
156+ }
157+ }
158+
159+ wg .Wait ()
160+ publishClient .Wait ()
161+
162+ logp .Info ("Ending run_once run" )
163+
164+ return nil
165+ }
166+
167+ func runRunOnceSingleConfig (cfg * common.Config , publishClient * core.SyncClient , wg * sync.WaitGroup ) (err error ) {
168+ sf , err := stdfields .ConfigToStdMonitorFields (cfg )
169+ if err != nil {
170+ return fmt .Errorf ("could not get stdmon fields: %w" , err )
171+ }
172+ pluginFactory , exists := plugin .GlobalPluginsReg .Get (sf .Type )
173+ if ! exists {
174+ return fmt .Errorf ("no plugin for type: %s" , sf .Type )
175+ }
176+ plugin , err := pluginFactory .Make (sf .Type , cfg )
177+ if err != nil {
178+ return err
179+ }
180+
181+ results := plugin .RunWrapped (sf )
182+
183+ wg .Add (1 )
184+ go func () {
185+ defer wg .Done ()
186+ defer plugin .Close ()
187+ for {
188+ event := <- results
189+ if event == nil {
190+ break
191+ }
192+ publishClient .Publish (* event )
193+ }
194+ }()
195+
196+ return nil
197+ }
198+
129199// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
130200func (bt * Heartbeat ) RunStaticMonitors (b * beat.Beat ) (stop func (), err error ) {
131201 factory := monitors .NewFactory (b .Info , bt .scheduler )
0 commit comments