Conversation
|
I'll have a fix for the time related test failure. |
ssoroka
left a comment
There was a problem hiding this comment.
Looks amazing! Very thorough.
Please remove the .DS_Store files for merging.
I have a couple small changes in feedback.
| } | ||
|
|
||
| func (h *InfluxDBV2Listener) Description() string { | ||
| return "Accept metrics over InfluxDB 1.x HTTP API" |
| ReadTimeout internal.Duration `toml:"read_timeout"` | ||
| WriteTimeout internal.Duration `toml:"write_timeout"` | ||
| MaxBodySize internal.Size `toml:"max_body_size"` | ||
| MaxLineSize internal.Size `toml:"max_line_size"` // deprecated in 1.14; ignored |
There was a problem hiding this comment.
I'd remove this. if it's already deprecated, there's no reason to add it to a new plugin.
There was a problem hiding this comment.
Perfect, missed this, will remove.
|
|
||
| ReadTimeout internal.Duration `toml:"read_timeout"` | ||
| WriteTimeout internal.Duration `toml:"write_timeout"` | ||
| MaxBodySize internal.Size `toml:"max_body_size"` |
There was a problem hiding this comment.
Not sure we really need a max body size. If we do, 32mb is possibly too small. We should be streaming the request through, so this won't affect memory used.
There was a problem hiding this comment.
ah, yeah - good catch, InfluxDB 2.x now no longer has a max-body-size config parameter - https://v2.docs.influxdata.com/v2.0/reference/config-options/
There was a problem hiding this comment.
I would keep this option, 32mb seems like plenty. The typical batch size is 1000-5000 metrics and is gzip compressed.
There was a problem hiding this comment.
The limit seems pretty arbitrary if we're taking results of any size in a stream.
There was a problem hiding this comment.
This API from InfluxDBv2 is built around accepting batches of data, it's not a streaming API. As everything is loaded into memory, I think this is is a useful limit to avoid OOM.
ssoroka
left a comment
There was a problem hiding this comment.
ok so is it magi chair, magic chair, or magic hair? :D Either way, bravo, fellow mage. 🧙
magic hair, an ol' high school nickname that became my de facto online alias. Thanks for the review and approval! |
danielnelson
left a comment
There was a problem hiding this comment.
I don't have a very strong opinion on the /ready endpoint, but if none of the client libraries are using it then I suggest we remove it here just to keep things neat.
Don't forget to remove .DS_Store.
| } | ||
|
|
||
| func (h *InfluxDBV2Listener) Description() string { | ||
| return "Accept metrics over InfluxDB 1.8+ / 2.x HTTP API" |
There was a problem hiding this comment.
Let's document, and support, 2.x only.
| tags := map[string]string{ | ||
| "address": h.ServiceAddress, | ||
| } |
There was a problem hiding this comment.
We should make this optional, or just remove it, so that it is possible to pass though data unmodified. If we want to make it optional it should be done similar to the bucket_tag option.
There was a problem hiding this comment.
This tags is just used to add tagging to the internal selfstat below and does not munge the actual telegraf metric that is collected and passed on from the plugin
| ## maximum duration before timing out read of the request | ||
| read_timeout = "10s" | ||
| ## maximum duration before timing out write of the response | ||
| write_timeout = "10s" |
There was a problem hiding this comment.
Consider combining these into a single timeout.
There was a problem hiding this comment.
They do surface two different configurations on the http.Server (
telegraf/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go
Lines 139 to 145 in af42bc9
influxdb_listener implementation. I'm not sure how they might be used, but I'd like to keep them separate.
There was a problem hiding this comment.
My thought was that the response is typically very small, so there isn't a lot of value in having separate options vs a simpler combined timeout. If it is difficult to set this up on the http.Server, then it is okay.
There was a problem hiding this comment.
Decided that given these should be small responses, to entirely remove the timeouts for a simpler config. Should someone need to add that functionality in the future, it should be straightforward to forward port from the influxdb_listener or other examples in the code base.
| defer body.Close() | ||
| } | ||
|
|
||
| parser := influx.NewStreamParser(body) |
There was a problem hiding this comment.
One spot where the InfluxDB 2.x API differs from the 1.x API is that writes are either fully accepted or fully rejected. I would switch this over to use the regular influx.Parser.
From the /write docs:
400: line protocol poorly formed and no points were written. Response can be used to determine the first malformed line in the body line-protocol. All data in body was rejected and not written.
We should try to mirror the API from InfluxDBv2 as closely as we reasonable can to avoid issues when switching between the APIs.
There was a problem hiding this comment.
🤔 so maybe I should still enforce some form of max_body_size if I have to read the entire http response body into memory and pass into influx.Parse(bodyBytes) #7828 (comment)
There was a problem hiding this comment.
I definitely agree we want to match, not pushing back on that, just trying to brainstorm on the best way to approach this since I presume the accumulator isn't transactional, and I'd have to scrap the streaming code (or again store the telegraf metrics in memory and throw them away if we encounter an error):
There was a problem hiding this comment.
I do think we should keep max_body_size, commented over there. You are right that the accumulator isn't transactional, so we aren't safe in the case of SIGKILL, power loss or the likes. Anyway, the output plugin is unsyncronized with the inputs, so the data could go out in separate writes.
Still we can do a close approximation that will work under normal operation (outputs up/clean shutdown): after switching to the Parse() function just check and return the error or add all the metrics if no error.
There was a problem hiding this comment.
Added max_body_size back, implemented all or nothing writes.
| } | ||
| } | ||
|
|
||
| func badRequest(res http.ResponseWriter, errString string) { |
There was a problem hiding this comment.
In order to more closely match the InfluxDBv2 API, we should try to sync up some common error messages. We should check example error responses from InfluxDBv2 listed at https://v2.docs.influxdata.com/v2.0/api/#tag/Write and anywhere we produce the equivalent error try to produce something similar.
| func init() { | ||
| inputs.Add("influxdb_v2_listener", func() telegraf.Input { | ||
| return &InfluxDBV2Listener{ | ||
| ServiceAddress: ":8186", |
There was a problem hiding this comment.
Default to the 9999 port to match InfluxDBv2 OSS.
| readysServed selfstat.Stat | ||
| requestsRecv selfstat.Stat | ||
| notFoundsServed selfstat.Stat | ||
| buffersCreated selfstat.Stat |
There was a problem hiding this comment.
I don't see buffersCreated being used, can we remove it?
| When chaining Telegraf instances using this plugin, CREATE DATABASE requests | ||
| receive a 200 OK response with message body `{"results":[]}` but they are not | ||
| relayed. The output configuration of the Telegraf instance which ultimately | ||
| submits data to InfluxDB determines the destination database. |
There was a problem hiding this comment.
Let's remove this paragraph since it isn't relevant for InfluxDBv2.
internal/http.go
Outdated
| // TokenAuthHandler returns a http handler that requires `Authorization: Token <token>` | ||
| // Introduced to support InfluxDB 2.x style authentication | ||
| // https://v2.docs.influxdata.com/v2.0/reference/api/#authentication | ||
| func TokenAuthHandler(token string, onError TokenAuthErrorFunc) func(h http.Handler) http.Handler { |
There was a problem hiding this comment.
Set this up to handle a direct comparison with any Authorization header value, not treating the scheme and credentials separately, just as a single big string. I think this will be more reusable and less tied to the InfluxDBv2 plugin.
internal/http.go
Outdated
| authParts := strings.SplitN(authHeader, " ", 2) | ||
| if len(authParts) != 2 || | ||
| subtle.ConstantTimeCompare( | ||
| []byte(strings.ToLower(strings.TrimSpace(authParts[0]))), | ||
| []byte(strings.ToLower(h.scheme))) != 1 || | ||
| subtle.ConstantTimeCompare([]byte(strings.TrimSpace(authParts[1])), []byte(h.credentials)) != 1 { |
There was a problem hiding this comment.
Let's remove the normalization steps here, just do a subtle.ConstantTimeCompare against the header value.
|
Excellent points @danielnelson, I'll get right on these 👍 Thank you! |
|
Comments from Daniel still remaining:
|
|
I think all of @danielnelson 's concerns have been addressed at this point. A fair bit of changes since last review - each split in their own commit (hopefully for easier review). |
|
Just poking in to see if there is a planned release timeline for 1.16.0. I've been running this as a custom built telegraf in production fairly successfully, not a lot of load on this for proving it out. However, it has kept up with what I'm using it for. |
|
@magichair We're looking to get a 1.16.0 release soon. Ideally to get this plugin in to align with the InfluxDB OSS 2 GA. |
|
Merged. Thanks again for the great effort here! |
|
Can this listener for v2 also support a EDIT: Which should return 200 instead of 204, because of this #4935 |
@eraac I debating putting the /ping endpoint equivalent in (the Is there a reason you can't add https://github.com/influxdata/telegraf/tree/master/plugins/outputs/health to setup a generic health endpoint for your telegraf node overall? This is what I use for my AWS ALB health checks. |
@magichair I just try, but I can't start Our telegraf agent is deployed on Kubernetes, and we can't set a different port for the health check on the load balancer. this feature is only available for 1.17 and is still "beta" |
|
That makes sense, if we wanted to resurrect it in a separate PR, it was roughly stubbed out and then removed in this commit: e1571b9 |
closes #6626
Most code is ported directly from the
influxdb_listenerinput plugin. Naming convention forinfluxdb_v2*matches the paradigm with the existing output plugin.I've manually tested functionality of this plugin with two of the official InfluxDB 2.x client libraries:
Only open question, I ported thehandlePingmethod frominfluxdb_listenerto instead support the/api/v2/readyendpoint. But it could easily be removed if there is no requirement from the client libraries to test readiness before writing.Removed
handleReadyAlso to note, the
handleQuerymethod was removed, the comments indicated that was to support some client libraries that would attempt a basic query to test connectivity. I didn't see any proof of that in the above two client libraries, so I removed the stubbed query response.Oh, another thing updated to support this change, a new Auth handler is added to manage InfluxDB's idiosyncratic
Authorization: Token <token>authentication strategy.Required for all PRs: