fix: properly handle asynchronous read from stream #1284
Conversation
Note: the system and samples tests won't run here until I rebase this PR. Also note: I cherry-picked a commit from #1285 to avoid an unrelated Windows test failure.
src/chunktransformer.ts (outdated)

```ts
    this.reset();
  }

  get canceled() {
```
It seems like we expose ChunkTransformer as part of our public API, which I think is incorrect to begin with. To minimize further leakage of implementation details, can we mark this method as internal?
```ts
(typeof chunk.timestampMicros === 'number' &&
  chunk.timestampMicros! > 0) ||
// if it's an instance of Long
(typeof chunk.timestampMicros === 'object' &&
  'compare' in chunk.timestampMicros &&
  typeof chunk.timestampMicros.compare === 'function' &&
  chunk.timestampMicros.compare(0) === 1);
```
This is a bit strange: chunk is a protobuf, so shouldn't it have a stable type?
This is a specific of how protobufjs handles 64-bit integers. In JavaScript, all numbers are 64-bit IEEE 754 floats, so they can reliably represent integers only up to 2⁵³ - 1. Because of that, protobufjs accepts, and may emit, an object of type Long (from https://www.npmjs.com/package/long) for 64-bit fields. As far as I understand, in this case it's always a number in practice, but in my clean workspace VSCode shows an error here, so it should be safe to fix.
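As a standalone sketch of the check being discussed (the interface and helper names are illustrative, and LongLike mirrors only the part of the `long` package's API that the check relies on, so there is no external dependency):

```typescript
// LongLike models the compare() method of a Long instance from the
// `long` package; only that method matters for the positivity check.
interface LongLike {
  compare(other: number): number;
}

// Hypothetical helper: true when timestampMicros is positive, whether it
// arrives as a plain number or as a Long-like object.
function isPositiveTimestampMicros(ts: number | LongLike): boolean {
  return (
    (typeof ts === 'number' && ts > 0) ||
    (typeof ts === 'object' &&
      typeof ts.compare === 'function' &&
      // Long#compare returns 1 when the receiver is greater than the argument
      ts.compare(0) === 1)
  );
}

// A minimal stand-in for Long.fromNumber(5), for illustration only.
const longLikeFive: LongLike = {
  compare: (other: number) => (5 > other ? 1 : 5 < other ? -1 : 0),
};
```

In the default case the `||` expression short-circuits in the `typeof ts === 'number'` branch, so the Long path is only taken for the object form.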
please leave a comment in the code explaining this
src/table.ts (outdated)

```ts
transform: (rowData, _, next) => {
  if (
    chunkTransformer._destroyed ||
    chunkTransformer.canceled ||
```
It seems like there are only two things that care about the canceled flag:
- the closure for userStream.end()
- the closure for toRowStream

ChunkTransformer seems like an innocent bystander here. Why not move the userCanceled flag to a local variable in createReadStream(), whose scope is shared by the two closures that care about the flag?
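A minimal sketch of this suggestion, with the cancellation flag living in the function's scope (the function name is illustrative, not the library's actual API, and the pipeline wiring between the two streams is omitted to keep the sketch small):

```typescript
import {PassThrough, Transform} from 'node:stream';

function createReadStreamSketch() {
  let userCanceled = false; // shared only by the two closures below

  // Drops incoming rows once the user has canceled.
  const toRowStream = new Transform({
    objectMode: true,
    transform(row, _encoding, next) {
      if (userCanceled) {
        next(); // swallow the row
        return;
      }
      next(null, row);
    },
  });

  const userStream = new PassThrough({objectMode: true});
  // Keep the original end() for normal termination; the overridden end()
  // marks a user-initiated cancellation before delegating to it.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const originalEnd = userStream.end.bind(userStream) as unknown as (
    ...args: any[]
  ) => unknown;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (userStream as any).end = (chunk?: any, encoding?: any, cb?: () => void) => {
    userCanceled = true;
    return originalEnd(chunk, encoding, cb);
  };

  return {toRowStream, userStream};
}
```

With this shape, ChunkTransformer never needs to know about user cancellation; the flag and both of its readers live in one scope.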
Alternatively, maybe create a subclass for userStream that holds the state
Moving the flag closer to the consumer makes sense to me, I'll try to do it. I only put it here to replace the _destroyed logic, but you are right, it will make more sense downstream.
Done
```ts
  }
  if (this._destroyed) {
    return;
  }
}
if (data.lastScannedRowKey && data.lastScannedRowKey.length > 0) {
  this.lastRowKey = Mutation.convertFromBytes(
    data.lastScannedRowKey as Bytes,
    {
      userOptions: this.options,
    }
  );
}
```
I think this breaks the request resumption logic in makeNewRequest:

```ts
const lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : '';
```
lastRowKey is set when committing; it should not be set here because the row is still incomplete at this moment.
This is my fault; I forgot to mention one other detail of the ReadRows protocol. There are two ways that a ReadRows resumption request can be built:
- based on the last received row key of a committed row chunk (this one you know about);
- as a heartbeat sent via an empty ReadRowsResponse. This will be used when a caller has very restrictive filters that cause the scan to omit entire tablets. The protocol allows the server to emit a heartbeat with the last scanned (as opposed to returned) row key.

This is not currently enabled on the server side, but it is specified by the protocol.
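The two resumption sources above can be sketched as follows (the function and field names are hypothetical and only loosely follow the protocol; this is not the client's actual code):

```typescript
interface ResumptionState {
  lastRowKey?: string;
}

// Hypothetical helper: decide where a retried ReadRows request should resume.
function updateResumptionKey(
  state: ResumptionState,
  response: {
    committedRowKey?: string; // key of a fully committed row chunk
    lastScannedRowKey?: string; // heartbeat: last scanned (not returned) key
  }
): void {
  if (response.committedRowKey) {
    // Case 1: a row was committed; resume after its key.
    state.lastRowKey = response.committedRowKey;
  } else if (
    response.lastScannedRowKey &&
    response.lastScannedRowKey.length > 0
  ) {
    // Case 2: a heartbeat says the server scanned past this key even though
    // no row was returned; resuming after it avoids rescanning those rows.
    state.lastRowKey = response.lastScannedRowKey;
  }
  // An in-progress (uncommitted) row must NOT move lastRowKey.
}
```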
Fixed this
Also, I think we still have a problem of end() on the userStream being delayed. I think we can deal with it in a separate PR, but in the case where the consumer is slower than the gRPC producer (i.e. the user transformer calls the callback in a setTimeout()), rows can get buffered in the userStream. When the end user cancels the stream early via end(), the rows buffered in the userStream will still be emitted. I'm not sure how to deal with this; we would need the userStream to signal the stream end on read() when canceled.
```ts
(typeof chunk.timestampMicros === 'number' &&
  chunk.timestampMicros! > 0) ||
// if it's an instance of Long
(typeof chunk.timestampMicros === 'object' &&
  'compare' in chunk.timestampMicros &&
  typeof chunk.timestampMicros.compare === 'function' &&
  chunk.timestampMicros.compare(0) === 1);
```
please leave a comment in the code explaining this
```ts
});
const originalEnd = userStream.end.bind(userStream);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => {
```
Please add a comment explaining that this is needed to fulfill a pre-existing contract with the caller, where the caller can prematurely cancel the stream by calling end(). We need to be able to disambiguate between a user cancellation and a normal end, so we capture originalEnd to be used for normal termination, while the overridden end() is meant for end users.
src/table.ts (outdated)

```ts
rowStream?.unpipe(userStream);
rowStream.removeListener('end', originalEnd);
```
Can you add two helper functions, rowStreamPipe and rowStreamUnpipe, with comments noting that we need special handling of the 'end' event to redirect the handler to originalEnd?
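A hedged sketch of what those two helpers could look like (the real signatures may differ; here originalEnd is passed in explicitly). Piping with `{end: false}` suppresses the automatic end of the destination, and the 'end' event is instead redirected to originalEnd, so that normal termination goes through the captured original while the overridden end() stays reserved for user cancellation:

```typescript
import {Duplex, PassThrough} from 'node:stream';

// Pipe rowStream into userStream without auto-ending the destination,
// and route rowStream's natural 'end' to the captured originalEnd.
function rowStreamPipe(
  rowStream: Duplex,
  userStream: Duplex,
  originalEnd: () => void
): void {
  rowStream.pipe(userStream, {end: false});
  rowStream.on('end', originalEnd);
}

// Undo both halves of rowStreamPipe: stop the data flow and detach the
// redirected 'end' handler.
function rowStreamUnpipe(
  rowStream: Duplex,
  userStream: Duplex,
  originalEnd: () => void
): void {
  rowStream.unpipe(userStream);
  rowStream.removeListener('end', originalEnd);
}

// Example wiring (PassThrough stands in for the real streams):
const rowStream = new PassThrough({objectMode: true});
const userStream = new PassThrough({objectMode: true});
const originalEnd = () => {
  userStream.end();
};
rowStreamPipe(rowStream, userStream, originalEnd);
```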
src/table.ts (outdated)

```ts
rowStream.unpipe(userStream);
rowStream.removeListener('end', originalEnd);
```
please delegate to a helper
danieljbruce left a comment:
Looks good. A couple of questions/comments about specific details.
```ts
  callback();
  return;
}
callback(null, row);
```
Do we add this logic here to prevent the user from receiving rows that make it past the race condition in toRowStream? If so, do we need the userCanceled || check in toRowStream at all?
Yes. The reason we also keep the condition in toRowStream.transform is that it does not make much sense to pipe more data through toRowStream after the user explicitly asked us to stop.
```ts
// eslint-disable-next-line @typescript-eslint/no-explicit-any
userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => {
  rowStreamUnpipe(rowStream, userStream);
```
Does this prevent the data from being passed on from rowStream to userStream? It looks like we are using both userCanceled and unpipe to stop passing data along. Should we also unpipe the data being passed from the chunk transformer to toRowStream?
We don't really care about the data passed from chunkTransformer to toRowStream because that is all inside rowStream. We only need to stop that data from flowing into the user stream. Since it's an end of the user stream, it's not really important whether more or less data has flowed inside the rowStream pipeline.
Fixes #607.

Note: I'm currently comparing this pull request to #1282; I will change the base to main after #1282 is merged. This way it's easier to review.

Several changes here:

1. Remove the logic in table.ts that would skip the unprocessed records if a ChunkTransformer has the _destroyed flag set. There are two separate cases when the ChunkTransformer gets destroyed:
   - a network problem, or anything else that forced gRPC to emit an error event. In such a case, it always makes sense to process all the rows we have received before giving up;
   - a user cancellation (by calling stream.end() from the user code). In this case we need to stop emitting new rows. ~~So let's introduce a new flag, _canceled, and ignore the incoming rows only if the stream is canceled from .end().~~ Let's have the new flag, userCanceled, and use it in the userStream transform method to stop emitting new rows when the user doesn't need them; we'll still keep emitting all rows if there was no user cancellation. This change fixes #607.
2. ~~When a network error happens, we need to update the lastRowKey only if the row processing has completed (the commit bit was set in the last chunk). Until then, don't update lastRowKey. The current code makes recovery skip one record: the one that was incomplete.~~ This was actually a test problem; there is no problem here.
3. Minor change to make the modern TypeScript compiler happy when comparing chunk.timestampMicros to zero. chunk.timestampMicros can theoretically be an instance of Long, so let's handle this accordingly. It was showing as an error in my editor, and it makes sense, so I'm fixing it in the safest way possible. In the default case, the || expression will short-circuit in the typeof chunk.timestampMicros === 'number' branch.