
sql: do not wait for setup of remote flows on the gateway#89649

Merged
craig[bot] merged 3 commits into cockroachdb:master from yuzefovich:parallelize-setup
Nov 12, 2022

Conversation

Member

@yuzefovich yuzefovich commented Oct 10, 2022

Previously, when setting up the flows for a distributed plan we would
issue all SetupFlow RPCs to the remote nodes and wait for all of them to
come back before proceeding with the execution of the local flow. This
can introduce an execution stall, especially in multi-region setups, if
remote flows depend on the local flow for some of the data. This was
suboptimal, and this PR makes it so that we no longer wait for the
RPCs to come back and start executing the local flow right away. We now
spin up a separate goroutine that waits for the RPCs to come back, and
if an error is encountered (which shouldn't happen often), then that
goroutine sets the error on the DistSQLReceiver and cancels the local
flow. Setting the error on the receiver, in turn, will make it so that
all remote flows will be canceled via CancelDeadFlows RPC (which might
be faster than via the distributed query shutdown triggered when the
local flow is canceled).

See individual commits for more details.

Fixes: #87669.

Epic: CRDB-20535

Release note (performance improvement): The setup of the distributed
query execution is now fully parallelized which should reduce the query
latencies, especially in multi-region setups.

@cockroach-teamcity
Member

This change is Reviewable

@yuzefovich yuzefovich force-pushed the parallelize-setup branch 8 times, most recently from f4156d7 to 79308f8 on November 3, 2022
@yuzefovich yuzefovich force-pushed the parallelize-setup branch 10 times, most recently from 963cd64 to dd511ee on November 7, 2022
@yuzefovich yuzefovich marked this pull request as ready for review November 7, 2022
@yuzefovich yuzefovich requested a review from a team as a code owner November 7, 2022
Contributor

@msirek msirek left a comment

This looks great! Is it possible to add a test? Maybe a simple distributed query with a mode which randomly injects remote errors, then tests that everything was cleaned up properly. Or is that too difficult?

Reviewed 6 of 6 files at r1, 7 of 7 files at r2, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @cucaroach and @yuzefovich)


pkg/sql/distsql_running.go line 102 at r2 (raw file):

// run executes the request. An error if encountered is both sent on the
// result channel and returned.

nit: Please either add commas around 'if encountered' or re-word this.


pkg/sql/distsql_running.go line 452 at r2 (raw file):

		err = planCtx.saveFlows(flows, opChains)
	}
	if len(flows) == 1 || err != nil {

Is it safe to assume there is always a local FlowSpec in flows?


pkg/sql/distsql_running.go line 523 at r2 (raw file):

	})
	_ = dsp.stopper.RunAsyncTask(origCtx, "distsql-remote-flows-setup-listener", func(ctx context.Context) {
		for i := 0; i < len(flows)-1; i++ {

This line makes the assumption that there is a local FlowSpec in flows. Is this always true? Could it ever contain zero local FlowSpecs?
Should we make it explicit and iterate through all flows and then ignore any local flows?

Also, looking at GenerateFlowSpecs(), it seems like each SQLInstanceID entry in the flow map could have multiple flows. If the goal is to execute this loop once for each remote flow, it seems like it should also iterate over all Processors in each remote FlowSpec.


pkg/sql/distsql_running.go line 706 at r2 (raw file):

	} else {
		defer func() {
			if recv.getError() != nil {

Should the recv.resultWriterMu.batch interface also be extended with SetError and Err methods so this getError() check could also eagerly cancel flows when there's a batch error?

Code quote:

recv

pkg/sql/distsql_running.go line 1111 at r2 (raw file):

				// SQL node, which has left the handling up to the root
				// transaction.
				err = r.txn.UpdateStateOnRemoteRetryableErr(r.ctx, &retryErr.PErr)

If r.txn is RootTxn, this sets a new assertion error and loses the original error.
Should UpdateStateOnRemoteRetryableErr wrap the assertion error inside the original error to get more useful error reporting?


pkg/sql/flowinfra/flow.go line 526 at r2 (raw file):

		f.onFlowCleanup = func() {
			fn()
			oldOnFlowCleanup()

Should this add fn() after oldOnFlowCleanup() to make sure any cleanup operations in oldOnFlowCleanup() are done before fn() marks the cleanup as done?

Member Author

@yuzefovich yuzefovich left a comment

Thanks for the review! I did briefly consider writing a test for this but decided not to since it seemed rather difficult and probably prone to flakes - but with your nudge I'll actually spend some time and see what I can come up with.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @cucaroach and @msirek)


pkg/sql/distsql_running.go line 452 at r2 (raw file):

Previously, msirek (Mark Sirek) wrote…

Is it safe to assume there is always a local FlowSpec in flows?

Yes - we have an assertion that there is a gateway flow at the very beginning of this (setupFlows) function.


pkg/sql/distsql_running.go line 523 at r2 (raw file):
I think it's pretty safe code given there is always the gateway flow, and this code (iterating len(flows)-1 times to receive from resultChan) is not changing in this commit.

Also, looking at GenerateFlowSpecs(), it seems like each SQLInstanceID entry in the flow map could have multiple flows.

Hm, why does it seem like this? flows is a map from SQLInstanceID to a FlowSpec which implies that we have at most a single FlowSpec for any given SQLInstanceID. The iteration in GenerateFlowSpecs is about handling all processors - i.e. each node can have many processors planned - but there will be at most one flow per node.

We did discuss the idea of having more parallelism in our execution a couple of times, and one approach there would be to have multiple flows for a single node, but we haven't really done any work in that direction.

If the goal is to execute this loop once for each remote flow, it seems like it should also iterate over all Processors in each remote FlowSpec.

The goal of GenerateFlowSpecs is to iterate over all processors that are part of the PhysicalPlan, then divide all of them up according to the nodes on which they are planned, and each separate "division" (i.e. subset) of processors form a single FlowSpec.
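That grouping can be illustrated with a hypothetical sketch — the `processor`/`flowSpec` types and `generateFlowSpecs` below are simplified stand-ins for the real CockroachDB types:

```go
package main

// Illustrative stand-ins for the real types: one FlowSpec per node, each
// holding the subset of the plan's processors scheduled on that node.
type processor struct {
	node int // the SQLInstanceID the processor is planned on
	name string
}

type flowSpec struct {
	processors []processor
}

// generateFlowSpecs divides a physical plan's processors up by node: every
// node with at least one processor gets exactly one FlowSpec.
func generateFlowSpecs(procs []processor) map[int]*flowSpec {
	flows := make(map[int]*flowSpec)
	for _, p := range procs {
		spec, ok := flows[p.node]
		if !ok {
			spec = &flowSpec{}
			flows[p.node] = spec
		}
		spec.processors = append(spec.processors, p)
	}
	return flows
}
```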


pkg/sql/distsql_running.go line 706 at r2 (raw file):

Previously, msirek (Mark Sirek) wrote…

Should the recv.resultWriterMu.batch interface also be extended with SetError and Err methods so this getError() check could also eagerly cancel flows when there's a batch error?

It's not obvious, but recv.resultWriterMu.row and recv.resultWriterMu.batch (when the latter is non-nil) actually point to exactly the same object, so "implicitly" we do set the error on the correct object (most likely it'll be commandResult) in case of a batch error.


pkg/sql/distsql_running.go line 1111 at r2 (raw file):

Previously, msirek (Mark Sirek) wrote…

If r.txn is RootTxn, this sets a new assertion error and loses the original error.
Should UpdateStateOnRemoteRetryableErr wrap the assertion error inside the original error to get more useful error reporting?

I'm guessing no, but this code is not modified in this commit at all - it is only extracted into the helper, so I'm afraid to change anything here :)


pkg/sql/flowinfra/flow.go line 526 at r2 (raw file):

Previously, msirek (Mark Sirek) wrote…

Should this add fn() after oldOnFlowCleanup() to make sure any cleanup operations in oldOnFlowCleanup() are done before fn() marks the cleanup as done?

I think that we want the "stack" behavior here. My rationale is that we currently have a single onFlowCleanup function instantiated in distsql.ServerImpl.setupFlow, and I think that function should be executed last when we add new callbacks - this way the "parent" operation is cleaned up after the "child" one. I expanded the comment on the Flow interface.
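The "stack" behavior described above can be sketched roughly like this (the `flow` type and `addOnFlowCleanup` are illustrative, not the actual flowinfra code):

```go
package main

// flow mimics the single onFlowCleanup callback slot on the real Flow.
type flow struct {
	onFlowCleanup func()
}

// addOnFlowCleanup layers callbacks with "stack" (LIFO) behavior: the most
// recently added callback runs first, so the original callback registered
// in setupFlow always runs last.
func (f *flow) addOnFlowCleanup(fn func()) {
	if f.onFlowCleanup == nil {
		f.onFlowCleanup = fn
		return
	}
	old := f.onFlowCleanup
	f.onFlowCleanup = func() {
		fn()
		old()
	}
}
```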

This commit utilizes the existing `cancelFlowsCoordinator` as well as
the corresponding `CancelDeadFlows` RPC to also eagerly cancel remote
flows that are still running when the gateway flow encounters an error.
Previously, this mechanism only worked for the remote flows that were in
the queue on the remote nodes, but now that we removed the queuing
behavior in the flow scheduler, the mechanism became unused, and this
commit repurposes it. This required minor changes to the remote flow
runner to keep track of the `Flow` objects themselves (in addition to
other info about the flows).

This "dead flows" cancellation mechanism is not required for
correctness (since the distributed cancellation relies on the proper
shutdown via gRPC stream cancellation), but the justification for having
it is two-fold:
- it makes the cancellation more bullet-proof
- in the follow-up commit we'll make it more likely that the query
encounters an error during the setup time (by fully parallelizing the
setup of all flows, including the gateway), and this eager cancellation
seems better suited to handle errors in those cases.

This commit also contains a minor fix to the remote flow runner
constructor to correctly use the given memory account. This fix was
accidentally left out of 0bbe280 because
I incorrectly split this commit out into a separate one from that
change.

Release note: None
@yuzefovich
Member Author

The test is actually uncovering some (relatively minor) issues that I want to address first, so hold off on reviewing further - I'll ping once it is ready for another look.

This commit makes the server-side of `SetupFlow` RPC react better to the
context cancellation. In particular, after setting up the flow on the
remote node but before starting it in a new goroutine we now check
whether the context of the RPC is canceled or not; if it is, then we
don't actually need to run the flow at all, so we now short-circuit.

This improves the shutdown protocol in some cases when the context is
canceled on the gateway (client-side of the RPC) after we have dialed
the node in `runnerRequest.run` but before the new goroutine handling
the RPC on the client side has been spun up. This scenario is pretty
rare currently but might occur more often due to the changes in the
following commit. Note that there is no correctness issue here - even
before this change, the flow would be shut down properly (when the outbox
would fail to execute `FlowStream` RPC due to the inbox having exited).

Release note: None
@yuzefovich yuzefovich force-pushed the parallelize-setup branch 2 times, most recently from 4f637ab to ae27c69 on November 11, 2022
Contributor

@msirek msirek left a comment

OK

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @cucaroach and @yuzefovich)


pkg/sql/distsql_running.go line 452 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Yes - we have an assertion that there is a gateway flow at the very beginning of this (setupFlows) function.

OK, thanks.


pkg/sql/distsql_running.go line 523 at r2 (raw file):

Hm, why does it seem like this?

I think this is misunderstanding of the code on my part. Thanks for the clarification.

We did discuss the idea of having more parallelism in our execution a couple of times, and one approach there would be to have multiple flows for a single node, but we haven't really done any work in that direction.

...

The goal of GenerateFlowSpecs is to iterate over all processors that are part of the PhysicalPlan, then divide all of them up according to the nodes on which they are planned, and each separate "division" (i.e. subset) of processors form a single FlowSpec.

TIL
Thanks


pkg/sql/distsql_running.go line 706 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

It's not obvious, but recv.resultWriterMu.row and recv.resultWriterMu.batch (when the latter is non-nil) actually point to exactly the same object, so "implicitly" we do set the error on the correct object (most likely it'll be commandResult) in case of a batch error.

OK


pkg/sql/distsql_running.go line 1111 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I'm guessing no, but this code is not modified in this commit at all - it is only extracted into the helper, so I'm afraid to change anything here :)

OK, thanks.


pkg/sql/flowinfra/flow.go line 526 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think that we want the "stack" behavior here. My rationale is that we currently have a single onFlowCleanup function instantiated in distsql.ServerImpl.setupFlow, and I think that function should be executed last when we add new callbacks - this way the "parent" operation is cleaned up after the "child" one. I expanded the comment on the Flow interface.

OK

Previously, when setting up the flows for a distributed plan we would
issue all SetupFlow RPCs to the remote nodes and wait for all of them to
come back before proceeding with the execution of the local flow. This
can introduce an execution stall, especially in multi-region setups, if
remote flows depend on the local flow for some of the data. This was
suboptimal, and this commit makes it so that we no longer wait for the
RPCs to come back and start executing the local flow right away. We now
spin up a separate goroutine that waits for the RPCs to come back, and
if an error is encountered (which shouldn't happen often), then that
goroutine sets the error on the DistSQLReceiver and cancels the local
flow. Setting the error on the receiver, in turn, will make it so that
all remote flows will be canceled via CancelDeadFlows RPC (which might
be faster than via the distributed query shutdown triggered when the
local flow is canceled).

An additional change is that we now perform the setup of the local flow
on the gateway first, and only if that is successful, we proceed to the
setup of the remote flows. This acts as a sanity check on the validity
of the flow spec and should make it less likely that the remote flows
setup fails.

This required some changes around the error handling of the
DistSQLReceiver to make it concurrency safe. One option there was to make
`SetError` and `Err` methods of `rowResultWriter` interface concurrency
safe, but there are several implementations, and I decided to make the
adjustment to the DistSQLReceiver itself since this concurrency safety is
only needed there, and it seems somewhat wrong to impose the requirement
on all of the implementations.

Additionally, in order to avoid locking the mutex as much as possible,
the `status` of the receiver is not protected by the mutex. This is
achieved by the new goroutine not updating the status and, instead,
letting the main goroutine "resolve" the status the next time a meta
object is pushed. The idea is that the cancellation of the local flow
shuts down the local flow's execution making it encounter an error which
is then propagated as metadata. Thus, this "status resolution" should
happen fairly quickly, and this setup allows us to avoid the locking in
most scenarios when pushing rows and batches.
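A minimal sketch of this scheme, with illustrative names (`distSQLReceiverSketch` is not the real type, and the integer status is a simplification):

```go
package main

import (
	"errors"
	"sync"
)

var errInjected = errors.New("injected error from a SetupFlow RPC")

// distSQLReceiverSketch illustrates the concurrency scheme: only the error
// is mutex-protected; status is owned by the main goroutine and "resolved"
// lazily, so pushing rows stays lock-free in the common case.
type distSQLReceiverSketch struct {
	mu struct {
		sync.Mutex
		err error
	}
	status int // 0 = running, 1 = draining; written by the main goroutine only
}

// setErrorAsync is called from the listener goroutine; it deliberately does
// not touch status.
func (r *distSQLReceiverSketch) setErrorAsync(err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.mu.err == nil {
		r.mu.err = err
	}
}

// pushMeta runs on the main goroutine and resolves the status when a meta
// object is pushed.
func (r *distSQLReceiverSketch) pushMeta() int {
	r.mu.Lock()
	if r.mu.err != nil && r.status == 0 {
		r.status = 1
	}
	r.mu.Unlock()
	return r.status
}

// pushRow is the hot path: no locking at all.
func (r *distSQLReceiverSketch) pushRow() int {
	return r.status
}
```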

Further changes were needed around `saveFlows` function as well as
releasing flow specs back to the pool. The former had to be moved to be
done sooner (right after setting up the local flow), and for the latter
we had to move the release of the flow specs for the remote flows to
right after the corresponding SetupFlow RPCs are issued. This ordering
ensures that the flow specs are always released, but after all of their
uses (both `saveFlows` and the RPCs use them).

Yet another complication was around the concurrency between
`Flow.Cleanup` being called and the new goroutine receiving an error
from the RPC. At the very end of `Flow.Cleanup` the flow object is
released, so the new goroutine cannot call `Flow.Cancel` after that.
Additionally, since the mutex protection of the `rowResultWriter` by the
DistSQLReceiver lasts only until `DistSQLPlanner.Run` returns (which is
right after `Flow.Cleanup` returns), the new goroutine will attempt to
set the error only if the cleanup hasn't been performed. This is
achieved by having a mutex-protected boolean, and the boolean is only
introduced if the new goroutine is spun up.
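A hypothetical sketch of that guard (`cleanupGuard` is an illustrative name, not the actual field layout in the receiver):

```go
package main

import "sync"

// cleanupGuard sketches the mutex-protected boolean that prevents the
// listener goroutine from touching the flow after Flow.Cleanup released it.
type cleanupGuard struct {
	mu      sync.Mutex
	cleaned bool
}

// tryDo runs fn (e.g. setting the error and calling Flow.Cancel) only if
// cleanup has not happened yet; it reports whether fn ran.
func (g *cleanupGuard) tryDo(fn func()) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.cleaned {
		return false
	}
	fn()
	return true
}

// markCleanedUp is called at the very end of Flow.Cleanup, after which the
// flow object must no longer be used.
func (g *cleanupGuard) markCleanedUp() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.cleaned = true
}
```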

Overall, as this whole comment suggests, it has been quite tricky to get
things right (I hope I did), so one might wonder what simplifications,
if any, could be made. I considered and (mostly) rejected several:
- Ignoring the particular error that was returned by the RPCs and just
canceling the local flow. This would allow us to remove the complexity
of the concurrency safety with the error handling of the
DistSQLReceiver. We would still properly shutdown the whole distributed
plan. However, the downside is that we would lose the true reason for
the shutdown - most likely we would return "context canceled" to the
client instead of the actual error. On the other hand, the errors from
the RPCs should be rare enough that it might be worth giving this more
thought.
- Not canceling the local flow since just setting the error on the
receiver would be sufficient for the query to eventually shut down. The
obvious downside is that we might do more work after having received an
error from the RPC, and there is little upside I can see.
- But the most interesting simplification would be to just not create
the new goroutine in the first place. The idea is that if any of the
SetupFlow RPCs fail, then the flows on other nodes would hit the "no
inbound stream timeout" error (after 10s - by default - of not being
connected to) which would start the shutdown of the whole plan. This
idea would eliminate effectively all complexity of this commit which
seems quite appealing. The downside here would be the combination of the
downsides of the ideas above - namely, now the query would result in
this opaque "no inbound stream" error (and we've been struggling with
diagnosing those, so I'm not eager to introduce another way it could
occur), and we would be doing wasteful work during this timeout window.

Release note (performance improvement): The setup of the distributed
query execution is now fully parallelized which should reduce the query
latencies, especially in multi-region setups.
Member Author

@yuzefovich yuzefovich left a comment

Alright, I think it is RFAL.

I added a test like Mark suggested which shook out some issues:

  • in a newly-added second commit we now make the cancellation of SetupFlow RPC on the server-side (remote node) more eager. The test (when run with "succeeds within 5s") could have a remote flow on n2 running even after the gateway flow on n1 exited with an error injected on n3 - this would depend on the scheduling of the RPC goroutines for n2. Unfortunately, in a rare edge case (like once in several thousands of runs under stress) it is expected that such a remote flow would exit after 10s timeout, so the test cannot use short 5s duration that I used when working on it for fleshing out things.
  • using the "origCtx" for the RPCs could lead to reuse of the tracing span that was Finished if the RPC actually runs after the gateway flow exits. This is fixed by adjusting the last commit.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @cucaroach and @msirek)


pkg/sql/distsql_running.go line 452 at r2 (raw file):

Previously, msirek (Mark Sirek) wrote…

OK, thanks.

Done.


pkg/sql/distsql_running.go line 523 at r2 (raw file):

Previously, msirek (Mark Sirek) wrote…

Hm, why does it seem like this?

I think this is misunderstanding of the code on my part. Thanks for the clarification.

We did discuss the idea of having more parallelism in our execution a couple of times, and one approach there would be to have multiple flows for a single node, but we haven't really done any work in that direction.

...

The goal of GenerateFlowSpecs is to iterate over all processors that are part of the PhysicalPlan, then divide all of them up according to the nodes on which they are planned, and each separate "division" (i.e. subset) of processors form a single FlowSpec.

TIL
Thanks

Done.


pkg/sql/distsql_running.go line 706 at r2 (raw file):

Previously, msirek (Mark Sirek) wrote…

OK

Done.


pkg/sql/distsql_running.go line 1111 at r2 (raw file):

Previously, msirek (Mark Sirek) wrote…

OK, thanks.

Done.


pkg/sql/flowinfra/flow.go line 526 at r2 (raw file):

Previously, msirek (Mark Sirek) wrote…

OK

Done.

Contributor

@msirek msirek left a comment

:lgtm:

Reviewed 7 of 7 files at r5, 1 of 1 files at r6, 8 of 9 files at r7, 1 of 1 files at r8, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @cucaroach)

@yuzefovich
Member Author

TFTR!

bors r+

@craig
Contributor

craig bot commented Nov 12, 2022

Build succeeded:


Development

Successfully merging this pull request may close these issues.

distsql: consider not blocking on SetupFlow RPCs before starting the flow on the gateway

3 participants