fix: Fix transformers-related hang. #19165
Conversation
```go
// Send can block forever (e.g. if grpc buffer is full), so we run it asynchronously
// and check if pipeline is closed every second.
go func() {
```
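The commented pattern above can be sketched as follows. This is a minimal, self-contained illustration of the idea (run a potentially-blocking send in a goroutine, poll once a second for pipeline closure), not the actual cli code; `sendWithPipelineCheck` and `pipelineDone` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// sendWithPipelineCheck runs a potentially-blocking send in its own
// goroutine and checks every second whether the pipeline has been
// closed, so the caller can never block forever on the send itself.
func sendWithPipelineCheck(send func() error, pipelineDone <-chan struct{}) error {
	errCh := make(chan error, 1) // buffered so the goroutine can always exit
	go func() {
		errCh <- send()
	}()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case err := <-errCh:
			return err // send finished (successfully or not)
		case <-ticker.C:
			select {
			case <-pipelineDone:
				return fmt.Errorf("pipeline closed while sending")
			default: // pipeline still open; keep waiting for the send
			}
		}
	}
}

func main() {
	done := make(chan struct{})
	close(done) // simulate an already-closed pipeline
	err := sendWithPipelineCheck(func() error {
		select {} // a send that would block forever
	}, done)
	fmt.Println(err) // reports pipeline closure within ~1 second
}
```

Buffering `errCh` matters: without it, the sending goroutine would itself leak forever once the caller gives up.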
```go
go func() {
	defer close(sendCh)
```
This way sendCh is almost always closed.
What is the benefit of explicitly closing this channel? Btw, this goroutine is never explicitly exited either; the defer might as well be outside of it.
Is the `client.Send()` method blocking ad infinitum (provided no error was encountered)? I thought (well, the comment above says...) it would only block if the grpc buffer is full, or in such cases.
Closing would be to stop having unused open channels (do they count against file descriptor limits? probably not, but I don't know).
The case that we're hitting is that:

- the source (via `nworkers`) is sending non-stop
- the destination was fine, but then errors
- the `send` doesn't get `recv`'d many times in a row, until the buffer is full

But with my fix, within <= 1 second, the pipeline is closed.

This should be a single channel that is needlessly around for about 1 sec, so I think we're fine.
```go
	return err
```

```go
recvCh := make(chan *plugin.Transform_Request)
errCh := make(chan error)
```
Again, ideally do a `defer close(errCh)` in the goroutine (but handle the closed case in the `select` below by using the `val, ok := <-errCh` syntax).
Same question regarding this close. What's the benefit of closing it?
🤖 I have created a release *beep* *boop*

---

## [6.7.0](cli-v6.6.3...cli-v6.7.0) (2024-09-16)

### Features

* Add `install` support for Transformer plugins ([#19166](#19166)) ([cc6ca4b](cc6ca4b))

### Bug Fixes

* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.62.0 ([#19137](#19137)) ([ed315d0](ed315d0))
* **deps:** Update module google.golang.org/grpc to v1.66.2 ([#19160](#19160)) ([8699db3](8699db3))
* Fix transformers-related hang. ([#19165](#19165)) ([e151df2](e151df2))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
fixes https://github.com/cloudquery/cloudquery-issues/issues/2436
Several different users have reported that, since transformers have been implemented, certain syncs are hanging forever.
The transformations implementation seems to work well in these cases:
However, when a destination has an error, at least sometimes it hangs forever.
Each transformer is an implementation of a `grpc.BidiStreamingClient`, which uses these two methods: `Send` and `Recv`. Which in turn call these: `SendMsg` and `RecvMsg`.
Both `SendMsg` and `RecvMsg` can hang forever, and this is what users are reporting. However we may choose to mitigate this issue (let's say the flow of data or the orchestration for closing is wrong and we fix that), it's probably going to stay fragile if we still rely on those methods never hanging.
Thus, what I've done is run those two methods (`Recv()` and `Send()`) asynchronously. The Pipeline thread invokes them, but it will exit if someone closes the pipeline (by using a `select`).

I've tested running @disq's reproduction test 100 times; it no longer hangs nor panics:
#19130 (comment)
It correctly fails on a test error on insert configuration (screenshot).

It correctly succeeds on a success test configuration (screenshot).

It still doesn't swallow the last batch, as tested by running a stable sync multiple times and seeing the same resource count (screenshot).