fix: Data race on transformers pipeline closed check #19130

Closed
disq wants to merge 1 commit into main from fix/transformer-pipeline-closed-race

@disq disq commented Sep 11, 2024

Fixes the data race on isClosed, which we introduced in #19105

2024-09-11T16:10:36Z INF top level table resolver started client=Client#4 invocation_id=0ed60c97-05cf-4365-9789-e9fe27561951 module=test-source table=test_paid_table
panic: send on closed channel

goroutine 45 [running]:
github.com/cloudquery/cloudquery/cli/internal/transformerpipeline.(*identityTransformer).Send(0x14000a26a20?, 0x10580de10?)
        /Users/me/go/src/github.com/cloudquery/cq/cli/internal/transformerpipeline/identity.go:24 +0x28
github.com/cloudquery/cloudquery/cli/internal/transformerpipeline.(*TransformerPipeline).Send(0x140000c4460, {0x14000a6ea80, 0x1d48, 0x2a80})
        /Users/me/go/src/github.com/cloudquery/cq/cli/internal/transformerpipeline/pipeline.go:76 +0xcc
github.com/cloudquery/cloudquery/cli/cmd.syncConnectionV3.func6()
        /Users/me/go/src/github.com/cloudquery/cq/cli/cmd/sync_v3.go:422 +0x96c
golang.org/x/sync/errgroup.(*Group).Go.func1()
        /Users/me/go/pkg/mod/golang.org/x/sync@v0.8.0/errgroup/errgroup.go:78 +0x58
created by golang.org/x/sync/errgroup.(*Group).Go in goroutine 1
        /Users/me/go/pkg/mod/golang.org/x/sync@v0.8.0/errgroup/errgroup.go:75 +0x98


==================
WARNING: DATA RACE
Read at 0x00c00057c868 by goroutine 71:
  github.com/cloudquery/cloudquery/cli/internal/transformerpipeline.(*TransformerPipeline).Send()
      /Users/me/go/src/github.com/cloudquery/cq/cli/internal/transformerpipeline/pipeline.go:72 +0xb4
  github.com/cloudquery/cloudquery/cli/cmd.syncConnectionV3.func6()
      /Users/me/go/src/github.com/cloudquery/cq/cli/cmd/sync_v3.go:422 +0xcfc
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      /Users/me/go/pkg/mod/golang.org/x/sync@v0.8.0/errgroup/errgroup.go:78 +0x7c

Previous write at 0x00c00057c868 by goroutine 72:
  github.com/cloudquery/cloudquery/cli/internal/transformerpipeline.(*TransformerPipeline).Close()
      /Users/me/go/src/github.com/cloudquery/cq/cli/internal/transformerpipeline/pipeline.go:98 +0x298
  github.com/cloudquery/cloudquery/cli/cmd.syncConnectionV3.func5()
      /Users/me/go/src/github.com/cloudquery/cq/cli/cmd/sync_v3.go:373 +0x204
  github.com/cloudquery/cloudquery/cli/cmd.syncConnectionV3.(*TransformerPipeline).OnOutput.func23()
      /Users/me/go/src/github.com/cloudquery/cq/cli/internal/transformerpipeline/pipeline.go:84 +0x5c
  github.com/cloudquery/cloudquery/cli/internal/transformerpipeline.clientWrapper.startBlocking()
      /Users/me/go/src/github.com/cloudquery/cq/cli/internal/transformerpipeline/pipeline.go:122 +0x1d8
  github.com/cloudquery/cloudquery/cli/internal/transformerpipeline.clientWrapper.startBlocking-fm()
      <autogenerated>:1 +0x60
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      /Users/me/go/pkg/mod/golang.org/x/sync@v0.8.0/errgroup/errgroup.go:78 +0x7c

Goroutine 71 (running) created at:
  golang.org/x/sync/errgroup.(*Group).Go()
      /Users/me/go/pkg/mod/golang.org/x/sync@v0.8.0/errgroup/errgroup.go:75 +0x10c
  github.com/cloudquery/cloudquery/cli/cmd.syncConnectionV3()
      /Users/me/go/src/github.com/cloudquery/cq/cli/cmd/sync_v3.go:386 +0x3a68
  github.com/cloudquery/cloudquery/cli/cmd.sync()
      /Users/me/go/src/github.com/cloudquery/cq/cli/cmd/sync.go:377 +0x3d64
  github.com/spf13/cobra.(*Command).execute()
      /Users/me/go/pkg/mod/github.com/spf13/cobra@v1.8.1/command.go:985 +0xbc4
  github.com/spf13/cobra.(*Command).ExecuteC()
      /Users/me/go/pkg/mod/github.com/spf13/cobra@v1.8.1/command.go:1117 +0x4b8
  github.com/spf13/cobra.(*Command).Execute()
      /Users/me/go/pkg/mod/github.com/spf13/cobra@v1.8.1/command.go:1041 +0x74
  github.com/spf13/cobra.(*Command).ExecuteContext()
      /Users/me/go/pkg/mod/github.com/spf13/cobra@v1.8.1/command.go:1034 +0x18
  main.executeRootCmdWithContext()
      /Users/me/go/src/github.com/cloudquery/cq/cli/main.go:38 +0x2e0
  main.main()
      /Users/me/go/src/github.com/cloudquery/cq/cli/main.go:65 +0x130

Goroutine 72 (finished) created at:
  golang.org/x/sync/errgroup.(*Group).Go()
      /Users/me/go/pkg/mod/golang.org/x/sync@v0.8.0/errgroup/errgroup.go:75 +0x10c
  github.com/cloudquery/cloudquery/cli/internal/transformerpipeline.(*TransformerPipeline).RunBlocking()
      /Users/me/go/src/github.com/cloudquery/cq/cli/internal/transformerpipeline/pipeline.go:61 +0x78
  github.com/cloudquery/cloudquery/cli/internal/transformerpipeline.(*TransformerPipeline).RunBlocking-fm()
      <autogenerated>:1 +0x34
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      /Users/me/go/pkg/mod/golang.org/x/sync@v0.8.0/errgroup/errgroup.go:78 +0x7c
==================
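
For readers unfamiliar with this class of report, here is a minimal sketch of one common way to silence a race like the one above on a boolean flag. It assumes `isClosed` was a plain `bool` read in `Send` and written in `Close`; the `pipeline` type and `countOpenChecks` helper are hypothetical stand-ins, not the actual code in this PR.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pipeline is a hypothetical stand-in for the real TransformerPipeline.
type pipeline struct {
	// atomic.Bool makes concurrent Load and Store calls race-free,
	// unlike an unsynchronized bool field.
	isClosed atomic.Bool
}

// Close marks the pipeline as closed.
func (p *pipeline) Close() { p.isClosed.Store(true) }

// Closed reports whether Close has been called.
func (p *pipeline) Closed() bool { return p.isClosed.Load() }

// countOpenChecks starts n concurrent readers alongside one Close call
// and returns how many readers still observed the pipeline as open.
func countOpenChecks(n int) int64 {
	p := &pipeline{}
	var open atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if !p.Closed() {
				open.Add(1)
			}
		}()
	}
	p.Close()
	wg.Wait()
	return open.Load()
}

func main() {
	n := countOpenChecks(100)
	fmt.Printf("%d of 100 readers saw the pipeline open\n", n)
}
```

Running this with `go run -race .` should report no data race. The count it prints varies between runs, which is expected: an atomic flag removes the memory race but does not order `Close` relative to the readers.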

@disq disq requested review from a team and erezrokah and removed request for a team September 11, 2024 16:26

disq commented Sep 11, 2024

Probably doesn't really matter because the whole isClosed check is not doing what we want it to do reliably.
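
To illustrate why a closed check alone may not be reliable: even a race-free flag can flip between the check and the channel send, so `send on closed channel` remains possible. Below is a minimal sketch of one alternative, assuming the data channel is never closed by the closing side and a separate `done` channel signals shutdown instead. The `sender` type and its fields are hypothetical and not taken from the pipeline code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("pipeline closed")

// sender is a hypothetical type: Close never closes the data channel,
// it only closes done, so Send can never hit a closed data channel.
type sender struct {
	data chan int
	done chan struct{}
	once sync.Once
}

func newSender() *sender {
	return &sender{data: make(chan int), done: make(chan struct{})}
}

// Send blocks until a receiver takes v or the sender is closed.
func (s *sender) Send(v int) error {
	// Fast path: refuse immediately if already closed.
	select {
	case <-s.done:
		return errClosed
	default:
	}
	// Slow path: wait for either a receiver or a concurrent Close.
	select {
	case s.data <- v:
		return nil
	case <-s.done:
		return errClosed
	}
}

// Close is safe to call more than once and concurrently with Send.
func (s *sender) Close() { s.once.Do(func() { close(s.done) }) }

func main() {
	s := newSender()
	got := make(chan int, 1)
	go func() { got <- <-s.data }()

	err := s.Send(1)
	fmt.Println(<-got, err)

	s.Close()
	fmt.Println(s.Send(2)) // returns errClosed instead of panicking
}
```

The design choice here is that the check and the send happen inside one `select`, so there is no window between them in which a close can cause a panic.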

@erezrokah
Member

How can we reproduce the data race? Do we need to add a test for it?


disq commented Sep 13, 2024

@erezrokah Build the CLI with `go build -race .` and then use this config:

kind: source
spec:
  name: test
  registry: cloudquery
  path: cloudquery/test
  version: v4.3.6
  tables: [ test_paid_table ]
  destinations: ["test"]
  spec:
    num_rows: 50000
    num_clients: 40
---
kind: destination
spec:
  name: test
  path: cloudquery/test
  registry: cloudquery
  version: v2.4.0
  spec:
    error_on_insert: true
    error_on_write: false
    error_on_migrate: false
    batch_writer: false

@marianogappa
Contributor

Closing this in favor of #19165 cc @disq

@disq disq deleted the fix/transformer-pipeline-closed-race branch September 16, 2024 12:28
marianogappa added a commit that referenced this pull request Sep 16, 2024
fixes cloudquery/cloudquery-issues#2436

Several users have reported that, since transformers were implemented,
certain syncs hang forever.

The transformations implementation seems to work well in these cases:
- A sync completes with no errors
- A sync finishes with errors due to the source
- A sync finishes with errors due to the transformation

However, when a destination has an error, the sync at least sometimes
hangs forever.

Each transformer is an implementation of a `grpc.BidiStreamingClient`,
which exposes these two methods:

```go
	Send(*Req) error
	Recv() (*Res, error)
```

These in turn call the following `grpc.ClientStream` methods:
```go
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	//
	// It is not safe to modify the message after calling SendMsg. Tracing
	// libraries and stats handlers may use the message lazily.
	SendMsg(m any) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m any) error
```

Both `SendMsg` and `RecvMsg` can hang forever, and this is what users
are reporting.

Whatever the root cause turns out to be (say, the flow of data or the
orchestration for closing is wrong), any mitigation will probably be
fragile if it still relies on those methods not hanging.

Thus, I now run those two methods (`Recv()` and `Send()`)
asynchronously. The pipeline goroutine still invokes them, but a
`select` lets it exit as soon as someone closes the pipeline.
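
The idea can be sketched roughly as follows. This is a simplified illustration, not the code from the commit: `recvOrClosed`, `recvResult`, and `errPipelineClosed` are hypothetical names, and the blocking `Recv()` is modeled as a plain function value rather than a real gRPC stream.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errPipelineClosed = errors.New("pipeline closed")

// recvResult carries the outcome of one blocking receive.
type recvResult struct {
	msg string
	err error
}

// recvOrClosed runs the blocking recv in its own goroutine and returns
// early if closed fires before recv completes.
func recvOrClosed(recv func() (string, error), closed <-chan struct{}) (string, error) {
	// Buffered so the goroutine can finish even if nobody reads the result.
	out := make(chan recvResult, 1)
	go func() {
		msg, err := recv()
		out <- recvResult{msg, err}
	}()
	select {
	case r := <-out:
		return r.msg, r.err
	case <-closed:
		return "", errPipelineClosed
	}
}

func main() {
	closed := make(chan struct{})

	// A receive that completes normally.
	msg, err := recvOrClosed(func() (string, error) { return "record", nil }, closed)
	fmt.Println(msg, err)

	// A receive that never returns, standing in for a hung stream.
	block := make(chan struct{})
	hung := func() (string, error) { <-block; return "", nil }
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(closed)
	}()
	_, err = recvOrClosed(hung, closed)
	fmt.Println(err)
}
```

Note that in this sketch the goroutine running a hung call keeps running after the caller returns; in a real stream the caller would presumably also cancel the stream's context so the blocked call can unwind.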

I ran @disq's reproduction test 100 times; it no longer hangs or
panics:

#19130 (comment)

**It correctly fails on a test error on insert configuration**
![Screenshot 2024-09-16 at 12 46 59](https://github.com/user-attachments/assets/6147c351-be56-4dc0-bc71-04d81f12db82)

**It correctly succeeds on a success test configuration**
![Screenshot 2024-09-16 at 12 47 29](https://github.com/user-attachments/assets/5e5f0259-a972-49ff-8446-e0ed888de739)

**It still doesn't swallow the last batch, as tested by running a stable
sync multiple times and seeing the same resource count**
![Screenshot 2024-09-16 at 12 57 17](https://github.com/user-attachments/assets/73f65f27-1896-4333-8ba9-19d8d3ff5833)