Skip to content

Commit ebf0eb8

Browse files
authored
fix: Fix potential deadlock in Snowflake destination (#5553)
(Similar to: #5550) If a worker returns an error, it will no longer be consuming from its `writeChan`, and in this case we should exit the loop early to avoid deadlock.
1 parent e087095 commit ebf0eb8

File tree

1 file changed

+14
-4
lines changed
  • plugins/destination/snowflake/client

1 file changed

+14
-4
lines changed

plugins/destination/snowflake/client/write.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (c *Client) writeResource(ctx context.Context, table *schema.Table, resourc
9191
}
9292

9393
func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *plugins.ClientResource) error {
94-
eg := errgroup.Group{}
94+
eg, gctx := errgroup.WithContext(ctx)
9595
workers := make(map[string]*worker, len(tables))
9696

9797
if _, err := c.db.ExecContext(ctx, createOrReplaceFileFormat); err != nil {
@@ -109,12 +109,22 @@ func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *pl
109109
writeChan: writeChan,
110110
}
111111
eg.Go(func() error {
112-
return c.writeResource(ctx, t, writeChan)
112+
return c.writeResource(gctx, t, writeChan)
113113
})
114114
}
115115

116-
for r := range res {
117-
workers[r.TableName].writeChan <- r.Data
116+
done := false
117+
for !done {
118+
select {
119+
case r, ok := <-res:
120+
if !ok {
121+
done = true
122+
break
123+
}
124+
workers[r.TableName].writeChan <- r.Data
125+
case <-gctx.Done():
126+
done = true
127+
}
118128
}
119129
for _, w := range workers {
120130
close(w.writeChan)

0 commit comments

Comments
 (0)