
fix: Fix transformers-related hang.#19165

Merged
marianogappa merged 5 commits intomainfrom
mariano/fix-tf-related-hang
Sep 16, 2024

Conversation

@marianogappa (Contributor) commented Sep 16, 2024

fixes https://github.com/cloudquery/cloudquery-issues/issues/2436

Several users have reported that, since transformers were introduced, certain syncs hang forever.

The transformations implementation seems to work well in these cases:

  • A sync completes with no errors
  • A sync finishes with errors due to the source
  • A sync finishes with errors due to the transformation

However, when a destination errors, the sync at least sometimes hangs forever.

Each transformer is an implementation of a grpc.BidiStreamingClient, which uses these two methods:

	Send(*Req) error
	Recv() (*Res, error)

These in turn call:

	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	//
	// It is not safe to modify the message after calling SendMsg. Tracing
	// libraries and stats handlers may use the message lazily.
	SendMsg(m any) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m any) error

Both SendMsg and RecvMsg can hang forever, and this is what users are reporting.

However we mitigate this issue (say the flow of data or the orchestration for closing is wrong), the fix will probably be fragile if we still rely on those methods not hanging.

Thus, what I've done is run those two methods (Recv() and Send()) asynchronously. The Pipeline thread invokes them, but exits if someone closes the pipeline (via a select).

I've tested running @disq 's reproduction test 100 times; it no longer hangs or panics:
#19130 (comment)

  • It correctly fails on a test error-on-insert configuration. (Screenshot 2024-09-16 at 12:46:59)
  • It correctly succeeds on a success test configuration. (Screenshot 2024-09-16 at 12:47:29)
  • It still doesn't swallow the last batch, as tested by running a stable sync multiple times and seeing the same resource count. (Screenshot 2024-09-16 at 12:57:17)

@marianogappa marianogappa requested review from a team, disq and erezrokah and removed request for a team September 16, 2024 12:13

// Send can block forever (e.g. if grpc buffer is full), so we run it asynchronously
// and check if pipeline is closed every second.
go func() {
Member:

Suggested change:

	go func() {
		defer close(sendCh)

This way sendCh is almost always closed.

Contributor Author:

What is the benefit of explicitly closing this channel? Btw, this goroutine is never explicitly exited either; the defer might as well be outside of it.

Member:

Is the client.Send() method blocking ad infinitum (provided no error was encountered)? I thought (well, the comment above says...) it would only block if the grpc buffer is full, or in such cases.

Member:

Closing would be to avoid leaving unused open channels around (do they count against file descriptor limits? probably not, but I don't know).

Contributor Author:

The case that we're hitting is that:

  • the source (via n workers) is sending non-stop
  • the destination was fine but then errors
  • the send doesn't get recv'd many times in a row until the buffer is full

But with my fix, within <= 1 second, the pipeline is closed.

This should be a single channel that is needlessly around for about 1 sec, so I think we're fine.

return err

recvCh := make(chan *plugin.Transform_Request)
errCh := make(chan error)
Member:

Again, ideally do a defer close(errCh) in the goroutine (but handle the closed case in the select below by using the val, ok := <-errCh syntax)

Contributor Author (Sep 16, 2024):

Same question regarding this close. What's the benefit of closing it?

@marianogappa marianogappa merged commit e151df2 into main Sep 16, 2024
@marianogappa marianogappa deleted the mariano/fix-tf-related-hang branch September 16, 2024 14:40
kodiakhq bot pushed a commit that referenced this pull request Sep 16, 2024
🤖 I have created a release *beep* *boop*
---


## [6.7.0](cli-v6.6.3...cli-v6.7.0) (2024-09-16)


### Features

* Add `install` support for Transformer plugins ([#19166](#19166)) ([cc6ca4b](cc6ca4b))


### Bug Fixes

* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.62.0 ([#19137](#19137)) ([ed315d0](ed315d0))
* **deps:** Update module google.golang.org/grpc to v1.66.2 ([#19160](#19160)) ([8699db3](8699db3))
* Fix transformers-related hang. ([#19165](#19165)) ([e151df2](e151df2))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
