Add unmanaged mode #103
fearful-symmetry left a comment:
Left a few basic comments, since a lot of the shipper functionality has changed since I last looked at it.
Depending on the scope of this PR, I kind of wonder if it would make more sense to refactor some of the "managed" initialization/shutdown functions so we can import them across both managed and unmanaged code, so we're not duplicating the functionality as much.
```go
if err != nil {
	return fmt.Errorf("error reading control config from agent: %w", err)
cfg, err := config.ReadConfigFromFile()
switch {
```
Wonder if there's a more idiomatic way to do this? Beats have a special flag (management.enabled), and I wonder if that's a better fit than trying and reverting to unmanaged on error.
But if you need to test several configurations, you also need a configuration filename. I figured it would be easier to just base the managed mode on a single flag.
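To illustrate the flag-based approach being discussed, here is a minimal sketch. All names here (`loadConfig`, the `Config` struct, the flag wiring) are hypothetical stand-ins, not the shipper's actual API; the idea is just that a single boolean decides the mode up front, instead of attempting managed mode and falling back to unmanaged on error:

```go
package main

import (
	"flag"
	"fmt"
)

// Config is a minimal stand-in for the shipper configuration; the real
// struct lives in the shipper's config package.
type Config struct {
	Managed bool
}

// loadConfig decides the mode from a single flag rather than
// try-managed-then-fall-back. The path parameter is only used in
// unmanaged mode (the real code would read and parse the file).
func loadConfig(managed bool, path string) (Config, error) {
	if managed {
		// In managed mode the config arrives over the control protocol.
		return Config{Managed: true}, nil
	}
	// In unmanaged mode the config is read from the local file at path.
	return Config{Managed: false}, nil
}

func main() {
	// Mirrors the Beats-style management.enabled flag mentioned above.
	managed := flag.Bool("management.enabled", false, "run under agent control")
	flag.Parse()

	cfg, err := loadConfig(*managed, "elastic-agent-shipper.yml")
	if err != nil {
		panic(err)
	}
	fmt.Println("managed:", cfg.Managed)
}
```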
controller/runner.go (outdated)
```go
// Testing that the server is running and only then unlock the mutex.
// Otherwise if `Close` is called at the same time as `Start` it causes race condition.
go func() {
	defer r.mu.Unlock()
```
Is that the only thing this mutex is used for? If so, you might want to rename it; I was a tad confused by the mutex itself.
Alternatively, depending on where the race is in Close(), you could also use a WaitGroup, or just an atomic flag that tells Close() which shutdown components to avoid.
The reason this mutex exists is that the controller tests call Close before Start finishes. Which is theoretically possible with the control protocol, I guess. I'll rename it to startMutex.
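A minimal sketch of the pattern under discussion (the `Runner` here is illustrative, not the shipper's actual struct, and it simplifies away the goroutine that checks the server before unlocking): `startMutex` is held while Start initializes, so a concurrent Close blocks until startup has finished instead of racing on a half-initialized runner.

```go
package main

import (
	"fmt"
	"sync"
)

// Runner is a hypothetical stand-in for controller/runner.go's runner.
type Runner struct {
	startMutex sync.Mutex // held for the duration of Start
	started    bool
}

// Start holds startMutex until initialization is complete, so Close
// cannot observe a partially initialized runner.
func (r *Runner) Start() error {
	r.startMutex.Lock()
	defer r.startMutex.Unlock()
	r.started = true // stands in for the real initialization steps
	return nil
}

// Close blocks on startMutex until any in-flight Start has finished.
func (r *Runner) Close() error {
	r.startMutex.Lock()
	defer r.startMutex.Unlock()
	if !r.started {
		return nil // nothing was started, nothing to shut down
	}
	r.started = false
	return nil
}

func main() {
	r := &Runner{}
	var wg sync.WaitGroup
	wg.Add(2)
	// Even if Close is called concurrently with Start (as the controller
	// tests do), the mutex serializes them.
	go func() { defer wg.Done(); _ = r.Start() }()
	go func() { defer wg.Done(); _ = r.Close() }()
	wg.Wait()
	fmt.Println("no race between Start and Close")
}
```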
controller/runner.go (outdated)
```go
defer r.mu.Unlock()
// initialization can fail on each step, so it's possible that the runner
// is partially initialized and we have to account for that.
r.once.Do(func() {
```
Why does this need to be in a once block?
You're right, after I added nil checks there is no need anymore, I'll remove it.
On second thought, I think we need it in case there are several concurrent calls to Close from the control protocol. We don't want a race condition on the nil checks; I remember it now.
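A small sketch of why `sync.Once` still helps here (the `closer` type and its fields are illustrative, not the shipper's code): several goroutines may call Close concurrently, and without Once their nil checks on partially initialized components would race.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical stand-in for the runner's shutdown state.
type closer struct {
	once    sync.Once
	server  *string // stands in for a component that may still be nil
	stopped int     // counts how many times the shutdown body ran
}

// Close is safe to call multiple times and from multiple goroutines:
// the shutdown body, including its nil checks, runs exactly once.
func (c *closer) Close() {
	c.once.Do(func() {
		// Initialization can fail partway through, so each component is
		// nil-checked before shutdown; Once makes these checks race-free.
		if c.server != nil {
			c.stopped++
			fmt.Println("stopping server:", *c.server)
		}
	})
}

func main() {
	s := "grpc"
	c := &closer{server: &s}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); c.Close() }()
	}
	wg.Wait()
	// The shutdown body ran exactly once despite three concurrent calls.
	fmt.Println("stopped", c.stopped, "time(s)")
}
```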
```go
}
done := make(doneChan)
go func() {
	err = runner.Start()
```
Looks like we return this err below; any possibility we get an odd situation where a signal happens before Start finishes?
Unless I missed something, this should be okay because of the startMutex, which would not allow Close to execute until Start has finished.
This pull request does not have a backport label.
To fixup this pull request, you need to add the backport labels for the needed branches.
Can we update the README to indicate how to run this: https://github.com/elastic/elastic-agent-shipper/blob/main/README.md?
What does this PR do?
Add unmanaged/standalone mode.
Why is it important?
So the shipper can be run using a local config file instead of the
control protocol.
Checklist
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding changes to the default configuration files
- [ ] I have added an entry in CHANGELOG.md or CHANGELOG-developer.md.

How to test this PR locally
I ran the latest filebeat (73999a009ec97eca7451760730abe7366d8d8156) with the following config:

```shell
./filebeat run -d -v -c ~/filebeat_shipper.yml
```

filebeat_shipper.yml
input.log
And I ran shipper with the following config:
elastic-agent-shipper.yml
I got the following logs from filebeat:
and the following logs from the shipper:
These messages indicate that filebeat successfully published a batch of events:
filebeat

```json
{ "log.level": "info", "@timestamp": "2022-09-06T15:14:25.422+0200", "log.logger": "monitoring", "log.origin": { "file.name": "log/log.go", "file.line": 186 }, "message": "Non-zero metrics in the last 30s", "service.name": "filebeat", "monitoring": { "metrics": { "beat": { "cpu": { "system": { "ticks": 0 }, "total": { "ticks": 1, "time": { "ms": 1 }, "value": 1 }, "user": { "ticks": 1, "time": { "ms": 1 } } }, "info": { "ephemeral_id": "1dacd9f3-4375-4b82-ac46-4f4e3be67fd3", "name": "filebeat", "uptime": { "ms": 30016 }, "version": "8.5.0" }, "memstats": { "gc_next": 8047120, "memory_alloc": 5075672, "memory_sys": 26084360, "memory_total": 27079680, "rss": 46514176 }, "runtime": { "goroutines": 25 } }, "filebeat": { "events": { "added": 8, "done": 8 }, "harvester": { "open_files": 1, "running": 1, "started": 1 } }, "libbeat": { "config": { "module": { "running": 0 } }, "output": { "events": { "acked": 5, "active": 0, "batches": 1, "total": 5 }, "type": "shipper" }, "pipeline": { "clients": 1, "events": { "active": 0, "filtered": 3, "published": 5, "retry": 5, "total": 8 }, "queue": { "acked": 5, "max_events": 4096 } } }, "registrar": { "states": { "cleanup": 1, "current": 3, "update": 8 }, "writes": { "success": 2, "total": 2 } }, "system": { "cpu": { "cores": 10 }, "load": { "1": 1.7754, "15": 2.895, "5": 2.5586, "norm": { "1": 0.1775, "15": 0.2895, "5": 0.2559 } } } }, "ecs.version": "1.6.0" } }
```

shipper

```json
{ "log.level": "debug", "@timestamp": "2022-09-06T15:13:56.468+0200", "log.logger": "shipper-server", "log.origin": { "file.name": "server/server.go", "file.line": 142 }, "message": "finished publishing a batch. Events = 5, accepted = 5, accepted index = 4", "ecs.version": "1.6.0" }
```
Then I stopped the shipper by sending SIGINT and observed that everything shut down correctly.
Related issues