Feature: TCP Input#6266
Conversation
|
Waiting for CI to switch it to |
filebeat/input/tcp/client.go
Outdated
There was a problem hiding this comment.
This is a legitimate question, I am not sure how the remaining part of the code will work if I keep the message as a byte array.
There was a problem hiding this comment.
I would ignore the overhead for now if it adds code complexity.
There was a problem hiding this comment.
For me on my side there is no overhead, it just a matter of calling scanner.Bytes() instead of scanner.Text(), I am just unaware of the downstream consequence for processor If I keep the bytes.
There was a problem hiding this comment.
Actually, I just read the doc, It's better to do an allocation because the original bytes can be overridden.
// Bytes returns the most recent token generated by a call to Scan.
// The underlying array may point to data that will be overwritten
// by a subsequent call to Scan. It does no allocation.
func (s *Scanner) Bytes() []byte {
return s.token
}
filebeat/input/tcp/conn.go
Outdated
There was a problem hiding this comment.
I've keep theses methods public, because I might move it to common.
There was a problem hiding this comment.
We should only move it to common if we also use it in metricbeat for example, which could happen.
|
Working on fixing the test suite, simple environment difference with how I assert the tests, I will make it more robust. thanks Travis. |
There was a problem hiding this comment.
Do we need this to be configurable? I would expect that same behaviour that supports also \r etc. like the log prospector / harvester.
There was a problem hiding this comment.
I believe it needs to be configurable; this input is more than just syslog entries.
If you look at the code by default, it will use the ScanLines function to deal with that case. Anything else will use our own implementation of the split.
The default method will detect \n, but it will also strip \r, I think this is the most common use case with plain sockets protocol. So I think we should be fine(tm)?
The custom delimiter is OK for more esoteric implementation. Maybe in a future revision, we could allow multiple patterns.
There was a problem hiding this comment.
The feature got requested already several times for the log harvester ...
There was a problem hiding this comment.
I haven't look at the log harvester how it does the splitting... maybe we can reuse it.
There was a problem hiding this comment.
I did a quick look at the log harvester, I believe we could use the same scanner/split.
filebeat/filebeat.reference.yml
Outdated
There was a problem hiding this comment.
+1 for that change, I've went with Tcp even if golang style would complain because of existing code, I wanted consistency with our Udp input. I guess I can make a PR on top of UDP to be all caps. :D
There was a problem hiding this comment.
Yep, saw the UDP one when I reviewed one of your other PR's and was thinking the same.
filebeat/input/tcp/client.go
Outdated
There was a problem hiding this comment.
I would ignore the overhead for now if it adds code complexity.
filebeat/input/tcp/client.go
Outdated
There was a problem hiding this comment.
I wonder if this defer should really be here or outside where the connection is opened or Handle is called.
There was a problem hiding this comment.
Make sense, I will that change when you are done with the review.
filebeat/input/tcp/client.go
Outdated
There was a problem hiding this comment.
s/getRemoveHosts/getRemoteHosts
filebeat/input/tcp/client.go
Outdated
There was a problem hiding this comment.
Should we report all hosts? Should we exclude some?
There was a problem hiding this comment.
I am not sure what is the best behavior here:
- taking the first one could be wrong.
- taking a subset could also be wrong.
- They are set in the metadata field for now, so users are free to use them or not.
There was a problem hiding this comment.
Any chance you could provide some example json output docs in the comment of the PR? I think that makes it easier to understand how the end event will look like.
There was a problem hiding this comment.
Also wonder if we want to opt-out from resolving the hostname.
There was a problem hiding this comment.
Right it could be costly or slow.
filebeat/input/tcp/conn.go
Outdated
There was a problem hiding this comment.
We should only move it to common if we also use it in metricbeat for example, which could happen.
filebeat/tests/system/test_tcp.py
Outdated
There was a problem hiding this comment.
Nice tests. I already head in mind to ask for this, especially seeing if the shut down works 👍
libbeat/common/scan.go
Outdated
There was a problem hiding this comment.
I would not put functions into common which we only use in one beat. We should have at least 2 use cases.
|
@ruflin You can go ahead with the full review, I was just marking it as in progress to wait for CI. Since I was testing some goroutine interaction I wanted to make it was solid on the CI. I haven't made your initial changes to this PR yet. But go a full review and I will make them in batch. Thanks for checking it out. |
|
jenkins test this please |
There was a problem hiding this comment.
The feature got requested already several times for the log harvester ...
There was a problem hiding this comment.
We have max_message_size in udp. We should probably use the same config name. Also we can change the udp one.
There was a problem hiding this comment.
I will rename max_read_buffer to max_message_size, after looking back at the implementation they are a bit different but the underlying concept is the same, making sure we don't OOM on incoming messages
filebeat/filebeat.reference.yml
Outdated
There was a problem hiding this comment.
Did you test to have multiple syslog inputs configured? I assume the following should work:
- type: tcp
host: "localhost:8080"
- type: tcp
host: "localhost:8081"
There was a problem hiding this comment.
I haven't tested it, but it should work if every input have their own port.
There was a problem hiding this comment.
If both port are free:
2018-02-07T11:47:40.997-0500 INFO tcp/harvester.go:58 Started listening for incoming TCP connection on: localhost:8080`
2018-02-07T11:47:40.997-0500 INFO input/input.go:87 Starting input of type: tcp; ID: 14733937925171989371
2018-02-07T11:47:40.997-0500 DEBUG [processors] processors/processor.go:49 Processors:
2018-02-07T11:47:40.997-0500 INFO tcp/input.go:50 Starting TCP input
2018-02-07T11:47:40.997-0500 INFO tcp/harvester.go:58 Started listening for incoming TCP connection on: localhost:8081
2018-02-07T11:47:40.998-0500 DEBUG [input] log/config.go:178 recursive glob enabled
if one port is already taken:
2018-02-07T11:48:52.928-0500 ERROR instance/beat.go:674 Exiting: Error in initing input: listen tcp 127.0.0.1:8080: bind: address already in use
Exiting: Error in initing input: listen tcp 127.0.0.1:8080: bind: address already in use
FB exits
filebeat/input/tcp/client.go
Outdated
There was a problem hiding this comment.
Any chance you could provide some example json output docs in the comment of the PR? I think that makes it easier to understand how the end event will look like.
filebeat/input/tcp/config.go
Outdated
There was a problem hiding this comment.
@ruflin Not sure about that, I think it's has good as the default we have in UDP. :)
filebeat/input/tcp/config.go
Outdated
There was a problem hiding this comment.
I think you will have to define the unit here (time.Second)
filebeat/input/tcp/conn.go
Outdated
filebeat/input/tcp/conn.go
Outdated
filebeat/input/tcp/harvester.go
Outdated
There was a problem hiding this comment.
I would expect that the prospector/input would be the server and then each harvester would be a client?
There was a problem hiding this comment.
Like we discussed in the input refactoring, I am not sure the wording harvester work in the context of UDP and TCP, I would support renaming harvester to server and use client for the actual remote connection. Would you be OK with that?
My reasoning is harvester is more active process to fetch new things (pull), which on the other hand a TCP / UDP is more passive (push).
There was a problem hiding this comment.
I'm not really concerned here about not using the name harvester but why input != server. From my point of view 1 input = 1 server, so if I start an input that should mean the server is started. Instead of having input, server, client I think we only need input=server, client and can remove one abstraction layer.
I'm not too concerned about not having the name harvester anywhere as this is not something exposed to the user.
There was a problem hiding this comment.
@ph Now looking at my own UDP code I see that I used also the harvester as the server :-( Leading by bad example? :-(
There was a problem hiding this comment.
Not, you lead by good example you see the problem :)
|
@ph I suggest we get this PR in rather sooner then later. The internal code structure I brought up above can be discussed and refactored later especially as I realised now the changes would also apply to UDP. Can you go through the other comments above again and see what needs addressing, rebase and squash and ping me for an other review and merge? |
|
@ruflin Thanks for answering my comments will do the changes, I wanted to bulk them. :) |
|
This is an example of the event format from a received events with this input. {
"@timestamp": "2018-02-12T19:12:29.000Z",
"@metadata": {
"beat": "filebeat",
"type": "doc",
"version": "7.0.0-alpha1",
"hostnames": [
"localhost"
],
"ip_address": "127.0.0.1:56576"
},
"beat": {
"hostname": "sashimi",
"version": "7.0.0-alpha1",
"name": "sashimi"
},
"message": "Hello world",
"input": {
"type": "tcp"
},
"prospector": {
"type": "tcp"
}
}
|
filebeat/input/tcp/conn.go
Outdated
There was a problem hiding this comment.
exported func NewDeadlineReader returns unexported type *tcp.deadlineReader, which can be annoying to use
filebeat/input/tcp/conn.go
Outdated
There was a problem hiding this comment.
exported func NewMeteredReader returns unexported type *tcp.meteredReader, which can be annoying to use
|
I am fixing the flaky tests, oh network services. |
|
Thanks for the json output. We should probably also add an example to the docs. I'm a bit on the fence about the |
|
@ruflin Let's do a following PR for the JSON example in the doc, not sure where it would be the best place to insert it, inputs doesn't dedicated pages yet, Any idea @dedemorton where I could place this information ? |
|
@ruflin Go ahead with the review, there is a failure in the Filebeat test with x-pack, I think the available snapshot isn't correct right now.. |
ruflin
left a comment
There was a problem hiding this comment.
LGTM. All the comments could also be addressed in a follow up PR. I would like to get this out rather soonish so we get in the hands of our users and get feedback for it.
@urso Are you ok with merging this or want to have a look first?
CHANGELOG.asciidoc
Outdated
There was a problem hiding this comment.
As we don't have separate pages yet for inputs I wonder if we should mark here that this feature is actually experimental.
@dedemorton Thoughts?
There was a problem hiding this comment.
Good for now but we now have really the need to split up config options per input (@dedemorton I remember you were thinking about this).
There was a problem hiding this comment.
@ruflin Looking at this is at the top of my list for next week.
There was a problem hiding this comment.
We should mention that this is for tcp only.
filebeat/input/tcp/client.go
Outdated
filebeat/input/tcp/harvester.go
Outdated
There was a problem hiding this comment.
This should be logp.Debug("tcp", "Can not accept the connection: %s", err) I think
filebeat/input/tcp/harvester.go
Outdated
There was a problem hiding this comment.
Seeing that you also close the connection here, I wonder what happens if Close is called twice?
filebeat/input/tcp/harvester.go
Outdated
There was a problem hiding this comment.
Should each client have a unique id instead? Could make it interesting for stats later on.
There was a problem hiding this comment.
I think it still fine in that case, I don't think a unique ID like a UUID will give us more than using the ptr.
filebeat/input/tcp/input.go
Outdated
There was a problem hiding this comment.
Want to add the host info here?
filebeat/input/tcp/input.go
Outdated
filebeat/input/tcp/config.go
Outdated
There was a problem hiding this comment.
Why don't you write 5 * time.Minute instead of calculating it?
filebeat/input/tcp/conn.go
Outdated
There was a problem hiding this comment.
I am not a big fan of this name. It would be nice to express that it is a special LimitedReader.
However, I am not able to come up with a proper name. My best alternative proposal is ResetterLimitedReader. I am willing to let the change go.
There was a problem hiding this comment.
Naming is hard but, ResetterLimitedReader or ResetableLimitedReader. I am ok with both.
filebeat/input/tcp/harvester.go
Outdated
There was a problem hiding this comment.
I think you could eliminate calling allClients and simply loop over the map:
for client, _ := range h.clients {
client.Close()
}
At this point there can't be any new connection, right? Does locking make sense here? If locking is required, I am ok with keeping the function. But the transformation seems unnecessary.
There was a problem hiding this comment.
When this code is run we stop refusing new connection but, when the client is shutting down each client are removed from the hash at the end of the goroutine.
So I have two options either I use a copy of the original hash which is what allClients is doing or I lock before looping the element in the array this will make all the goroutine wait for the mutex before they can remove themself. I went with the former to keep consistency in the values in the hash during the loop.
filebeat/input/tcp/harvester_test.go
Outdated
There was a problem hiding this comment.
+1 for extracting this function
There was a problem hiding this comment.
I will wait when we add a bit more feature to it.
filebeat/input/tcp/harvester_test.go
Outdated
There was a problem hiding this comment.
Nitpicking, I am completely fine if you ignore this comment:
I have seen that you used top-down design, I am not sure if it was on purpose, it made reviewing your code much easier. Do you mind reordering the functions here, so it would follow the same pattern?
CHANGELOG.asciidoc
Outdated
filebeat/input/tcp/client.go
Outdated
There was a problem hiding this comment.
can we replace *harvester.Forwarder by an interface or function type? Some more isolation from harvester.Forwarder (I hope to remove Forwarder and Outlet in the future).
There was a problem hiding this comment.
Knowing that it make sense to change it.
There was a problem hiding this comment.
Similar to #6439 ?:)
I was planning to do it in second part but I can do it right now.
There was a problem hiding this comment.
let's keep the PR as small as possible and do the interface when we have a second implementation.
filebeat/input/tcp/client.go
Outdated
There was a problem hiding this comment.
The metadata field behaves like @metadata in Logstash. If one sets index or id or pipeline, this will be configure some paramters in the Elasticsearch output. This on purpose?
When indexing into ES, the @metadata is dropped. That is, when sending directly to ES, this information can never be retrieved by users (no rename processor).
There was a problem hiding this comment.
I did it on purpose, I am not sure how valuable that information it is. but I could make it public field and store in the mapping. WDYT?
filebeat/input/tcp/client.go
Outdated
There was a problem hiding this comment.
do we need a method on Client for this one?
There was a problem hiding this comment.
I don't see the need to make it a simple function or if It can be reused elsewhere, it will called once and we cache the result.
filebeat/input/tcp/client.go
Outdated
There was a problem hiding this comment.
cacheMetadata is quite short and only executed once by the constructor. How about moving the body into the constructor and removing the function.
Alternatively have it be a function returning a common.MapStr?
There was a problem hiding this comment.
I will go with the common.MapStr.
filebeat/input/tcp/harvester.go
Outdated
There was a problem hiding this comment.
Let us not export the Harvester. It's some internal detail specific to the input. How about renaming to server?
Do we actually need a special Harvester/server type? The Input implementations seems to mostly wrap the server.
filebeat/input/tcp/harvester.go
Outdated
There was a problem hiding this comment.
better put defer h.wg.Done() at the top of the go-func. Also try to capture panics and print an error. If something goes really wrong with one client, the other clients can still be served.
Same for h.unregisterClient().
e.g.
go func() {
defer logp.Recover()
defer h.wg.Done()
defer conn.Close()
h.registerClient()
defer h.unregisterClient()
err := client.Handle()
...
}()
|
@ph Can you ping us when this needs an other look? |
Allow to receive new events via TCP, this will create a new event per line and add some useful information about the connected client to the evant. This input is marked as **experimental**. This input expose the following settings: - `line_delimiter`: The characters used to split incoming events, by default it will split on `\n` and will also strip both `\n` and `\r`. You can also use any characters like `;` and you can also used multiple characters delimiter like `<END>`, the delimiter tokens will always be removed from the string. - `max_message_size`: This is a number of bytes that a client can buffer in memory before finding a new message, if the limit is reached the connection is killed and the event is logged. This is to prevent rogue clients to DoS attack by consuming all the available memory. The default values is 20971520. - `timeout`: The server will close any client that reach this inactivity timeout. Ref: #5862
|
I have rebased this PR and included your latest comments. I also migrated the documentation into his own file. Open questions: Should we extract the TCP logic into his own class in this PR, like I did in refactor of #6439? What do we do about I would like to move this forward or at least have a plan to unblock #6433. |
|
@ruflin @urso Don't check it out yet, I will make some changes I've learned from @andrewkroh |
|
I think it make sense to reopen a new one, since structure has changes locally to follow the udp refactor. |
Allow to receive new events via TCP, this will create a new event per
line and add some useful information about the connected client to the
evant. This input is marked as experimental.
This input expose the following settings:
line_delimiter: The characters used to split incoming events, bydefault it will split on
\nand will also strip both\nand\r.You can also use any characters like
;and you can also used multiplecharacters delimiter like
<END>, the delimiter tokens will always beremoved from the string.
max_read_buffer: This is a number of bytes that a client can bufferin memory before finding a new message, if the limit is reached the
connection is killed and the event is logged. This is to prevent rogue
clients to DoS attack by consuming all the available memory. The default
values is 20971520.
timeout: The server will close any client that reach this inactivitytimeout.
TODO
I've an almost TLS support done, but this PR was getting pretty big and I still have the tests to write so I've excluded it.
Ref: #5862