fix: Data race on transformers pipeline closed check #19130
Closed
Member
Author
Probably doesn't really matter because the whole `isClosed` check is not doing what we want it to do reliably.
Member
How can we reproduce the data race? Do we need to add a test for it?
Member
Author
@erezrokah Build the cli with
kind: source
spec:
  name: test
  registry: cloudquery
  path: cloudquery/test
  version: v4.3.6
  tables: [ test_paid_table ]
  destinations: ["test"]
  spec:
    num_rows: 50000
    num_clients: 40
---
kind: destination
spec:
  name: test
  path: cloudquery/test
  registry: cloudquery
  version: v2.4.0
  spec:
    error_on_insert: true
    error_on_write: false
    error_on_migrate: false
    batch_writer: false
marianogappa (Contributor) added a commit that referenced this pull request on Sep 16, 2024
fixes cloudquery/cloudquery-issues#2436

Several different users have reported that, since transformers were implemented, certain syncs hang forever.

The transformations implementation seems to work well in these cases:

- A sync completes with no errors
- A sync finishes with errors due to the source
- A sync finishes with errors due to the transformation

However, when a destination has an error, the sync hangs forever at least some of the time.

Each transformer is an implementation of a `grpc.BidiStreamingClient`, which uses these two methods:

```go
Send(*Req) error
Recv() (*Res, error)
```

Which in turn call these:

```go
// SendMsg is generally called by generated code. On error, SendMsg aborts
// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.
//
// SendMsg blocks until:
//   - There is sufficient flow control to schedule m with the transport, or
//   - The stream is done, or
//   - The stream breaks.
//
// SendMsg does not wait until the message is received by the server. An
// untimely stream closure may result in lost messages. To ensure delivery,
// users should ensure the RPC completed successfully using RecvMsg.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
//
// It is not safe to modify the message after calling SendMsg. Tracing
// libraries and stats handlers may use the message lazily.
SendMsg(m any) error

// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
// status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(m any) error
```

Both `SendMsg` and `RecvMsg` can hang forever, and this is what users are reporting.

However we choose to mitigate this issue (let's say the flow of data or the orchestration for closing is wrong), the result is probably going to be fragile if we still rely on those methods not hanging.

Thus, what I've done is run those two methods (`Recv()` and `Send()`) asynchronously. The Pipeline thread will invoke them, but it will exit if someone closes the pipeline (by using a `select`).

I've tested running @disq's reproduction test 100 times; it no longer hangs or panics: #19130 (comment)

**It correctly fails on a test error on insert configuration**

**It correctly succeeds on a success test configuration**

**It still doesn't swallow the last batch, as tested by running a stable sync multiple times and seeing the same resource count**
Fixes the data race on `isClosed`, which we introduced in #19105
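The diff for this PR is not shown on this page, so the following is only a sketch of one common way to make a closed flag race-free in Go, using `sync/atomic` (`atomic.Bool` requires Go 1.19 or later). The `pipeline` type, `newPipeline`, and its fields are hypothetical; only the `isClosed` name comes from the PR description.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pipeline is a hypothetical stand-in for the transformers pipeline.
type pipeline struct {
	closed    atomic.Bool   // read and written atomically from any goroutine
	done      chan struct{} // closed exactly once, when the pipeline closes
	closeOnce sync.Once
}

func newPipeline() *pipeline {
	return &pipeline{done: make(chan struct{})}
}

// Close marks the pipeline as closed. It is safe to call more than once
// and from multiple goroutines.
func (p *pipeline) Close() {
	p.closeOnce.Do(func() {
		p.closed.Store(true)
		close(p.done)
	})
}

// isClosed reports whether Close has been called.
func (p *pipeline) isClosed() bool {
	return p.closed.Load()
}

func main() {
	p := newPipeline()

	// Read and close the flag from several goroutines at once.
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = p.isClosed()
			p.Close()
		}()
	}
	wg.Wait()

	fmt.Println(p.isClosed())
}
```

Running a program like this with `go run -race` is one way to check that the race detector stays quiet. An atomic flag only removes the data race itself: a caller that checks `isClosed()` and then acts on the answer can still see the pipeline close in between, which matches the comment above that the check is not reliable on its own.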