add socket listener & writer #2094
sparrc merged 1 commit into influxdata:master from phemmer:generic-socket
Conversation
// Wait waits for a metric to be added to the accumulator.
// Accumulator must already be locked.
func (a *Accumulator) Wait() {
This was added so that tests don't have to call sleep() with some excessive value to avoid race conditions.
func (psl *packetSocketListener) listen() {
	buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
	for {
		_, _, err := psl.ReadFrom(buf)
These lines should only parse what was most recently read into the buffer, like this:
+ n, _, err := psl.ReadFrom(buf)
+ if err != nil {
+ psl.AddError(err)
+ break
+ }
+
+ metrics, err := psl.Parse(buf[0:n])
Oops :-)
Dunno how I missed that one. Thanks for the catch.
for _, m := range metrics {
	strs, err := sw.Serialize(m)
I've changed the behavior of the Serialize functions: they now return a byte slice which includes a newline at the end of each metric. This part of the code will need to be refactored for that, but give me a day or two, because I also have a change that will create an io.Reader from a slice of metrics. Using the io.Reader will create fewer allocations and also allow using io.CopyBuffer() into sw.Conn. io.CopyBuffer provides a configurable max buffer size, which users of datagram protocols will likely want.
> Using io.CopyBuffer provides for a configurable max buffer size which users of datagram protocols will likely want.

We would need to slap a warning on that value: setting it below 64kb (at least when using UDP) is dangerous. If the read buffer is smaller than an incoming packet, the packet will be lost. The only use case I can think of for adjusting it down from a default 64kb would be unix domain sockets.
The max buffer size I'm talking about here is for the writer, so that it would chop up the result of Serialize() into chunks rather than sending it all as a single byte buffer (which could easily exceed 64kb with thousands of metrics).

UDP users will also probably want their packets closer to the 512-4096 byte range.
Doh. I need to stop reviewing code within an hour of waking up.
> I also have a change that will create an io.Reader from a slice of metrics

Did this get implemented? I went looking for such a thing but couldn't find anything.
testutil/accumulator.go (outdated)
// Accumulator defines a mocked out accumulator
type Accumulator struct {
	sync.Mutex
	Cond *sync.Cond
note to self: cleanup: explicit Cond name not necessary.
scnr := bufio.NewScanner(c)
for scnr.Scan() {
	bs := scnr.Bytes()
	bs = append(bs, '\n')
This is a workaround for #2297.

I would like to remove this, as growing the slice can result in more allocations — potentially big ones, since these slices will be large.
bs := buf[:n]
if len(bs) > 0 && bs[len(bs)-1] != '\n' {
	bs = append(bs, '\n')
}
PR updated to remove workarounds for #2297.

Current build failure looks unrelated to the changes in this PR.

Rebased for a merge conflict.
closes influxdata#1516 closes influxdata#1711 closes influxdata#1721 closes influxdata#1526
This adds support for a generic socket writer & listener.
The original intent was to add support for unix domain sockets, but it was trivial to write generic plugins that can handle all protocols. Thus the functionality of `socket_listener` duplicates `tcp_listener` and `udp_listener`.

However, in the case of `tcp_listener` there is a critical difference: the new plugin will never drop a metric (such as when the buffer fills up). TCP is meant to be a reliable protocol, so it should be up to the sender whether data gets dropped.

Another slight difference in `socket_listener` is that instead of having 2 layers of buffering, an application chan buffer and a socket buffer, it only has a socket buffer. Config parameters have been provided for adjusting the size of the socket buffer. The chan buffer could be added, but I couldn't see any benefit to doing so, and thought it might be less confusing having only 1 layer of buffering.

Closes #1516, #1711, #1721
Obsoletes #1526
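As a rough illustration of the configuration described above (option names are recalled from the plugin READMEs and may differ; treat this as a sketch, not authoritative documentation):

```toml
# Hypothetical sketch of socket_listener / socket_writer configuration.

[[inputs.socket_listener]]
  ## URL to listen on, e.g. "tcp://:8094", "udp://:8094",
  ## or "unix:///tmp/telegraf.sock"
  service_address = "udp://:8094"
  ## Size of the OS socket receive buffer; there is no separate
  ## application chan buffer in this plugin.
  read_buffer_size = 65535

[[outputs.socket_writer]]
  ## URL to connect to
  address = "unix:///tmp/telegraf.sock"
```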