feat: Initial timed stream implementation for application latencies#1639
feat: Initial timed stream implementation for application latencies#1639danieljbruce merged 40 commits intomainfrom
Conversation
…gleapis/nodejs-bigtable into 359913994-timed-stream-tests
No region tags are edited in this PR.This comment is generated by snippet-bot.
|
…gleapis/nodejs-bigtable into 359913994-timed-stream-tests # Conflicts: # src/timed-stream-new.ts
Add a note for the setTimeout test
…gleapis/nodejs-bigtable into 359913994-timed-stream-tests
src/timed-stream.ts
Outdated
| this.emit('before_row'); | ||
| // Defer the after call to the next tick of the event loop | ||
| process.nextTick(() => { | ||
| this.emit('after_row'); |
There was a problem hiding this comment.
Do you think exposing these as events is useful?
I did it this way for the prorotype, thinking we might have to hook into this later. but if we don't actually need to subscribe to these for other reasons, we can just make these standard method calls and keep it internal. What do you think?
There was a problem hiding this comment.
The tests are fine without them so I removed these events. They don't seem to be useful in practice.
src/timed-stream.ts
Outdated
| const endTime = process.hrtime.bigint(); | ||
| const duration = endTime - startTimeTransform; | ||
| this.totalDurationTransform += duration; | ||
| startTimeTransform = process.hrtime.bigint(); |
There was a problem hiding this comment.
It seems a little weird to me that this method essentially re-implements the handleBeforeRowRead and handleAfterRowRead logic.
Ideally we could have shared helpers. But if that's not going to work because we're separating the state, we should probably at least remove the two handle* functions to make read match transform
There was a problem hiding this comment.
I made this a lot cleaner by introducing a StreamTimer class. We shouldn't have this duplicate code.
src/timed-stream.ts
Outdated
| highWaterMark: 0, | ||
| transform: (event, _encoding, callback) => { | ||
| // First run code for time measurement before the transform callback is | ||
| // invoked. ie. Ensure that the timer is started. |
There was a problem hiding this comment.
You should probably add something that mentions why we record in both transform and read (read is for iterating, transform is for handlers)
There was a problem hiding this comment.
Okay. I added a comment here for that.
daniel-sanche
left a comment
There was a problem hiding this comment.
Overall LGTM, but take a look at those comments
| ) => void; | ||
| }; | ||
|
|
||
| class StreamTimer { |
There was a problem hiding this comment.
a docstring would be useful here
There was a problem hiding this comment.
I'll address this in a backlog bug because this is merge ready.
| private readTimer = new StreamTimer(); | ||
| private transformTimer = new StreamTimer(); | ||
| constructor(options?: TimedStreamOptions) { | ||
| // highWaterMark of 1 is needed to respond to each row |
There was a problem hiding this comment.
is this still relevant? The code itself is using highWaterMark of 0
There was a problem hiding this comment.
I'll address this in a backlog bug because this is merge ready.
Description
Adds a timed stream object. This PR adds an implementation for the timed stream object which provides an additional mechanism on top of streams that measures the amount of time the user processes data from the stream. The timed stream can then be used as a tool for measuring application latencies because application latencies measure time processing data. Tests should accurately reflect what the timed stream should and should not do.
Impact
Basically provides the right plumbing for application latencies measurement. This should allow us to make code changes to each method that make use of the TimedStream and measure application latencies.
Testing
Unit tests for the timed stream are added to illustrate what the timed stream should do. This ensures the measurements captured by the timed stream will meet the needs required in order to measure application latencies. This mostly includes ensuring the measurement is correct under a variety of circumstances like if the server delays sending data, there is backpressure or if the loop/handlers contains synchronous or asynchronous calls.