agent.flush() waits for inflight spans and errors#2372
Conversation
- missing some tests - doesn't yet handle inflight *errors* Will eventually fix #2294
💚 Build Succeeded
Expand to view the summary
Build stats
Test stats 🧪
🤖 GitHub commentsTo re-run your PR in the CI, just comment with:
|
…) sequencing; they both use the same write buffer/queue in the http client
… get transaction.id association; do NOT pass callback to captureError because it results in double-flush (first in agent.captureError, second from lambda wrapper handling). With inflight event handling in agent.flush we no longer need to wait for the error to process before calling agent.flush() when the lambda completes.
…transport.sendError callback -- it led to the Agent's default handleUncaughtException behaviour of process.exit(1)
…ed before agent.flush() handled this
astorm
left a comment
There was a problem hiding this comment.
Overall looks like a reasonable best effort solution -- if I'm reading the code correctly it looks like the flush will be passed on to the client as soon as the last item is deleted from the inflight set OR the timeout (currently one second) for the inflight set is reached.
Also, I gave this a spin with the last known working lambda extension and data flowed into APM Server correctly. I've got a few questions below for thoroughness, but this looks good as is. Approving.
| cb(new Error('cannot capture error before agent is started'), id) | ||
| agent._transport.sendError(apmError) | ||
| inflightEvents.delete(id) | ||
| if (!handled || cb) { |
There was a problem hiding this comment.
I don't quite follow the reason for this change here -- it looks like this code has been removed from the sendError callback function, which (may?) chance the timing here subtly. Can you talk a bit about why this change was necessary?
There was a problem hiding this comment.
Good question.
First, I removed the use of the sendError callback, because both the sent apmError and the flush token added to the http client Client by agent.flush() use the same queue: the Client stream (it inherites from stream.Writable). So there is no need to sequence apm.flush() asynchronously after sendError.
Second, waiting for the callback from sendError can actually delay the start of the intake request:
Client.prototype.sendError = function (error, cb) {
if (this._isUnsafeToWrite() || this._shouldDropEvent()) {
return
}
this._maybeCork()
return this.write({ error }, Client.encoding.ERROR, cb)
}That _maybeCork will delay the start of an intake request, if there isn't one already by bufferWindowTime, which defaults to 20ms. I restored the "use the sendError callback" and added some logging and re-ran this test in test/agent.test.js:
test.only('#captureError()', function (t) {
...
// Passing a callback to `captureError` means agent.flush() will be called.
t.test('with callback', function (t) {
The logging shows a ~20ms delay from the "Sending error to Elastic APM" message to "intake request start":
[2021-10-20T22:11:14.313Z] INFO (elastic-apm-node): Sending error to Elastic APM: {"id":"52b6dfb48f99b95c923b3c186ca1df3e"}
XXX corking
XXX sync after sendError
XXX uncorking1 (after bufferWindowTime)
[2021-10-20T22:11:14.338Z] TRACE (elastic-apm-node): intake request start
event.module: "apmclient"
reqId: "69557af9d5c862a96a45eb4119397f5d"
[2021-10-20T22:11:14.340Z] TRACE (elastic-apm-node): _write: encode object
event.module: "apmclient"
fullTimeMs: 5.267449
numEvents: 1
numBytes: 6831
[2021-10-20T22:11:14.341Z] TRACE (elastic-apm-node): intakeReq "socket": unref it
event.module: "apmclient"
reqId: "69557af9d5c862a96a45eb4119397f5d"
XXX in callback from sendError
[2021-10-20T22:11:14.341Z] TRACE (elastic-apm-node): _writeFlush
event.module: "apmclient"
active: true
The apm.flush() call comes after that in once sendError calls back.
With the currently proposed code the intake request is starts right away (a few ms of processing time):
[2021-10-20T22:04:42.229Z] INFO (elastic-apm-node): Sending error to Elastic APM: {"id":"96ef84fc56d978ab34e3201b392c1de0"}
[2021-10-20T22:04:42.234Z] TRACE (elastic-apm-node): intake request start
event.module: "apmclient"
reqId: "33fd40c31c5ad99af7dd9de8ab8b0e40"
[2021-10-20T22:04:42.235Z] TRACE (elastic-apm-node): _writeBatch
event.module: "apmclient"
encodeTimeMs: 0.690205
fullTimeMs: 5.48049
numEvents: 1
numBytes: 6871
because the apm.flush() called right after the sendError:
[2021-10-20T22:23:39.171Z] INFO (elastic-apm-node): Sending error to Elastic APM: {"id":"affa9a3c0dc1c8958515255657c3ea95"}
XXX corking
XXX sync after sendError
XXX uncorking2
[2021-10-20T22:23:39.177Z] TRACE (elastic-apm-node): intake request start
event.module: "apmclient"
reqId: "6e2b4c7c250e96a3f5f9ea6e056a4e9b"
[2021-10-20T22:23:39.178Z] TRACE (elastic-apm-node): _writeBatch
Because the Client.flush uncorks immediately:
Client.prototype.flush = function (cb) {
this._maybeUncork()Last, I'll admit to not being 100% sure there isn't some interaction difference if there an ongoing intake request and enough data on the Client buffer such that the just sent apmError will need to wait for one or more separate intake requests to get fully flushed. I haven't followed what happens with the this._write(error, cb) callback when it is passed to the stream chopper.
| // inflightEvents.delete(id) | ||
| Agent.prototype.flush = function (cb) { | ||
| if (this._transport) { | ||
| // TODO: Only bind the callback if the transport can't use AsyncResource from async hooks |
There was a problem hiding this comment.
It looks like we lost this comment in the new version. Is this something that's still relevant?
There was a problem hiding this comment.
This was intentional. That TODO was added a long while back and there is no details that I found that explains why AsyncResource would be better and how it would be used. I think _instrumentation.bindFunction(cb) is the right thing to do to ensure that cb executes in the correct run context. If at some point our Run Context Manager uses AsyncResource to handle tracking, then great, but code like this should just use the Instrumentation class API.
fix: improve Agent#flush() to wait for inflight ended spans and errors
With this change an
apm.flush()will wait for "inflight" spans and errors.These are spans that have ended, but have not yet been encoded and sent
to the transport; and errors that are still capturing. Both of these are
necessarily asynchronous, mainly for stacktrace collection. This allows one
to synchronously call flush like this and have it work as expected:
This is helpful for flushing APM data for a finished Lambda function. This
change also includes some related (and unrelated) Lambda instrumentation
improvements:
Fixes: #2294
Checklist