fix: Error handling in StreamingBatchWriter #1913
```go
	t.Fatalf("expected 0 open tables, got %d", l)
}

if l := testClient.MessageLen(messageTypeInsert); l != 3 {
```
I added two inserts above but only increased this from 3 to 4. Expecting 3 was incorrect: the flushing flow has been different for some time, and the third message is no longer needed to trigger a flush.
```go
case err := <-outputCh:
	if err != nil {
		s.errCh <- fmt.Errorf("handler failed on %s: %w", tableName, err)
		return
```
This `return` statement is my actual contribution, apart from renaming variables, updating tests, and better handling of panics whose value is an `error`.
```go
for {
	select {
	case msg := <-msgs:
		if msg == nil {
```
Wouldn't you want `msg, ok :=` here instead?
```go
w.lastMsgType = msgType
if err := w.startWorker(ctx, errCh, msg); err != nil {

case err := <-errCh:
```
The previous logic wasn't shutting down on error; is this correct?
The previous logic would call `w.logger.Err(err).Msg("error from StreamingBatchWriter")` and keep working.
@erezrokah, adding you as well; this is a little tricky, and I don't have much experience with the writer.
Merge at will. I'm not merging right now to avoid any potential noise (and the unreleased SDK already has some interesting changes).
🤖 I have created a release *beep* *boop*

---

## [4.64.1](v4.64.0...v4.64.1) (2024-10-02)

### Bug Fixes

* Error handling in StreamingBatchWriter ([#1913](#1913)) ([d852119](d852119))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Looks like this broke something: cloudquery/cloudquery#19312
Reverts #1913. This broke some stuff, so I'm reverting it to unblock SDK changes: cloudquery/cloudquery#19312 (comment)
Original PR: #1913

Reverts the revert, plus more improvements:

- Client handlers are now allowed to return an error immediately without draining the channel. The channel is drained automatically in the error case, and subsequent writes for that table aren't sent to the client.
- Some potential race conditions are fixed: the `ensureOpened()` inline func has been refactored into `s.send()`, which now handles the sending as well. The spawned goroutine no longer refers to `inputCh` directly, so it can be replaced in `closeFlush()`.
- Shutdown logic is improved: even if `client.Write()` returns an error after `msgs` is closed, the error is still logged and returned.
The actual clean-up is by @murarustefaan; I've made some minor updates to it and updated the tests.
Still need to test it E2E. Seems to work: it handled the `panic: arrow/array: number of columns/fields mismatch [recovered]` error immediately (I tested with an old version of the S3 plugin and had to manually bump Arrow to v17).