Move filebeat to new publisher pipeline#4644
filebeat/harvester/forwarder.go
Note: the forwarder is going to be removed in future iterations.
ruflin
left a comment
LGTM. It feels strange to get rid of the spooler, but it's a great step forward to have this in the publisher now; it means other beats can make use of it too. Small step for beats, big step for filebeat :-)
During review I left a few comments/questions that popped up in my head. Most of them got answered by reading more code, but it would be good if you could take a quick look and confirm that this is the case.
filebeat/beater/filebeat.go
Why is this a defer inside the defer?
Oops, copy'n'paste. Interestingly, the effect will be the same :)
filebeat/harvester/forwarder.go
Trying to understand why type is still here and all the others were moved.
The forwarder is subject to removal in future iterations. The type is still needed for the PutValue call when publishing. In the meantime I added ClientConfig.Fields to register fields to be added to every event on publish.
Basically, the Forwarder will be removed in favor of channel.Outleter, which will in turn be removed in favor of beat.Client (once we get saner registry handling).
filebeat/harvester/forwarder.go
Could we also add the type in the outlet? Probably we need the type in other places too?
filebeat/harvester/forwarder.go
Note to self: I must check that there are still two different methods here, one blocking and one non-blocking, to make sure the harvester is only closed when data is ACKed (or in the queue...).
The harvester gets a publishState, which uses the stateOutlet. The stateOutlet (owned by the prospector) and the harvesters' Outlet are each closed on different signals... so many hoops...
filebeat/prospector/log/harvester.go
Comment on line 113 is now in the wrong place. I put this one as early as possible because of our still unsolved race condition.
The problem with setting 200 here instead of one is that the likelihood of all log lines already having been read increases. I want to try to shut down as early as possible. What is the reason you increased this one?
Before, this was checking for the 'Flushing spooler' message, which can represent any number of events (up to the spooler size). As the spooler and any kind of flushing are gone, I had to use the number of events being published as a replacement. The test file contains about 50k events; 200 should be OK, but we can decrease it.
Can you elaborate on this one?
see outlet.OnEvent in channel/outlet.go.
// Note: race condition on shutdown:
// The underlying beat.Client is asynchronous. Without proper ACK
// handler we can not tell if the event made it 'through' or the client
// close has been completed before sending. In either case,
// we report 'false' here, indicating the event eventually being dropped.
// Returning false here, prevents the harvester from updating the state
// to the most recently published events. Therefore, on shutdown the harvester
// might report an old/outdated state update to the registry, overwriting the
// most recently published offset in the registry on shutdown.
libbeat/common/mapstr.go
Kind of surprised to see a mapstr change in this PR. Why was that needed?
Oops, that kind of leaked into this PR :)
I found the old processor ordering potentially overwriting a globally shared MapStr and was forced to introduce some Clone() calls on shared MapStr instances, so processors adding/removing fields always operate on copies, never the original MapStr. The amount of fmt.Errorf calls was so expensive it totally killed throughput.
Commented out on purpose? Also below.
Yeah, loads of debug statements commented out in the fast path. I still kept them so I can re-enable them if I really need them.
- remove the filebeat/spooler and filebeat/publisher packages -> all spooling and
  reporting of published events is moved to the publisher pipeline.
  Difference between the old spooler/publisher and the new pipeline:
  the new publisher pipeline operates fully asynchronously
- have filebeat register an eventACKer with the publisher pipeline.
The eventACKer will forward state updates of ACKed events to the registrar
- filebeat uses beat.Event for events
- update util.Data to use beat.Event:
- store state in event.Private field for consumption by registry
- changes to filebeat/channels package:
- introduce OutletFactory: connect to publisher pipeline, applying common
prospector settings
- remove Outleter.SetSignal and Outleter.OnEventSignal
- add Outleter.Close
- introduce SubOutlet (separate closing):
- when a suboutlet is closed, the original outlet is still active
  - if the underlying outlet is closed, the suboutlet becomes closed as well
    (it cannot forward anymore)
- introduce CloseOnSignal: close Outlet once a 'done' channel is closed
- most functionality from harvester.Forwarder is moved into the
outlet/publisher pipeline client
- fix: ensure client events listener properly installed
Note:
Outlet shutdown with prospectors and harvesters is somewhat delicate. There are
3 shutdown signals to take into account:
- filebeat.done
- harvester.done
- outDone (signal used to unblock prospectors from registrar on shutdown).
An outlet is shared between all harvesters of a prospector and the prospector
itself. If outDone is closed, all outlets will be closed, unblocking
potentially waiting harvesters and prospectors on filebeat shutdown.
The prospector uses a sub-outlet for sending state updates (being closed on
filebeat.done). The harvester's sub-outlet is closed when harvester.done is
closed.
The signals are only required to unblock a harvester/prospector on exit. On
normal shutdown, the outlets are closed after all workers have finished.