Commit e151df2
authored
fix: Fix transformers-related hang. (#19165)
fixes cloudquery/cloudquery-issues#2436
Several different users have reported that, since transformers have been
implemented, certain syncs are hanging forever.
The transformations implementation seems to work well in these cases:
- A sync completes with no errors
- A sync finishes with errors due to the source
- A sync finishes with errors due to the transformation
However, when a destination has an error, at least sometimes it hangs
forever.
Each transformer is an implementation of a `grpc.BidiStreamingClient`,
which uses these two methods:
```go
Send(*Req) error
Recv() (*Res, error)
```
Which in turn call these:
```go
// SendMsg is generally called by generated code. On error, SendMsg aborts
// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
//
// SendMsg does not wait until the message is received by the server. An
// untimely stream closure may result in lost messages. To ensure delivery,
// users should ensure the RPC completed successfully using RecvMsg.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
//
// It is not safe to modify the message after calling SendMsg. Tracing
// libraries and stats handlers may use the message lazily.
SendMsg(m any) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
// status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(m any) error
```
Both `SendMsg` and `RecvMsg` can hang forever, and this is what users
are reporting.
However we may want to mitigate this issue (let's say the flow of data
or the orchestration for closing is wrong), it's probably gonna be
fragile if we still rely on those methods not hanging.
Thus, what I've done is running those two methods (`Recv()` and
`Send()`) asynchronously. The Pipeline thread will invoke these, but it
will exit if someone closes the pipeline (by using a `select`).
I've tested running @disq 's reproduction test 100 times; it no longer
hangs nor panics:
#19130 (comment)
**It correctly fails on a test error on insert configuration**

**It correctly succeeds on a success test configuration**

**It still doesn't swallow the last batch, as tested by running a stable
sync multiple times and seeing the same resource count**
1 parent cedf287 commit e151df2
1 file changed
+65
-26
lines changed| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
4 | 4 | | |
5 | 5 | | |
6 | 6 | | |
| 7 | + | |
| 8 | + | |
7 | 9 | | |
8 | 10 | | |
9 | 11 | | |
| |||
69 | 71 | | |
70 | 72 | | |
71 | 73 | | |
72 | | - | |
73 | | - | |
| 74 | + | |
| 75 | + | |
74 | 76 | | |
75 | 77 | | |
76 | | - | |
| 78 | + | |
| 79 | + | |
| 80 | + | |
| 81 | + | |
| 82 | + | |
| 83 | + | |
| 84 | + | |
| 85 | + | |
| 86 | + | |
| 87 | + | |
| 88 | + | |
| 89 | + | |
| 90 | + | |
| 91 | + | |
| 92 | + | |
| 93 | + | |
| 94 | + | |
| 95 | + | |
| 96 | + | |
77 | 97 | | |
78 | 98 | | |
79 | 99 | | |
80 | 100 | | |
81 | 101 | | |
82 | 102 | | |
83 | 103 | | |
84 | | - | |
| 104 | + | |
| 105 | + | |
| 106 | + | |
| 107 | + | |
| 108 | + | |
| 109 | + | |
| 110 | + | |
85 | 111 | | |
86 | 112 | | |
87 | 113 | | |
88 | 114 | | |
89 | 115 | | |
90 | | - | |
91 | | - | |
92 | | - | |
93 | | - | |
94 | | - | |
95 | | - | |
96 | | - | |
97 | | - | |
98 | | - | |
| 116 | + | |
| 117 | + | |
| 118 | + | |
99 | 119 | | |
100 | 120 | | |
101 | 121 | | |
102 | 122 | | |
103 | 123 | | |
104 | 124 | | |
105 | 125 | | |
106 | | - | |
| 126 | + | |
107 | 127 | | |
108 | 128 | | |
109 | | - | |
| 129 | + | |
110 | 130 | | |
111 | 131 | | |
112 | 132 | | |
113 | | - | |
114 | | - | |
115 | | - | |
116 | | - | |
117 | | - | |
118 | | - | |
119 | | - | |
120 | | - | |
| 133 | + | |
| 134 | + | |
| 135 | + | |
| 136 | + | |
| 137 | + | |
| 138 | + | |
| 139 | + | |
| 140 | + | |
| 141 | + | |
| 142 | + | |
| 143 | + | |
| 144 | + | |
| 145 | + | |
| 146 | + | |
121 | 147 | | |
122 | | - | |
123 | | - | |
124 | | - | |
| 148 | + | |
| 149 | + | |
| 150 | + | |
| 151 | + | |
| 152 | + | |
| 153 | + | |
| 154 | + | |
| 155 | + | |
| 156 | + | |
| 157 | + | |
| 158 | + | |
| 159 | + | |
| 160 | + | |
| 161 | + | |
| 162 | + | |
| 163 | + | |
125 | 164 | | |
126 | 165 | | |
127 | 166 | | |
| |||
0 commit comments