Extracting the UDP Server logic from the UDP input #6439
ruflin merged 8 commits into elastic:master from ph:refactor/make-udp-composable-with-other-input
Conversation
@urso I believe this is what we intended for separating the Source from the Input. This is a first shot at the UDP input; if it is correct, it will unblock me on the syslog input, and I will apply the same strategy to the TCP input in another PR. I am looking for a name for this package. We were thinking about InputReader, InputStream, or InputSource; I don't have a preference :) Also, with the filewatcher/udp and the TCP code extracted, I don't think we need a common interface.
filebeat/inputsource/input_source.go
Outdated
don't use an underscore in package name
We only had Python tests.
It's really great to have these tests in.
filebeat/input/udp/config.go
Outdated
Not sure if your editor adds udp here?
I did a GoRename on a package, but it didn't update the import.
On Thu, Feb 22, 2018 at 8:38 AM, Nicolas Ruflin wrote:
In filebeat/input/udp/config.go:
@@ -2,19 +2,21 @@ package udp
import (
"github.com/elastic/beats/filebeat/harvester"
+ udp "github.com/elastic/beats/filebeat/inputsource/udp"
Not sure if your editor adds udp here?
@ruflin updated.
Looks reasonably simple. +1 on adding short-lived Go-based tests. I'm not sure the simple callback interface is good enough for all of TCP, though. How about passing some 'context' variable to the callback? It could be used to resolve client metadata, for example, or to react to parse/message errors: for TCP, close the client connection; for UDP, refuse to process messages from client X for some time?
I think it's still premature to add that; the TCP extraction will definitely need it. I would merge this as is and let the requirements evolve the interface.
The suite seems to hang; I will check it out.
ruflin
left a comment
Changes LGTM. Naming can still be solved later.
I think Windows struggles with the UDP server tests. Perhaps skip them on Windows?
filebeat/inputsource/inputsource.go
Outdated
Not a fan of the naming here but as it's not user facing we can figure out the "correct" name when we have multiple use cases. This will make it easier.
The more I think about it, using source instead of inputsource would make sense. @ruflin @andrewkroh
If we call this one source, what should we call this one then? https://github.com/elastic/beats/blob/master/filebeat/harvester/source.go
I suggest getting the PR in with this name and iterating on top of it. I assume there will be quite a few more renames.
I will check Windows; they should be solid.
@ph Seems like Windows still has some issues.
filebeat/inputsource/udp/server.go
Outdated
Are there any bounds on these values? If so, you should add validate tags like validate:"min:0 max:65535" or validate:"required" (double check the exact ucfg syntax).
Again, there was bounds before, but for now we can do the following: required and > 0. Enforcing a top-level bound might not be a good idea in a 6.x release.
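To make the "required and > 0" rule concrete, here is a minimal sketch written as a plain Validate method rather than struct tags, since the exact ucfg tag syntax is not confirmed in this thread. The Host and MaxMessageSize field names come from the diff in this PR; the method body and error messages are assumptions for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// Config holds the two settings discussed above. Field names follow the
// diff in this PR; everything else here is a hypothetical illustration.
type Config struct {
	Host           string
	MaxMessageSize int
}

// Validate enforces "required and > 0" without imposing an upper bound.
func (c Config) Validate() error {
	if c.Host == "" {
		return errors.New("host is required")
	}
	if c.MaxMessageSize <= 0 {
		return fmt.Errorf("max message size must be > 0, got %d", c.MaxMessageSize)
	}
	return nil
}

func main() {
	// A fully specified config passes; a zero MaxMessageSize is rejected.
	fmt.Println(Config{Host: "localhost:8080", MaxMessageSize: 10240}.Validate())
	fmt.Println(Config{Host: "localhost:8080"}.Validate())
}
```

Keeping the check in one method means a later decision about an upper bound only touches one place.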
filebeat/inputsource/udp/server.go
Outdated
I think the callback should include the source address (ip/port). This will allow for source metadata to be added to the event. Or in the future it could allow for per source buffering.
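As a rough sketch of that suggestion, the callback could take a small metadata value next to the payload. The Metadata and CallbackFunc names and the readOne helper below are hypothetical and are not taken from the PR; only the standard library net package is used.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// Metadata is a hypothetical container for per-packet source information.
type Metadata struct {
	RemoteAddr net.Addr
}

// CallbackFunc receives the payload together with its metadata.
type CallbackFunc func(data []byte, meta Metadata)

// readOne reads a single datagram and hands it to the callback.
func readOne(conn net.PacketConn, maxSize int, cb CallbackFunc) error {
	buf := make([]byte, maxSize)
	n, addr, err := conn.ReadFrom(buf)
	if err != nil {
		return err
	}
	cb(buf[:n], Metadata{RemoteAddr: addr})
	return nil
}

// demo sends one datagram over loopback and returns what the callback saw,
// plus the address the client actually sent from.
func demo() (payload, from, sender string, err error) {
	server, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		return
	}
	defer server.Close()
	client, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		return
	}
	defer client.Close()

	if _, err = client.WriteTo([]byte("hello"), server.LocalAddr()); err != nil {
		return
	}
	// Bound the wait so the demo cannot hang if the packet is lost.
	server.SetReadDeadline(time.Now().Add(2 * time.Second))
	err = readOne(server, 10240, func(data []byte, meta Metadata) {
		payload = string(data)
		from = meta.RemoteAddr.String()
	})
	return payload, from, client.LocalAddr().String(), err
}

func main() {
	payload, from, sender, err := demo()
	fmt.Println(payload, from == sender, err)
}
```

A struct leaves room to add fields later (for example, a truncation flag) without changing the callback signature again.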
filebeat/inputsource/udp/server.go
Outdated
I recommend having Start() if you have a Stop() to keep the interface balanced and remove the responsibility of managing a goroutine from the user.
filebeat/inputsource/udp/server.go
Outdated
You could create a Logger instance for the server to use.
log: logp.NewLogger("udp").With("address", config.Host),
Then anywhere within this object that you do logging you would use s.log.Xyz(). This gives you a named logger with context plus gives you the ability to use the structured logging methods if needed.
filebeat/inputsource/udp/server.go
Outdated
I think the deadline only needs to be set once after initialization.
Sadly, this is not how it works: SetDeadline takes an absolute time. But it does make sense. Say you are waiting on a large computation: you could give it a longer deadline than a simple request/response.
// A deadline is an absolute time after which I/O operations
// fail with a timeout (see type Error) instead of
// blocking. The deadline applies to all future and pending
// I/O, not just the immediately following call to ReadFrom or
// WriteTo. After a deadline has been exceeded, the connection
// can be refreshed by setting a deadline in the future.
I was incorrectly thinking this was a read timeout with a time.Duration.
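The difference between an absolute deadline and a per-read timeout can be seen in a small loopback experiment. This is only a sketch using the standard net package; the function names and the 20ms / 2s values are arbitrary choices for the demo.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// timedOut reports whether err is a network timeout.
func timedOut(err error) bool {
	e, ok := err.(net.Error)
	return ok && e.Timeout()
}

// deadlineDemo performs three reads on a loopback UDP socket and reports
// whether the first two timed out and what the third one received.
func deadlineDemo() (first, second bool, third string, err error) {
	conn, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		return
	}
	defer conn.Close()
	buf := make([]byte, 64)

	// 1. Deadline 20ms from now: nothing arrives, so the read times out.
	conn.SetReadDeadline(time.Now().Add(20 * time.Millisecond))
	_, _, rerr := conn.ReadFrom(buf)
	first = timedOut(rerr)

	// 2. The deadline is an absolute time that is now in the past, so the
	// next read fails with a timeout again instead of waiting another 20ms.
	_, _, rerr = conn.ReadFrom(buf)
	second = timedOut(rerr)

	// 3. Refresh the deadline, send a datagram to ourselves, and read it.
	conn.SetReadDeadline(time.Now().Add(2 * time.Second))
	if _, err = conn.WriteTo([]byte("ping"), conn.LocalAddr()); err != nil {
		return
	}
	n, _, err := conn.ReadFrom(buf)
	if err != nil {
		return
	}
	third = string(buf[:n])
	return
}

func main() {
	first, second, third, err := deadlineDemo()
	fmt.Println(first, second, third, err)
}
```

This is why a read loop has to call SetReadDeadline before every read rather than once after initialization.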
filebeat/inputsource/udp/server.go
Outdated
I'd prefer for Stop to block until the goroutine exits. It should Close the socket and wait for run to exit (via sync.WaitGroup).
Signaling to stop via a flag means that the socket won't actually be closed until a packet is received to unblock the recvfrom or a timeout is reached (max 5 minutes).
Correct, you are right: we need to get out of the loop and wait until the processing is done:
- Close the socket.
- Mark the server as not running (mostly used in tests).
- Block until the goroutine exits.
filebeat/inputsource/udp/server.go
Outdated
This logic looks incorrect. I think it should be if !ok || !e.Timeout(), but personally I would probably special case the timeout and continue for that.
if ok && e.Timeout() {
    continue
}
u.log.Errorw("Error receiving from socket", "error", err)
Yes, it's clearer that way.
filebeat/input/udp/config.go
Outdated
Perhaps there should not be a default and make the user explicitly state on which port to listen. The logstash udp input does this.
I agree with that, but we can only do it in 7.0, since this could break existing configurations.
Note, the TODO was there before my changes. :)
+1 on the proposal from Andrew. Sorry about the TODO ...
Is the udp input feature listed as experimental or beta? This could give us some leeway to make the change now.
Let's do it in another PR, with a clear intent in the changelog.
I will let it run, but I might disable them for this platform.
+1 for 7.0 I’ll do the same for TCP
On Tue, Mar 27, 2018 at 7:27 AM Nicolas Ruflin wrote:
In filebeat/input/udp/config.go:
)
var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "udp",
},
- MaxMessageSize: 10240,
- // TODO: What should be default port?
- Host: "localhost:8080",
+ Config: udp.Config{
+ MaxMessageSize: 10240,
+ // TODO: What should be default port?
+ Host: "localhost:8080",
+1 on the proposal from Andrew. Sorry about the TODO ...
I've updated this PR with the comments from @ruflin and @andrewkroh, and I've skipped the UDP test on Windows as suggested by @ruflin. There also seems to be a flaky Metricbeat test that is not related to this PR.
Oh right!
filebeat/input/udp/input.go
Outdated
Double colon in error message.
filebeat/inputsource/udp/server.go
Outdated
Typically a start method does not block like this does. It will "start" some kind of worker goroutine. This encapsulates the logic required to manage a goroutine, which is good because then every user is not re-implementing the same thing. As a barebones example:
func (x *Thing) Start() {
    x.wg.Add(1)
    go func() {
        defer x.wg.Done()
        x.run()
    }()
}

func (x *Thing) Stop() {
    close(x.done)
    x.wg.Wait()
}

func (x *Thing) run() {
    for {
        select {
        case <-x.done:
            return
        default:
        }
        // Do a unit of work.
    }
}
In your case the socket would replace the done channel, and Start would initialize the listening socket before starting the worker goroutine. This way Start can return an error if it can't "listen", and when Start returns the Server is guaranteed to be in the "listening" state (which might remove the need to have IsRunning).
Thanks for the detailed input. It's really appreciated and it clarifies things.
I recommend listening only on localhost to avoid opening the port up to the world. Secondly, since this is a test that shouldn't fail if 10000 is already in use, you can use localhost:0 and it will bind to an ephemeral port. You'll need to expose the local address from the server so you know which port it is bound to.
If the port in the address parameter is empty or "0", as in "127.0.0.1:" or "[::1]:0", a port number is automatically chosen. The LocalAddr method of PacketConn can be used to discover the chosen port.
What was the failure that led to this? I'm not a fan of doing this because it could be masking issues with the code or the test.
This commit extracts the UDP Server logic out of the UDP input, which will allow reusing this component and its configuration for the syslog input. It now uses a callback instead of a forwarder instance. Ref: #6361
There has been some subtlety in our UDP implementation for some time, and there is an intrinsic difference between Windows and Linux. If you do a
@andrewkroh Conserving the buffer sent by Windows makes the code a bit ugly, but I don't think I have any other choice than matching the original messages. We are a bit on our own. Right now I do track the Windows Reference:
} else {
    assert.NotNil(t, info.mt.Source)
    assert.False(t, info.mt.Truncated)
}
Because of the platform differences, we have to deal with that here. I've decided to just use predicates to route to the correct logic. If the tests become more complex, it will make sense to use tags to target them.
Blocked by the discussion on the field of the event in #6700 (comment)
filebeat/input/udp/input.go
Outdated
        logp.Err("Error running harvester:: %v", err)
    }
}()
if !p.started.Load() {
I think we talked about this and you were going to open an issue for it. If you did/do can you add a link back to here.
There's a race condition problem with using an atomic.Bool to protect the underlying state of the server. A mutex needs to be used instead to protect it.
Correct, let me switch that to a mutex.
I will do the same change in the TCP input.
I've moved it to a mutex instead and will open a follow-up issue; I need to get a bit more insight into what is managing the input before proposing something. I did the same changes in the TCP input.
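To illustrate why the mutex helps: with an atomic flag, the check and the call to start the server are two separate steps, so two concurrent callers can both see "not started" and both start it. Holding a mutex across the check and the state change closes that window. The types below are hypothetical stand-ins and are not the input or server from this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeServer is a hypothetical stand-in that counts how often it is started.
type fakeServer struct{ starts int }

func (s *fakeServer) Start() error { s.starts++; return nil }
func (s *fakeServer) Stop()        {}

// Input guards both the started flag and the server with one mutex.
type Input struct {
	mu      sync.Mutex
	started bool
	server  *fakeServer
}

// Run starts the server at most once, even under concurrent calls.
func (p *Input) Run() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.started {
		return nil
	}
	if err := p.server.Start(); err != nil {
		return err
	}
	p.started = true
	return nil
}

// Stop stops the server if it is running and resets the flag.
func (p *Input) Stop() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.started {
		return
	}
	p.server.Stop()
	p.started = false
}

// concurrentRuns calls Run from n goroutines and returns the start count.
func concurrentRuns(n int) int {
	in := &Input{server: &fakeServer{}}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			in.Run()
		}()
	}
	wg.Wait()
	in.Stop()
	return in.server.starts
}

func main() {
	fmt.Println("server started", concurrentRuns(50), "time(s)")
}
```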