Conversation
    Connect(ctx Context) (Handler, error)
    }

    type Handler interface {
exported type Handler should have comment or be unexported
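For reference, golint is satisfied by a doc comment that starts with the identifier's name. A minimal sketch of the fix — the comment wording, the `Event` placeholder, and `echoHandler` are mine, not code from this PR:

```go
package main

import "fmt"

// Event is a placeholder type so this sketch compiles on its own; the
// real definition lives elsewhere in the PR.
type Event map[string]interface{}

// Handler processes events published by a connected client. A leading
// comment of this "Handler ..." form is what golint asks for; the
// wording here is illustrative only.
type Handler interface {
	OnEvent(Event) (Event, error)
}

// echoHandler is a minimal Handler that returns events unchanged.
type echoHandler struct{}

func (echoHandler) OnEvent(evt Event) (Event, error) { return evt, nil }

func main() {
	var h Handler = echoHandler{}
	evt, _ := h.OnEvent(Event{"message": "ok"})
	fmt.Println(evt["message"])
}
```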
    type PolicyFactory func(cfg *common.Config) (Policy, error)

    type Policy interface {
exported type Policy should have comment or be unexported
    "github.com/elastic/beats/libbeat/common"
    )

    type PolicyFactory func(cfg *common.Config) (Policy, error)
exported type PolicyFactory should have comment or be unexported
    OnEvent(Event) (Event, error)
    }

    type Context interface {
exported type Context should have comment or be unexported
    Connect(ctx Context) (SchedulingHandler, error)
    }

    type SchedulingHandler interface {
exported type SchedulingHandler should have comment or be unexported
    var SigDrop = &SigDropEvent{}
    var SigClose = &SigCloseClient{}

    func (sig *SigDropEvent) IsClose() bool { return false }
exported method SigDropEvent.IsClose should have comment or be unexported
    )

    var SigDrop = &SigDropEvent{}
    var SigClose = &SigCloseClient{}
exported var SigClose should have comment or be unexported
    SigCloseClient struct{}
    )

    var SigDrop = &SigDropEvent{}
exported var SigDrop should have comment or be unexported
    type (
    SigDropEvent struct{}

    SigCloseClient struct{}
exported type SigCloseClient should have comment or be unexported
    package scheduling

    type (
    SigDropEvent struct{}
exported type SigDropEvent should have comment or be unexported
    global []Policy
    }

    func (s *Scheduling) Connect(group string, local []Policy) (*Client, error) {
exported method Scheduling.Connect should have comment or be unexported
    "fmt"
    )

    type Scheduling struct {
exported type Scheduling should have comment or be unexported
    mu sync.Mutex
    }

    var PolicyRegistry = &policyRegistry{
exported var PolicyRegistry should have comment or be unexported
    policies []Policy
    }

    func (s *Scheduling) Connect(group string, local []Policy) (*Client, error) {
exported method Scheduling.Connect should have comment or be unexported
    global []Policy
    }

    type Group struct {
exported type Group should have comment or be unexported
|
As the rate limiting is applied before the processors, this means if a processor would drop all events which contain a field |
    import "errors"

    type config struct {
    EventsPerSecond uint `config:"events_per_second" validate:"min=1"`
Should we make the time unit configurable instead of hardcoding it to "per second"? It would mean splitting this config into two parts: one for the duration and one for the number of events.
|
I've gone through this PR quickly, looks good, will keep an eye on it. BTW <3 the picture in the PR description.
|
Closing. No progress for some time. If there is still some interest, please contact us by commenting on this PR. |
|
Curious where this ended up? I see the PR was closed, but was this feature enabled some other way, or is it still waiting on this PR?
|
I'd be very interested in a sampling rate function as well. |
The changes in this PR add support for configurable event scheduling in libbeat. Event scheduling is applied when a beat publishes an event. Users configure policies such as rate limiting, which are applied to event publishing before the event processors are executed.
Policies can be local or have shared state. Thanks to shared state, policies like rate limiting can affect a whole group of workers/publishers.
A policy implementation is free to create back-pressure by blocking event publishing, or to drop or modify events.
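To make the drop/modify behavior concrete, here is a minimal sketch of a policy against the `OnEvent(Event) (Event, error)` interface shown in the review comments above. Everything except that method signature — the `Event` stand-in, the `dropIfTagged` policy, and the error-based drop signaling — is my assumption; the PR itself uses dedicated signal values like `SigDrop` for this:

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a hypothetical stand-in for the PR's event type.
type Event map[string]interface{}

// Policy mirrors the interface shown in the diff: a policy sees each
// event and may return it (possibly modified) or reject it.
type Policy interface {
	OnEvent(Event) (Event, error)
}

var errDropped = errors.New("event dropped by policy")

// dropIfTagged drops any event carrying a given field, illustrating how
// a policy can filter the event stream before processors run.
type dropIfTagged struct{ field string }

func (p *dropIfTagged) OnEvent(evt Event) (Event, error) {
	if _, ok := evt[p.field]; ok {
		// In this sketch a drop is signaled via an error; the real PR
		// uses signal values (SigDrop/SigClose) instead.
		return nil, errDropped
	}
	return evt, nil
}

func main() {
	var p Policy = &dropIfTagged{field: "debug"}
	if _, err := p.OnEvent(Event{"debug": true}); err != nil {
		fmt.Println("dropped:", err)
	}
	evt, _ := p.OnEvent(Event{"message": "hello"})
	fmt.Println("passed:", evt["message"])
}
```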
Besides introducing interfaces and configuration support, a simple `ratelimit` policy is provided in this PR. The current implementation is only for demonstration purposes (sample code) and not recommended for production. The rate limiter supports fully local rate limiting (per `beat.Client` connection), but also a shared mode. In shared mode, the total bandwidth enforced by the rate limiter is shared between all `beat.Client` instances.

Scheduling can be defined locally per input/module instance, per `group`, and even globally, per beat. A scheduling group can also have a parent group. The policies are concatenated in this order: local, group, [parent group], global. Scheduling 'groups' are used to configure shared resources between multiple workers/data sources.
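The concatenation order described above can be sketched as follows. This is not the PR's implementation — the `Policy` stand-in and `buildChain` helper are illustrative names — just a demonstration of the local → group → parent → global ordering:

```go
package main

import "fmt"

// Policy is a pared-down stand-in for the PR's Policy interface; only a
// name is needed here to show ordering.
type Policy interface{ Name() string }

type named string

func (n named) Name() string { return string(n) }

// buildChain concatenates policy lists in the order the PR describes:
// local first, then group, then optional parent group, then global.
func buildChain(local, group, parent, global []Policy) []Policy {
	chain := make([]Policy, 0, len(local)+len(group)+len(parent)+len(global))
	chain = append(chain, local...)
	chain = append(chain, group...)
	chain = append(chain, parent...)
	chain = append(chain, global...)
	return chain
}

func main() {
	chain := buildChain(
		[]Policy{named("local-ratelimit")},
		[]Policy{named("group-A-ratelimit")},
		nil, // no parent group configured
		[]Policy{named("global-ratelimit")},
	)
	for _, p := range chain {
		fmt.Println(p.Name())
	}
}
```

Events would traverse this chain in order, so the most specific (local) policies get the first chance to block, drop, or modify an event.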
Sample config:
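The config block itself appears to have been lost in extraction. Below is a hedged YAML reconstruction based only on the prose description that follows; every option name (`scheduling`, `policies`, `ratelimit`, `events_per_second`, `shared`, `group`) and all paths are guesses, not the PR's actual schema:

```yaml
# Hypothetical reconstruction -- option names are guesses, not the PR's schema.
scheduling:
  policies:
    - ratelimit.events_per_second: 1000   # global limit: 1000eps for the whole beat

filebeat.inputs:
  - paths: ["/var/log/one/*.log"]
    scheduling.policies:
      - ratelimit:
          events_per_second: 800
          shared: true                    # 800eps shared by all harvesters of input 1

  - paths: ["/var/log/two/*.log"]
    scheduling:
      group: A
      policies:
        - ratelimit:
            events_per_second: 100
            shared: false                 # 100eps per harvester

  - paths: ["/var/log/three/*.log"]
    scheduling:
      group: A
      policies:
        - ratelimit:
            events_per_second: 500
            shared: false                 # 500eps per harvester

scheduling.groups:
  A:
    policies:
      - ratelimit.events_per_second: 800  # inputs 2 and 3 share 800eps in total
```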
With this sample configuration, the beat's global bandwidth is limited to 1000eps. The first input's bandwidth is limited to 800eps. If the first input creates multiple workers/harvesters, the bandwidth of 800eps will be shared by all of them. Due to `shared: false`, each harvester created by input 2 will have a bandwidth limit of 100eps. Input 3 enforces a limit of 500eps per harvester. Because inputs 2 and 3 join the scheduling group `A`, the total shared bandwidth of all harvesters started by inputs 2 and 3 is limited to 800eps.

Tasks:
Connect.