
[WIP] Event scheduling interfaces #7082

Closed
urso wants to merge 9 commits into elastic:master from urso:event-sched-interfaces

Conversation


@urso urso commented May 11, 2018

The changes in this PR add support for configurable event scheduling in libbeat. Event scheduling is applied when a beat publishes an event. Users configure policies like rate limiting, which are applied to event publishing before the event processors are executed.

[Image: beats pipeline with scheduling]

Policies can be local or have shared state. Thanks to shared state, policies like rate limiting can affect a group of workers/publishers.

A policy implementation is free to create back-pressure by blocking event publishing or drop/modify events.
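The interface snippets quoted in the review below suggest a contract roughly like the following. This is an illustrative sketch only: the actual PR's types (Context, the Sig* values, etc.) differ, and the handler name changed between commits (appearing both as Handler and SchedulingHandler). The dropDebug policy here is a hypothetical example.

```go
package main

import "fmt"

// Event stands in for libbeat's beat.Event in this sketch.
type Event struct{ Fields map[string]interface{} }

// Policy is connected per beat.Client; Connect or OnEvent may block to
// create back-pressure, and OnEvent may modify or drop events.
type Policy interface {
	Connect() (Handler, error)
}

type Handler interface {
	// OnEvent returns the (possibly modified) event, or an error/signal
	// telling the pipeline to drop it.
	OnEvent(Event) (Event, error)
}

var errDrop = fmt.Errorf("drop event")

// dropDebug is a toy policy that drops events marked as DEBUG.
type dropDebug struct{}

func (dropDebug) Connect() (Handler, error) { return dropDebugHandler{}, nil }

type dropDebugHandler struct{}

func (dropDebugHandler) OnEvent(ev Event) (Event, error) {
	if lvl, _ := ev.Fields["level"].(string); lvl == "DEBUG" {
		return Event{}, errDrop
	}
	return ev, nil
}

func main() {
	h, _ := dropDebug{}.Connect()
	_, err := h.OnEvent(Event{Fields: map[string]interface{}{"level": "DEBUG"}})
	fmt.Println(err != nil) // true: the DEBUG event was dropped
}
```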

Besides introducing interfaces and configuration support, this PR provides a simple ratelimit policy. The current implementation is for demonstration purposes only (sample code) and is not recommended for production. The rate limiter supports fully local rate limiting (per beat.Client connection) as well as a shared mode. In shared mode, the total bandwidth enforced by the rate limiter is shared between all beat.Client instances.
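A minimal token-bucket limiter illustrates the idea; this is a hypothetical sketch, not the PR's ratelimit code, and names like tokenBucket and take are invented here. In shared mode, one such bucket would simply be used by several clients.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tokenBucket refills at a fixed rate; one token is consumed per event.
type tokenBucket struct {
	mu       sync.Mutex
	tokens   float64
	max      float64
	rate     float64 // tokens added per second
	lastFill time.Time
}

func newTokenBucket(eventsPerSecond float64) *tokenBucket {
	return &tokenBucket{
		tokens:   eventsPerSecond,
		max:      eventsPerSecond,
		rate:     eventsPerSecond,
		lastFill: time.Now(),
	}
}

// take refills based on elapsed time, then tries to consume one token.
// On false the caller decides whether to drop or apply back-pressure.
func (b *tokenBucket) take() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	now := time.Now()
	b.tokens += now.Sub(b.lastFill).Seconds() * b.rate
	if b.tokens > b.max {
		b.tokens = b.max
	}
	b.lastFill = now
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	b := newTokenBucket(2) // 2 events per second, bucket starts full
	fmt.Println(b.take())  // true
	fmt.Println(b.take())  // true
	fmt.Println(b.take())  // false: bucket drained
}
```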

Scheduling can be defined locally per input/module instance, per group and even globally, per beat. A scheduling group can also have a parent group. The policies are concatenated in this order: local, group, [parent group], global.

Scheduling 'groups' are used to configure shared resources between multiple workers/data sources.

Sample config:

filebeat.inputs:
- ...
  scheduling.policies:
  - ratelimit.events_per_second: 800
- ...
  scheduling.group: A
  scheduling.policies:
  - ratelimit:
      events_per_second: 100
      shared: false
- ...
  scheduling.group: A
  scheduling.policies:
  - ratelimit:
      events_per_second: 500
      shared: false

# per group policies
scheduling.groups:
  A.policies:
  - ratelimit.events_per_second: 800

# global policies
scheduling.policies:
- ratelimit:
    events_per_second: 1000

With this sample configuration, the Beat's global bandwidth is limited to 1000eps. The first input's bandwidth is limited to 800eps; if the first input creates multiple workers/harvesters, the 800eps bandwidth is shared by all of them. Due to shared: false, each harvester created by input 2 has a bandwidth limit of 100eps, and input 3 enforces a limit of 500eps per harvester. Because inputs 2 and 3 join the scheduling group A, the total shared bandwidth of all harvesters started by inputs 2 and 3 is limited to 800eps.
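The concatenation order described above (local, group, [parent group], global) can be sketched as follows; connect, publish, and tagPolicy are hypothetical names for illustration, loosely based on the Connect(group, local) signature quoted in the review:

```go
package main

import "fmt"

// Policy may modify an event or veto it (ok == false means dropped).
type Policy interface {
	OnEvent(event string) (string, bool)
}

// tagPolicy appends a marker so the application order is visible.
type tagPolicy struct{ tag string }

func (p tagPolicy) OnEvent(ev string) (string, bool) { return ev + p.tag, true }

// connect builds the effective chain in the documented order:
// local policies first, then the group's, then the global ones.
func connect(local, group, global []Policy) []Policy {
	chain := append([]Policy{}, local...)
	chain = append(chain, group...)
	chain = append(chain, global...)
	return chain
}

// publish runs an event through the chain, stopping on the first veto.
func publish(chain []Policy, ev string) (string, bool) {
	var ok bool
	for _, p := range chain {
		if ev, ok = p.OnEvent(ev); !ok {
			return "", false // a policy dropped the event
		}
	}
	return ev, true
}

func main() {
	chain := connect(
		[]Policy{tagPolicy{"+local"}},
		[]Policy{tagPolicy{"+groupA"}},
		[]Policy{tagPolicy{"+global"}},
	)
	out, _ := publish(chain, "event")
	fmt.Println(out) // event+local+groupA+global
}
```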

Tasks:

  • introduce event scheduling
  • support for defining shared scheduling groups
  • add support to filebeat inputs to configure local policies
  • add support to filebeat modules to configure local policies
  • add support to metricbeat/auditbeat modules/metricsets to configure local policies
  • add support to heartbeat to configure local policies
  • add support to packetbeat to configure local policies
  • support for passing additional settings from local policies to parent groups on Connect.

@urso urso added the in progress Pull request is currently in progress. label May 11, 2018
Connect(ctx Context) (Handler, error)
}

type Handler interface {

exported type Handler should have comment or be unexported


type PolicyFactory func(cfg *common.Config) (Policy, error)

type Policy interface {

exported type Policy should have comment or be unexported

"github.com/elastic/beats/libbeat/common"
)

type PolicyFactory func(cfg *common.Config) (Policy, error)

exported type PolicyFactory should have comment or be unexported

OnEvent(Event) (Event, error)
}

type Context interface {

exported type Context should have comment or be unexported

Connect(ctx Context) (SchedulingHandler, error)
}

type SchedulingHandler interface {

exported type SchedulingHandler should have comment or be unexported

var SigDrop = &SigDropEvent{}
var SigClose = &SigCloseClient{}

func (sig *SigDropEvent) IsClose() bool { return false }

exported method SigDropEvent.IsClose should have comment or be unexported

)

var SigDrop = &SigDropEvent{}
var SigClose = &SigCloseClient{}

exported var SigClose should have comment or be unexported

SigCloseClient struct{}
)

var SigDrop = &SigDropEvent{}

exported var SigDrop should have comment or be unexported

type (
SigDropEvent struct{}

SigCloseClient struct{}

exported type SigCloseClient should have comment or be unexported

package scheduling

type (
SigDropEvent struct{}

exported type SigDropEvent should have comment or be unexported

global []Policy
}

func (s *Scheduling) Connect(group string, local []Policy) (*Client, error) {

exported method Scheduling.Connect should have comment or be unexported

"fmt"
)

type Scheduling struct {

exported type Scheduling should have comment or be unexported

mu sync.Mutex
}

var PolicyRegistry = &policyRegistry{

exported var PolicyRegistry should have comment or be unexported

policies []Policy
}

func (s *Scheduling) Connect(group string, local []Policy) (*Client, error) {

exported method Scheduling.Connect should have comment or be unexported

global []Policy
}

type Group struct {

exported type Group should have comment or be unexported

@urso urso added the discuss Issue needs further discussion. label May 11, 2018

ruflin commented May 14, 2018

As the rate limiting is applied before the processors: if a processor would drop all events containing a DEBUG field, and these made up 99 out of 100 events, and the rate limit were lower than that, publishing would be throttled even though the outcome would be just one event?

import "errors"

type config struct {
EventsPerSecond uint `config:"events_per_second" validate:"min=1"`

Should we make the time unit configurable instead of hardcoding it "per second"? It would mean to split up this config into 2 parts, one for the duration and one for the number of events.
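A split like that might look as follows; these field names are hypothetical and not part of this PR:

```yaml
scheduling.policies:
- ratelimit:
    events: 100
    period: 1s
```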


ph commented May 24, 2018

I went through this PR quickly; looks good, will keep an eye on it.

BTW <3 the picture in the PR description.


urso commented Nov 14, 2019

Closing. No progress for some time. If there is still some interest, please contact us by commenting on this PR.

@slmingol

Curious where this ended up? I see the PR was closed, but was this feature ever enabled, or is it still waiting on this PR?


nebriv commented Jan 24, 2022

I'd be very interested in a sampling rate function as well.
