sql: do not wait for setup of remote flows on the gateway #89649
craig[bot] merged 3 commits into cockroachdb:master
Conversation
Force-pushed f4156d7 to 79308f8
Force-pushed 963cd64 to dd511ee
msirek
left a comment
This looks great! Is it possible to add a test? Maybe a simple distributed query with a mode which randomly injects remote errors, then tests that everything was cleaned up properly. Or is that too difficult?
Reviewed 6 of 6 files at r1, 7 of 7 files at r2, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @cucaroach and @yuzefovich)
pkg/sql/distsql_running.go line 102 at r2 (raw file):
// run executes the request. An error if encountered is both sent on the
// result channel and returned.
nit: Please either add commas around 'if encountered' or re-word this.
pkg/sql/distsql_running.go line 452 at r2 (raw file):
	err = planCtx.saveFlows(flows, opChains)
}
if len(flows) == 1 || err != nil {
Is it safe to assume there is always a local FlowSpec in flows?
pkg/sql/distsql_running.go line 523 at r2 (raw file):
})
_ = dsp.stopper.RunAsyncTask(origCtx, "distsql-remote-flows-setup-listener", func(ctx context.Context) {
	for i := 0; i < len(flows)-1; i++ {
This line makes the assumption that there is a local FlowSpec in flows. Is this always true? Could it ever contain zero local FlowSpecs?
Should we make it explicit and iterate through all flows and then ignore any local flows?
Also, looking at GenerateFlowSpecs(), it seems like each SQLInstanceID entry in the flow map could have multiple flows. If the goal is to execute this loop once for each remote flow, it seems like it should also iterate over all Processors in each remote FlowSpec.
pkg/sql/distsql_running.go line 706 at r2 (raw file):
} else {
	defer func() {
		if recv.getError() != nil {
Should the recv.resultWriterMu.batch interface also be extended with SetError and Err methods so this getError() check could also eagerly cancel flows when there's a batch error?
Code quote: recv
pkg/sql/distsql_running.go line 1111 at r2 (raw file):
// SQL node, which has left the handling up to the root
// transaction.
err = r.txn.UpdateStateOnRemoteRetryableErr(r.ctx, &retryErr.PErr)
If r.txn is RootTxn, this sets a new assertion error and loses the original error.
Should UpdateStateOnRemoteRetryableErr wrap the assertion error inside the original error to get more useful error reporting?
pkg/sql/flowinfra/flow.go line 526 at r2 (raw file):
f.onFlowCleanup = func() {
	fn()
	oldOnFlowCleanup()
Should this add fn() after oldOnFlowCleanup() to make sure any cleanup operations in oldOnFlowCleanup() are done before fn() marks the cleanup as done?
Force-pushed dd511ee to 9af36b4
yuzefovich
left a comment
Thanks for the review! I did briefly consider writing a test for this but decided not to since it seemed rather difficult and probably prone to flakes - but with your nudge I'll actually spend some time and see what I can come up with.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @cucaroach and @msirek)
pkg/sql/distsql_running.go line 452 at r2 (raw file):
Previously, msirek (Mark Sirek) wrote…
Is it safe to assume there is always a local FlowSpec in flows?
Yes - we have an assertion that there is a gateway flow at the very beginning of this (setupFlows) function.
pkg/sql/distsql_running.go line 523 at r2 (raw file):
I think this code is pretty safe given that there is always the gateway flow, and this code (iterating len(flows)-1 times to receive from resultChan) is not changing in this commit.
Also, looking at GenerateFlowSpecs(), it seems like each SQLInstanceID entry in the flow map could have multiple flows.
Hm, why does it seem like this? flows is a map from SQLInstanceID to a FlowSpec which implies that we have at most a single FlowSpec for any given SQLInstanceID. The iteration in GenerateFlowSpecs is about handling all processors - i.e. each node can have many processors planned - but there will be at most one flow per node.
We did discuss the idea of having more parallelism in our execution a couple of times, and one approach there would be to have multiple flows for a single node, but we haven't really done any work in that direction.
If the goal is to execute this loop once for each remote flow, it seems like it should also iterate over all Processors in each remote FlowSpec.
The goal of GenerateFlowSpecs is to iterate over all processors that are part of the PhysicalPlan, then divide all of them up according to the nodes on which they are planned, and each separate "division" (i.e. subset) of processors form a single FlowSpec.
pkg/sql/distsql_running.go line 706 at r2 (raw file):
Previously, msirek (Mark Sirek) wrote…
Should the recv.resultWriterMu.batch interface also be extended with SetError and Err methods so this getError() check could also eagerly cancel flows when there's a batch error?
It's not obvious, but recv.resultWriterMu.row and recv.resultWriterMu.batch (when the latter is non-nil) actually point to exactly the same object, so "implicitly" we do set the error on the correct object (most likely it'll be commandResult) in case of a batch error.
pkg/sql/distsql_running.go line 1111 at r2 (raw file):
Previously, msirek (Mark Sirek) wrote…
If r.txn is RootTxn, this sets a new assertion error and loses the original error.
Should UpdateStateOnRemoteRetryableErr wrap the assertion error inside the original error to get more useful error reporting?
I'm guessing no, but this code is not modified in this commit at all - it is only extracted into the helper, so I'm afraid to change anything here :)
pkg/sql/flowinfra/flow.go line 526 at r2 (raw file):
Previously, msirek (Mark Sirek) wrote…
Should this add fn() after oldOnFlowCleanup() to make sure any cleanup operations in oldOnFlowCleanup() are done before fn() marks the cleanup as done?
I think that we want the "stack" behavior here. My rationale is that we currently have a single onFlowCleanup function instantiated in distsql.ServerImpl.setupFlow, and I think that function should be executed last when we add new callbacks - this way the "parent" operation is cleaned up after the "child" one. I expanded the comment on the Flow interface.
This commit utilizes the existing `cancelFlowsCoordinator` as well as the corresponding `CancelDeadFlows` RPC to also eagerly cancel remote flows that are still running when the gateway flow encounters an error. Previously, this mechanism only worked for the remote flows that were in the queue on the remote nodes, but now that we removed the queueing behavior in the flow scheduler, the mechanism became unused, and this commit repurposes it. This required minor changes to the remote flow runner to keep track of the `Flow` objects themselves (in addition to other info about the flows).

This "dead flows" cancellation mechanism is not required for correctness (since the distributed cancellation relies on the proper shutdown via gRPC stream cancellation), but the justification for having it is two-fold:
- it makes the cancellation more bullet-proof
- in the follow-up commit we'll make it more likely that the query encounters an error during the setup time (by fully parallelizing the setup of all flows, including the gateway), and this eager cancellation seems better suited to handle errors in those cases.

This commit also contains a minor fix to the remote flow runner constructor to correctly use the given memory account. This was an oversight not included in 0bbe280 because I incorrectly split this commit out into a separate one from that change.

Release note: None
The test is actually uncovering some (relatively minor) issues that I want to address first, so hold off on reviewing further - I'll ping once it is ready for another look.
This commit makes the server-side of the `SetupFlow` RPC react better to context cancellation. In particular, after setting up the flow on the remote node but before starting it in a new goroutine, we now check whether the context of the RPC is canceled; if it is, then we don't actually need to run the flow at all, so we short-circuit. This improves the shutdown protocol in some cases when the context is canceled on the gateway (client-side of the RPC) after we have dialed the node in `runnerRequest.run` but before the new goroutine handling the RPC on the client side has been spun up. This scenario is pretty rare currently but might occur more often due to the changes in the following commit.

Note that there is no correctness issue here - even before this change, the flow would be shut down properly (when the outbox would fail to execute the `FlowStream` RPC due to the inbox having exited).

Release note: None
Force-pushed 4f637ab to ae27c69
msirek
left a comment
OK
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @cucaroach and @yuzefovich)
pkg/sql/distsql_running.go line 452 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Yes - we have an assertion that there is a gateway flow at the very beginning of this (setupFlows) function.
OK, thanks.
pkg/sql/distsql_running.go line 523 at r2 (raw file):
Hm, why does it seem like this?
I think this is a misunderstanding of the code on my part. Thanks for the clarification.
We did discuss the idea of having more parallelism in our execution a couple of times, and one approach there would be to have multiple flows for a single node, but we haven't really done any work in that direction.
...
The goal of GenerateFlowSpecs is to iterate over all processors that are part of the PhysicalPlan, then divide all of them up according to the nodes on which they are planned, and each separate "division" (i.e. subset) of processors form a single FlowSpec.
TIL
Thanks
pkg/sql/distsql_running.go line 706 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
It's not obvious, but recv.resultWriterMu.row and recv.resultWriterMu.batch (when the latter is non-nil) actually point to exactly the same object, so "implicitly" we do set the error on the correct object (most likely it'll be commandResult) in case of a batch error.
OK
pkg/sql/distsql_running.go line 1111 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I'm guessing no, but this code is not modified in this commit at all - it is only extracted into the helper, so I'm afraid to change anything here :)
OK, thanks.
pkg/sql/flowinfra/flow.go line 526 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I think that we want the "stack" behavior here. My rationale is that we currently have a single onFlowCleanup function instantiated in distsql.ServerImpl.setupFlow, and I think that function should be executed last when we add new callbacks - this way the "parent" operation is cleaned up after the "child" one. I expanded the comment on the Flow interface.
OK
Previously, when setting up the flows for a distributed plan, we would issue all SetupFlow RPCs to the remote nodes and wait for all of them to come back before proceeding with the execution of the local flow. This can introduce an execution stall, especially in multi-region setups, if remote flows depend on the local flow for some of the data. This was suboptimal, and this commit makes it so that we no longer wait for the RPCs to come back and start executing the local flow right away. We now spin up a separate goroutine that waits for the RPCs to come back, and if an error is encountered (which shouldn't happen often), then that goroutine sets the error on the DistSQLReceiver and cancels the local flow. Setting the error on the receiver, in turn, will make it so that all remote flows will be canceled via the CancelDeadFlows RPC (which might be faster than via the distributed query shutdown triggered when the local flow is canceled).

An additional change is that we now perform the setup of the local flow on the gateway first, and only if that is successful do we proceed to the setup of the remote flows. This acts as a sanity check on the validity of the flow spec and should make it less likely that the remote flows setup fails.

This required some changes around the error handling of the DistSQLReceiver to make it concurrency safe. One option there was to make the `SetError` and `Err` methods of the `rowResultWriter` interface concurrency safe, but there are several implementations, and I decided to make the adjustment to the DistSQLReceiver itself since this concurrency safety is only needed there, and it seems somewhat wrong to impose the requirement on all of the implementations. Additionally, in order to avoid locking the mutex as much as possible, the `status` of the receiver is not protected by the mutex. This is achieved by the new goroutine not updating the status and, instead, letting the main goroutine "resolve" the status the next time a meta object is pushed. The idea is that the cancellation of the local flow shuts down the local flow's execution, making it encounter an error which is then propagated as metadata. Thus, this "status resolution" should happen fairly quickly, and this setup allows us to avoid the locking in most scenarios when pushing rows and batches.

Further changes were needed around the `saveFlows` function as well as releasing flow specs back to the pool. The former had to be moved to be done sooner (right after setting up the local flow), and for the latter we had to move the release of the flow specs for the remote flows to right after the corresponding SetupFlow RPCs are issued. This ordering ensures that the flow specs are always released, but only after all of their uses (both `saveFlows` and the RPCs use them).

Yet another complication was around the concurrency between `Flow.Cleanup` being called and the new goroutine receiving an error from the RPC. At the very end of `Flow.Cleanup` the flow object is released, so the new goroutine cannot call `Flow.Cancel` after that. Additionally, since the mutex protection of the `rowResultWriter` by the DistSQLReceiver lasts only until `DistSQLPlanner.Run` returns (which is right after `Flow.Cleanup` returns), the new goroutine will attempt to set the error only if the cleanup hasn't been performed. This is achieved by having a mutex-protected boolean, and the boolean is only introduced if the new goroutine is spun up.

Overall, as this whole comment suggests, it has been quite tricky to get things right (I hope I did), so one might wonder what simplifications, if any, could be made. I considered and (mostly) rejected several:
- Ignoring the particular error that was returned by the RPCs and just canceling the local flow. This would allow us to remove the complexity of the concurrency safety around the error handling of the DistSQLReceiver. We would still properly shut down the whole distributed plan. However, the downside is that we would lose the true reason for the shutdown - most likely we would return "context canceled" to the client instead of the actual error. On the other hand, errors from the RPCs should be rare enough that it might be worth giving this more thought.
- Not canceling the local flow, since just setting the error on the receiver would be sufficient for the query to eventually shut down. The obvious downside is that we might do more work after having received an error from the RPC, and there is little upside I can see.
- But the most interesting simplification would be to just not create the new goroutine in the first place. The idea is that if any of the SetupFlow RPCs fail, then the flows on other nodes would hit the "no inbound stream timeout" error (after 10s - by default - of not being connected to), which would start the shutdown of the whole plan. This would eliminate effectively all the complexity of this commit, which seems quite appealing. The downside here would be the combination of the downsides of the ideas above - namely, the query would now result in this opaque "no inbound stream" error (and we've been struggling with diagnosing those, so I'm not eager to introduce another way it could occur), and we would be doing wasteful work during the timeout window.

Release note (performance improvement): The setup of the distributed query execution is now fully parallelized, which should reduce query latencies, especially in multi-region setups.
Force-pushed ae27c69 to 0c1095e
yuzefovich
left a comment
Alright, I think it is RFAL.
I added a test like Mark suggested which shook out some issues:
- in a newly-added second commit we now make the cancellation of the SetupFlow RPC on the server-side (remote node) more eager. The test (when run with "succeeds within 5s") could have a remote flow on n2 running even after the gateway flow on n1 exited with an error injected on n3 - this would depend on the scheduling of the RPC goroutines for n2. Unfortunately, in a rare edge case (like once in several thousands of runs under stress) it is expected that such a remote flow would exit after the 10s timeout, so the test cannot use the short 5s duration that I used when working on it for fleshing out things.
- using the "origCtx" for the RPCs could lead to reuse of the tracing span that was Finished if the RPC actually runs after the gateway flow exits. This is fixed by adjusting the last commit.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @cucaroach and @msirek)
pkg/sql/distsql_running.go line 452 at r2 (raw file):
Previously, msirek (Mark Sirek) wrote…
OK, thanks.
Done.
pkg/sql/distsql_running.go line 523 at r2 (raw file):
Previously, msirek (Mark Sirek) wrote…
Hm, why does it seem like this?
I think this is a misunderstanding of the code on my part. Thanks for the clarification.
We did discuss the idea of having more parallelism in our execution a couple of times, and one approach there would be to have multiple flows for a single node, but we haven't really done any work in that direction.
...
The goal of GenerateFlowSpecs is to iterate over all processors that are part of the PhysicalPlan, then divide all of them up according to the nodes on which they are planned, and each separate "division" (i.e. subset) of processors form a single FlowSpec.
TIL
Thanks
Done.
pkg/sql/distsql_running.go line 706 at r2 (raw file):
Previously, msirek (Mark Sirek) wrote…
OK
Done.
pkg/sql/distsql_running.go line 1111 at r2 (raw file):
Previously, msirek (Mark Sirek) wrote…
OK, thanks.
Done.
pkg/sql/flowinfra/flow.go line 526 at r2 (raw file):
Previously, msirek (Mark Sirek) wrote…
OK
Done.
msirek
left a comment
Reviewed 7 of 7 files at r5, 1 of 1 files at r6, 8 of 9 files at r7, 1 of 1 files at r8, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @cucaroach)
TFTR! bors r+
Build succeeded:
Previously, when setting up the flows for a distributed plan we would
issue all SetupFlow RPCs to the remote nodes and wait for all of them to
come back before proceeding with the execution of the local flow. This
can introduce an execution stall, especially in multi-region setups, if
remote flows depend on the local flow for some of the data. This was
suboptimal, and this PR makes it so that we no longer wait for the
RPCs to come back and start executing the local flow right away. We now
spin up a separate goroutine that waits for the RPCs to come back, and
if an error is encountered (which shouldn't happen often), then that
goroutine sets the error on the DistSQLReceiver and cancels the local
flow. Setting the error on the receiver, in turn, will make it so that
all remote flows will be canceled via CancelDeadFlows RPC (which might
be faster than via the distributed query shutdown triggered when the
local flow is canceled).
See individual commits for more details.
Fixes: #87669.
Epic: CRDB-20535
Release note (performance improvement): The setup of the distributed
query execution is now fully parallelized which should reduce the query
latencies, especially in multi-region setups.