Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions internal/servers/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,15 @@ func (s *DestinationServer) Write2(msg pb.Destination_Write2Server) error {
for {
r, err := msg.Recv()
if err != nil {
close(resources)
if err == io.EOF {
close(resources)
if err := eg.Wait(); err != nil {
return fmt.Errorf("got EOF. failed to wait for plugin: %w", err)
return fmt.Errorf("got EOF. plugin returned: %w", err)
Copy link
Copy Markdown
Member

@erezrokah erezrokah Dec 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EOF is not an error necessarily (it usually means the stream was closed), so I'm not sure we want to report it (we can log it instead as Debug). Also to be consistent we should add plugin returned to everywhere we do eg.Wait(), like lines 105 and 114 in this file.

You can see https://github.com/cloudquery/plugin-sdk/pull/461/files for reference.

We can also reduce the nesting of handling EOF, but that can be done in another PR, see #463 for reference

}
return msg.SendAndClose(&pb.Write2_Response{})
}
close(resources)
if err := eg.Wait(); err != nil {
s.Logger.Error().Err(err).Msg("got error. failed to wait for plugin")
if pluginErr := eg.Wait(); pluginErr != nil {
return fmt.Errorf("failed to receive msg: %v. plugin returned %w", err, pluginErr)
}
return fmt.Errorf("failed to receive msg: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions plugins/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,16 @@ func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sou
select {
case <-gctx.Done():
close(ch)
return eg.Wait()
if err := eg.Wait(); err != nil {
return fmt.Errorf("context done %v. write client returned: %w", gctx.Err(), err)
}
case ch <- clientResource:
}
}

close(ch)
if err := eg.Wait(); err != nil {
return err
return fmt.Errorf("write client returned: %w", err)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("write client returned: %w", err)
return fmt.Errorf("write client failed with error: %w", err)

Also shouldn't we use the same message above, i.e. instead plugin returned -> write failed

}
if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale {
if err := p.DeleteStale(ctx, tables, sourceName, syncTime); err != nil {
Expand Down