
Influxdb v2 listener #7828

Merged
ssoroka merged 13 commits into influxdata:master from magichair:influxdb_v2_listener
Sep 14, 2020

Conversation

@magichair
Contributor

@magichair magichair commented Jul 13, 2020

closes #6626

Most code is ported directly from the influxdb_listener input plugin. The influxdb_v2* naming convention matches the paradigm of the existing output plugin.

I've manually tested the functionality of this plugin with two of the official InfluxDB 2.x client libraries.

The only open question: I ported the handlePing method from influxdb_listener to instead support the /api/v2/ready endpoint. But it could easily be removed if the client libraries have no requirement to test readiness before writing.
Update: removed handleReady

Also of note, the handleQuery method was removed; the comments indicated it was there to support some client libraries that would attempt a basic query to test connectivity. I didn't see any evidence of that in the above two client libraries, so I removed the stubbed query response.

One more thing was updated to support this change: a new auth handler is added to manage InfluxDB's idiosyncratic Authorization: Token <token> authentication strategy.

Required for all PRs:

  • Signed CLA.
  • Associated README.md updated.
  • Has appropriate unit tests.
  • Awaiting legal review from my company regarding the applicability of the CCLA. Update: do not need any additional approvals.

@magichair
Contributor Author

I'll have a fix for the time-related test failure.

Contributor

@ssoroka ssoroka left a comment

Looks amazing! Very thorough.
Please remove the .DS_Store files for merging.
I have a couple small changes in feedback.

}

func (h *InfluxDBV2Listener) Description() string {
	return "Accept metrics over InfluxDB 1.x HTTP API"
Contributor

update description.

ReadTimeout internal.Duration `toml:"read_timeout"`
WriteTimeout internal.Duration `toml:"write_timeout"`
MaxBodySize internal.Size `toml:"max_body_size"`
MaxLineSize internal.Size `toml:"max_line_size"` // deprecated in 1.14; ignored
Contributor

I'd remove this. If it's already deprecated, there's no reason to add it to a new plugin.

Contributor Author

Perfect, missed this, will remove.


ReadTimeout internal.Duration `toml:"read_timeout"`
WriteTimeout internal.Duration `toml:"write_timeout"`
MaxBodySize internal.Size `toml:"max_body_size"`
Contributor

Not sure we really need a max body size. If we do, 32mb is possibly too small. We should be streaming the request through, so this won't affect memory used.

Contributor Author

Ah, yeah - good catch. InfluxDB 2.x no longer has a max-body-size config parameter - https://v2.docs.influxdata.com/v2.0/reference/config-options/

Contributor

I would keep this option, 32mb seems like plenty. The typical batch size is 1000-5000 metrics and is gzip compressed.

Contributor

The limit seems pretty arbitrary if we're taking results of any size in a stream.

Contributor

This API from InfluxDBv2 is built around accepting batches of data; it's not a streaming API. As everything is loaded into memory, I think this is a useful limit to avoid OOM.

Contributor

@ssoroka ssoroka left a comment

ok so is it magi chair, magic chair, or magic hair? :D Either way, bravo, fellow mage. 🧙

@magichair
Contributor Author

ok so is it magi chair, magic chair, or magic hair? :D Either way, bravo, fellow mage. 🧙

magic hair, an ol' high school nickname that became my de facto online alias. Thanks for the review and approval!

Contributor

@danielnelson danielnelson left a comment

I don't have a very strong opinion on the /ready endpoint, but if none of the client libraries are using it then I suggest we remove it here just to keep things neat.

Don't forget to remove .DS_Store.

}

func (h *InfluxDBV2Listener) Description() string {
	return "Accept metrics over InfluxDB 1.8+ / 2.x HTTP API"
Contributor

Let's document, and support, 2.x only.

Comment on lines +107 to +109
tags := map[string]string{
"address": h.ServiceAddress,
}
Contributor

We should make this optional, or just remove it, so that it is possible to pass through data unmodified. If we want to make it optional, it should be done similar to the bucket_tag option.

Contributor Author

This tags map is only used to tag the internal selfstat below; it does not munge the actual telegraf metric that is collected and passed on from the plugin.

Contributor

👍

Comment on lines +58 to +61
## maximum duration before timing out read of the request
read_timeout = "10s"
## maximum duration before timing out write of the response
write_timeout = "10s"
Contributor

Consider combining these into a single timeout.

Contributor Author

@magichair magichair Jul 13, 2020

They do surface two different configurations on the http.Server (

h.server = http.Server{
	Addr:         h.ServiceAddress,
	Handler:      h,
	ReadTimeout:  h.ReadTimeout.Duration,
	WriteTimeout: h.WriteTimeout.Duration,
	TLSConfig:    tlsConf,
}
) and are carried over from the influxdb_listener implementation. I'm not sure how they might be used, but I'd like to keep them separate.

Contributor

@danielnelson danielnelson Jul 14, 2020

My thought was that the response is typically very small, so there isn't a lot of value in having separate options vs a simpler combined timeout. If it is difficult to set this up on the http.Server, then it is okay.

Contributor Author

Given that these should be small responses, I decided to remove the timeouts entirely for a simpler config. Should someone need to add that functionality in the future, it should be straightforward to forward-port from the influxdb_listener or other examples in the code base.

defer body.Close()
}

parser := influx.NewStreamParser(body)
Contributor

One spot where the InfluxDB 2.x API differs from the 1.x API is that writes are either fully accepted or fully rejected. I would switch this over to use the regular influx.Parser.

From the /write docs:

400: line protocol poorly formed and no points were written. Response can be used to determine the first malformed line in the body line-protocol. All data in body was rejected and not written.

We should try to mirror the API from InfluxDBv2 as closely as we reasonably can to avoid issues when switching between the APIs.

Contributor Author

🤔 So maybe I should still enforce some form of max_body_size if I have to read the entire HTTP request body into memory and pass it into influx.Parse(bodyBytes) #7828 (comment)

Contributor Author

I definitely agree we want to match; not pushing back on that, just trying to brainstorm the best way to approach this, since I presume the accumulator isn't transactional, and I'd have to scrap the streaming code (or again store the telegraf metrics in memory and throw them away if we encounter an error).

Contributor

I do think we should keep max_body_size, commented over there. You are right that the accumulator isn't transactional, so we aren't safe in the case of SIGKILL, power loss, or the like. Anyway, the output plugin is unsynchronized with the inputs, so the data could go out in separate writes.

Still we can do a close approximation that will work under normal operation (outputs up/clean shutdown): after switching to the Parse() function just check and return the error or add all the metrics if no error.
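The approach described above can be sketched roughly as follows (the parser here is a hypothetical stand-in; the real plugin uses telegraf's influx.Parser and accumulator): parse everything first, and only hand metrics on if no line failed.

```go
package main

import (
	"fmt"
	"strings"
)

// metric is a stand-in for telegraf.Metric in this sketch.
type metric struct{ name string }

// parseLine is a hypothetical stand-in for a line-protocol parser: it
// accepts any line containing a space (measurement/tags then fields) and
// rejects the rest.
func parseLine(line string) (metric, error) {
	if !strings.Contains(line, " ") {
		return metric{}, fmt.Errorf("unable to parse %q", line)
	}
	return metric{name: strings.SplitN(line, ",", 2)[0]}, nil
}

// parseAllOrNothing parses every line before anything reaches the
// accumulator: one malformed line rejects the whole batch (a 400-style
// error, no points written), mirroring the InfluxDB v2 /write contract.
func parseAllOrNothing(body string) ([]metric, error) {
	var metrics []metric
	for _, line := range strings.Split(strings.TrimSpace(body), "\n") {
		m, err := parseLine(line)
		if err != nil {
			return nil, err // reject the entire batch
		}
		metrics = append(metrics, m)
	}
	return metrics, nil
}

func main() {
	ms, err := parseAllOrNothing("cpu,host=a usage=1\nmem used=2")
	fmt.Println(len(ms), err) // prints "2 <nil>"
}
```

As noted above, this is only a close approximation of transactional behavior: once the metrics are added, downstream delivery can still split them across writes.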

Contributor Author

Added max_body_size back, implemented all or nothing writes.

}
}

func badRequest(res http.ResponseWriter, errString string) {
Contributor

In order to more closely match the InfluxDBv2 API, we should try to sync up some common error messages. We should check example error responses from InfluxDBv2 listed at https://v2.docs.influxdata.com/v2.0/api/#tag/Write and anywhere we produce the equivalent error try to produce something similar.
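For reference, InfluxDB v2 write errors use a JSON body with code and message fields; a 400 for malformed line protocol looks roughly like this (the message text below is illustrative, not copied from the API docs):

```json
{
  "code": "invalid",
  "message": "unable to parse 'cpu,host=a usage=': missing field value"
}
```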

func init() {
	inputs.Add("influxdb_v2_listener", func() telegraf.Input {
		return &InfluxDBV2Listener{
			ServiceAddress: ":8186",
Contributor

Default to the 9999 port to match InfluxDBv2 OSS.

readysServed selfstat.Stat
requestsRecv selfstat.Stat
notFoundsServed selfstat.Stat
buffersCreated selfstat.Stat
Contributor

I don't see buffersCreated being used, can we remove it?

Comment on lines +12 to +15
When chaining Telegraf instances using this plugin, CREATE DATABASE requests
receive a 200 OK response with message body `{"results":[]}` but they are not
relayed. The output configuration of the Telegraf instance which ultimately
submits data to InfluxDB determines the destination database.
Contributor

Let's remove this paragraph since it isn't relevant for InfluxDBv2.

internal/http.go Outdated
Comment on lines +54 to +57
// TokenAuthHandler returns a http handler that requires `Authorization: Token <token>`
// Introduced to support InfluxDB 2.x style authentication
// https://v2.docs.influxdata.com/v2.0/reference/api/#authentication
func TokenAuthHandler(token string, onError TokenAuthErrorFunc) func(h http.Handler) http.Handler {
Contributor

Set this up to handle a direct comparison with any Authorization header value, not treating the scheme and credentials separately, just as a single big string. I think this will be more reusable and less tied to the InfluxDBv2 plugin.

internal/http.go Outdated
Comment on lines +80 to +85
authParts := strings.SplitN(authHeader, " ", 2)
if len(authParts) != 2 ||
	subtle.ConstantTimeCompare(
		[]byte(strings.ToLower(strings.TrimSpace(authParts[0]))),
		[]byte(strings.ToLower(h.scheme))) != 1 ||
	subtle.ConstantTimeCompare([]byte(strings.TrimSpace(authParts[1])), []byte(h.credentials)) != 1 {
Contributor

Let's remove the normalization steps here, just do a subtle.ConstantTimeCompare against the header value.
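A minimal sketch of the suggested approach (helper name hypothetical): compare the whole Authorization header value, scheme and credentials together, against a single expected string in constant time, with no splitting or case normalization.

```go
package main

import (
	"crypto/subtle"
	"fmt"
)

// authorized compares the full Authorization header value (e.g.
// "Token my-secret") against the expected value in constant time.
// The comparison is exact: scheme casing and whitespace must match.
func authorized(header, expected string) bool {
	return subtle.ConstantTimeCompare([]byte(header), []byte(expected)) == 1
}

func main() {
	fmt.Println(authorized("Token my-secret", "Token my-secret")) // prints "true"
	fmt.Println(authorized("token my-secret", "Token my-secret")) // prints "false"
}
```

Note that ConstantTimeCompare returns 0 immediately for inputs of different lengths, which is acceptable here since only the token contents are secret.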

@magichair
Contributor Author

Excellent points @danielnelson, I'll get right on these 👍 Thank you!

@magichair
Contributor Author

Comments from Daniel still remaining:

  • Handle all or nothing writes
  • Normalized error handling to match API spec

@magichair
Contributor Author

I think all of @danielnelson 's concerns have been addressed at this point. A fair number of changes since the last review - each split into its own commit (hopefully for easier review).

@magichair magichair requested a review from danielnelson July 16, 2020 16:58
@magichair
Contributor Author

Just poking in to see if there is a planned release timeline for 1.16.0. I've been running this as a custom-built telegraf in production fairly successfully; there isn't a lot of load on it for proving it out, but it has kept up with what I'm using it for.

@sjwang90
Contributor

@magichair We're looking to get a 1.16.0 release soon. Ideally to get this plugin in to align with the InfluxDB OSS 2 GA.

@ssoroka ssoroka merged commit d764f86 into influxdata:master Sep 14, 2020
@ssoroka
Contributor

ssoroka commented Sep 14, 2020

Merged. Thanks again for the great effort here!

@eraac
Contributor

eraac commented Sep 15, 2020

Can this listener for v2 also support a /ping route? When you deploy this to Kubernetes and/or behind a load balancer, a health check route (without auth) is required to make this work.

EDIT: Which should return 200 instead of 204, because of this #4935

@magichair
Contributor Author

magichair commented Sep 15, 2020

Can this listener for v2 also support a /ping route? When you deploy this to kubernetes or/and behind a load balancer a health check route (without auth) is required to make this work

EDIT: Which should return 200 instead of 204, because of this #4935

@eraac I debated putting the /ping endpoint equivalent in (the /api/v2/ready endpoint), however it didn't seem necessary. The original Influx 1.x listener had it in place for compatibility with the existing InfluxDB client libraries, which tested /ping before allowing a write. In this case, the 2.x client libraries have no such check.

Is there a reason you can't add https://github.com/influxdata/telegraf/tree/master/plugins/outputs/health to setup a generic health endpoint for your telegraf node overall? This is what I use for my AWS ALB health checks.
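For reference, a minimal config for the health output mentioned above might look like this (the address and port are illustrative; check the plugin's README for its actual options):

```toml
[[outputs.health]]
  ## Address to listen on; the endpoint returns 200 while telegraf is healthy.
  service_address = "http://:8080"
```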

@eraac
Contributor

eraac commented Sep 15, 2020

Is there a reason you can't add https://github.com/influxdata/telegraf/tree/master/plugins/outputs/health to setup a generic health endpoint for your telegraf node overall? This is what I use for my AWS ALB health checks.

@magichair I just tried, but I can't start outputs.health and inputs.influxdb_v2_listener on the same port (which is perfectly logical) :/

Our telegraf agent is deployed on Kubernetes, and we can't set a different port for the health check on the load balancer. That feature is only available in 1.17 and is still "beta".

@magichair
Contributor Author

That makes sense, if we wanted to resurrect it in a separate PR, it was roughly stubbed out and then removed in this commit: e1571b9

idohalevi pushed a commit to idohalevi/telegraf that referenced this pull request Sep 29, 2020
arstercz pushed a commit to arstercz/telegraf that referenced this pull request Mar 5, 2023

Development

Successfully merging this pull request may close these issues.

InfluxDB v2.0 support for inputs.influxdb_listener plugin

5 participants