Skip to content

Commit e151df2

Browse files
authored
fix: Fix transformers-related hang. (#19165)
fixes cloudquery/cloudquery-issues#2436 Several different users have reported that, since transformers have been implemented, certain syncs are hanging forever. The transformations implementation seems to work well in these cases: - A sync completes with no errors - A sync finishes with errors due to the source - A sync finishes with errors due to the transformation However, when a destination has an error, at least sometimes it hangs forever. Each transformer is an implementation of a `grpc.BidiStreamingClient`, which uses these two methods: ```go Send(*Req) error Recv() (*Res, error) ``` Which in turn call these: ```go // SendMsg is generally called by generated code. On error, SendMsg aborts // the stream. If the error was generated by the client, the status is // returned directly; otherwise, io.EOF is returned and the status of // the stream may be discovered using RecvMsg. // // SendMsg blocks until: // - There is sufficient flow control to schedule m with the transport, or // - The stream is done, or // - The stream breaks. // // SendMsg does not wait until the message is received by the server. An // untimely stream closure may result in lost messages. To ensure delivery, // users should ensure the RPC completed successfully using RecvMsg. // // It is safe to have a goroutine calling SendMsg and another goroutine // calling RecvMsg on the same stream at the same time, but it is not safe // to call SendMsg on the same stream in different goroutines. It is also // not safe to call CloseSend concurrently with SendMsg. // // It is not safe to modify the message after calling SendMsg. Tracing // libraries and stats handlers may use the message lazily. SendMsg(m any) error // RecvMsg blocks until it receives a message into m or the stream is // done. It returns io.EOF when the stream completes successfully. On // any other error, the stream is aborted and the error contains the RPC // status. 
// // It is safe to have a goroutine calling SendMsg and another goroutine // calling RecvMsg on the same stream at the same time, but it is not // safe to call RecvMsg on the same stream in different goroutines. RecvMsg(m any) error ``` Both `SendMsg` and `RecvMsg` can hang forever, and this is what users are reporting. However much we may want to mitigate this issue (say, if the flow of data or the orchestration for closing is wrong), it is probably going to be fragile if we still rely on those methods not hanging. Thus, what I've done is run those two methods (`Recv()` and `Send()`) asynchronously. The Pipeline thread will invoke these, but it will exit if someone closes the pipeline (by using a `select`). I've tested running @disq 's reproduction test 100 times; it no longer hangs or panics: #19130 (comment) **It correctly fails on a test error on insert configuration** ![Screenshot 2024-09-16 at 12 46 59](https://github.com/user-attachments/assets/6147c351-be56-4dc0-bc71-04d81f12db82) **It correctly succeeds on a success test configuration** ![Screenshot 2024-09-16 at 12 47 29](https://github.com/user-attachments/assets/5e5f0259-a972-49ff-8446-e0ed888de739) **It still doesn't swallow the last batch, as tested by running a stable sync multiple times and seeing the same resource count** ![Screenshot 2024-09-16 at 12 57 17](https://github.com/user-attachments/assets/73f65f27-1896-4333-8ba9-19d8d3ff5833)
1 parent cedf287 commit e151df2

File tree

1 file changed

+65
-26
lines changed

1 file changed

+65
-26
lines changed

cli/internal/transformerpipeline/pipeline.go

Lines changed: 65 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"errors"
66
"io"
7+
"sync/atomic"
8+
"time"
79

810
"github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
911
"golang.org/x/sync/errgroup"
@@ -69,59 +71,96 @@ func (lp *TransformerPipeline) Send(data []byte) error {
6971
return errors.New("OnOutput must be registered before Send is called, otherwise what do I do with the transformed data?")
7072
}
7173

72-
if lp.clientWrappers[0].isClosed {
73-
return errors.New("cannot send data to a closed pipeline")
74+
if lp.clientWrappers[0].isClosed.Load() {
75+
return nil
7476
}
7577

76-
return lp.clientWrappers[0].client.Send(&plugin.Transform_Request{Record: data})
78+
sendCh := make(chan error)
79+
80+
// Send can block forever (e.g. if grpc buffer is full), so we run it asynchronously
81+
// and check if pipeline is closed every second.
82+
go func() {
83+
err := lp.clientWrappers[0].client.Send(&plugin.Transform_Request{Record: data})
84+
sendCh <- err
85+
}()
86+
87+
select {
88+
case err := <-sendCh:
89+
return err
90+
case <-time.After(1 * time.Second): // Check if pipeline is closed every second
91+
if lp.clientWrappers[0].isClosed.Load() {
92+
return nil
93+
}
94+
}
95+
96+
return nil
7797
}
7898

7999
func (lp *TransformerPipeline) OnOutput(fn func([]byte) error) error {
80100
if fn == nil {
81101
return errors.New("argument to OnOutput cannot be nil")
82102
}
83103
lp.clientWrappers[len(lp.clientWrappers)-1].nextSendFn = func(req *plugin.Transform_Request) error {
84-
return fn(req.Record)
104+
err := fn(req.Record)
105+
if err != nil {
106+
// Our undocumented convention is that destination errors are unrecoverable. Thus, at this
107+
// point we close the pipeline.
108+
lp.Close()
109+
}
110+
return err
85111
}
86112
return nil
87113
}
88114

89115
func (lp *TransformerPipeline) Close() {
90-
// Closing the pipeline happens on both source as well as destination close.
91-
// Not handling this will result in a close of closed channel panic.
92-
if lp.clientWrappers[0].isClosed {
93-
return
94-
}
95-
96-
// Close the first transformer. The rest will follow gracefully, otherwise records will be lost.
97-
lp.clientWrappers[0].client.CloseSend()
98-
lp.clientWrappers[0].isClosed = true
116+
// Close() can happen in any goroutine, and closing is not thread safe.
117+
// Instead of closing, we set a flag that we check on send/recv.
118+
lp.clientWrappers[0].isClosed.Store(true)
99119
}
100120

101121
type clientWrapper struct {
102122
i int
103123
client plugin.Plugin_TransformClient
104124
nextSendFn func(*plugin.Transform_Request) error
105125
nextClose func() error
106-
isClosed bool
126+
isClosed atomic.Bool
107127
}
108128

109-
func (s clientWrapper) startBlocking() error {
129+
func (s *clientWrapper) startBlocking() error {
110130
if s.nextSendFn == nil {
111131
return errors.New("nextSendFn is nil")
112132
}
113-
for {
114-
data, err := s.client.Recv()
115-
if err == io.EOF {
116-
err := s.nextClose()
117-
return err
118-
}
119-
if err != nil {
120-
return err
133+
134+
recvCh := make(chan *plugin.Transform_Request)
135+
errCh := make(chan error)
136+
137+
// Recv can block forever (e.g. if transformer decides to), so
138+
// we run it asynchronously and check if pipeline is closed every second.
139+
go func() {
140+
for {
141+
data, err := s.client.Recv()
142+
if err != nil {
143+
errCh <- err
144+
} else {
145+
recvCh <- &plugin.Transform_Request{Record: data.Record}
146+
}
121147
}
122-
if err := s.nextSendFn(
123-
&plugin.Transform_Request{Record: data.Record},
124-
); err != nil {
148+
}()
149+
150+
for {
151+
select {
152+
case <-time.After(1 * time.Second): // Check if pipeline is closed every second
153+
if s.isClosed.Load() {
154+
return s.nextClose()
155+
}
156+
case req := <-recvCh: // Propagate records to next transformer
157+
if err := s.nextSendFn(req); err != nil {
158+
return err
159+
}
160+
case err := <-errCh:
161+
if err == io.EOF {
162+
return s.nextClose()
163+
}
125164
return err
126165
}
127166
}

0 commit comments

Comments
 (0)