Commit ed7feb0
sql: make sure that "inner" plans use the LeafTxn if the "outer" does
This commit fixes a bug where "inner" plans could incorrectly use the RootTxn while the "outer" plan used the LeafTxn. One example of such a situation is when the "main" query uses the streamer (and thus the LeafTxn) and also has an apply join: the apply join iteration plans would use the RootTxn. This could lead to a "concurrent txn usage" error being detected on the RootTxn.

The problem is fixed by auditing all code paths that might run plans capable of spinning up "inner" plans, and by plumbing the information that those "inner" plans must use the LeafTxn via the planner (we don't really have a more convenient place to do that plumbing). Note that when creating the flow for the main query, we only know for sure whether it'll use the LeafTxn after the flow setup is complete, so we adjust the existing `finishedSetupFn` callback to check the type of the txn that the flow ends up using and to update the planner accordingly.

This bug reliably reproduces when creating a materialized view, but for some reason (unknown to me) just running the query as is doesn't seem to trigger the bug (I tried stressing the query with no luck and decided it wasn't worth spending more time on it). I also believe that even though the underlying mechanism for the bug has been present since forever, it was really introduced only when we enabled the streamer by default in 22.2 (without the streamer we always use the RootTxn for flows with apply joins or UDFs, since they must be local).

Release note (bug fix): CockroachDB could previously encounter a "concurrent txn use detected" internal error in some rare cases, and this is now fixed. The bug was introduced in 22.2.0.
1 parent: 0dd1e91

14 files changed with 142 additions and 38 deletions
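Before the per-file diffs, a minimal, self-contained Go sketch of the pattern this commit introduces may help. This is not CockroachDB's actual code: flow, planner, and txnType below are hypothetical stand-ins for flowinfra.Flow, sql.planner, and the kv transaction types, but the control flow mirrors the change: a finishedSetupFn callback records, once the "outer" flow's setup is complete, whether it ended up on a LeafTxn, and any "inner" plan consults that flag instead of blindly reaching for the RootTxn.

package main

import (
	"fmt"
	"sync/atomic"
)

type txnType int

const (
	rootTxn txnType = iota
	leafTxn
)

// flow stands in for flowinfra.Flow: which txn type the flow uses is only
// known once its setup is complete (e.g. the streamer may force a LeafTxn).
type flow struct{ txn txnType }

// planner stands in for sql.planner; the flag mirrors the commit's
// planner.atomic.innerPlansMustUseLeafTxn field.
type planner struct {
	atomic struct{ innerPlansMustUseLeafTxn uint32 }
}

// getFinishedSetupFn mirrors the helper added in distsql_running.go: the
// first closure runs after the outer flow's setup, the second resets the
// flag once the outer plan and all inner plans are done.
func getFinishedSetupFn(p *planner) (finishedSetupFn func(flow), cleanup func()) {
	finishedSetupFn = func(f flow) {
		if f.txn == leafTxn {
			atomic.StoreUint32(&p.atomic.innerPlansMustUseLeafTxn, 1)
		}
	}
	cleanup = func() { atomic.StoreUint32(&p.atomic.innerPlansMustUseLeafTxn, 0) }
	return finishedSetupFn, cleanup
}

// runInnerPlan mirrors runPlanInsidePlan: an apply-join iteration must not
// touch the RootTxn while the outer flow runs on a LeafTxn, since a RootTxn
// forbids concurrent use.
func runInnerPlan(p *planner) {
	mustUseLeaf := atomic.LoadUint32(&p.atomic.innerPlansMustUseLeafTxn) == 1
	fmt.Printf("inner plan uses LeafTxn: %v\n", mustUseLeaf)
}

func main() {
	p := &planner{}
	finishedSetupFn, cleanup := getFinishedSetupFn(p)
	defer cleanup()

	// Suppose the outer flow's setup chose the streamer and thus a LeafTxn;
	// the callback propagates that decision to the planner.
	finishedSetupFn(flow{txn: leafTxn})

	runInnerPlan(p) // prints: inner plan uses LeafTxn: true
}

In the real commit the flag is read in runPlanInsidePlan (see pkg/sql/apply_join.go below), and the cleanup closure is deferred at every call site that passes the callback to Run or PlanAndRun.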

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -89,6 +89,7 @@ go_library(
         "//pkg/sql/execinfra",
         "//pkg/sql/execinfrapb",
         "//pkg/sql/exprutil",
+        "//pkg/sql/flowinfra",
         "//pkg/sql/importer",
         "//pkg/sql/isql",
         "//pkg/sql/parser",

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 3 additions & 2 deletions
@@ -25,6 +25,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/catalog"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+    "github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
     "github.com/cockroachdb/cockroach/pkg/sql/isql"
     "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
     "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
@@ -301,7 +302,7 @@ func startDistChangefeed(
     )
     defer recv.Release()

-    var finishedSetupFn func()
+    var finishedSetupFn func(flowinfra.Flow)
     if details.SinkURI != `` {
         // We abuse the job's results channel to make CREATE CHANGEFEED wait for
         // this before returning to the user to ensure the setup went okay. Job
@@ -310,7 +311,7 @@ func startDistChangefeed(
         // meaningful so that if we start doing anything with the results
         // returned by resumed jobs, then it breaks instead of returning
         // nonsense.
-        finishedSetupFn = func() { resultsCh <- tree.Datums(nil) }
+        finishedSetupFn = func(flowinfra.Flow) { resultsCh <- tree.Datums(nil) }
     }

     // Copy the evalCtx, as dsp.Run() might change it.

pkg/sql/apply_join.go

Lines changed: 12 additions & 7 deletions
@@ -13,6 +13,7 @@ package sql
 import (
     "context"
     "strconv"
+    "sync/atomic"

     "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -266,11 +267,12 @@ func runPlanInsidePlan(
     ctx context.Context, params runParams, plan *planComponents, resultWriter rowResultWriter,
 ) error {
     defer plan.close(ctx)
+    execCfg := params.ExecCfg()
     recv := MakeDistSQLReceiver(
         ctx, resultWriter, tree.Rows,
-        params.ExecCfg().RangeDescriptorCache,
+        execCfg.RangeDescriptorCache,
         params.p.Txn(),
-        params.ExecCfg().Clock,
+        execCfg.Clock,
         params.p.extendedEvalCtx.Tracing,
     )
     defer recv.Release()
@@ -301,14 +303,15 @@ func runPlanInsidePlan(
     // return from this method (after the main query is executed).
     subqueryResultMemAcc := params.p.Mon().MakeBoundAccount()
     defer subqueryResultMemAcc.Close(ctx)
-    if !params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRunSubqueries(
+    if !execCfg.DistSQLPlanner.PlanAndRunSubqueries(
         ctx,
         params.p,
         params.extendedEvalCtx.copy,
         plan.subqueryPlans,
         recv,
         &subqueryResultMemAcc,
         false, /* skipDistSQLDiagramGeneration */
+        atomic.LoadUint32(&params.p.atomic.innerPlansMustUseLeafTxn) == 1,
     ) {
         return resultWriter.Err()
     }
@@ -325,15 +328,17 @@ func runPlanInsidePlan(
     if distributePlan.WillDistribute() {
         distributeType = DistributionTypeAlways
     }
-    planCtx := params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.NewPlanningCtx(
-        ctx, evalCtx, &plannerCopy, params.p.txn, distributeType)
+    planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, &plannerCopy, plannerCopy.txn, distributeType)
     planCtx.planner.curPlan.planComponents = *plan
     planCtx.ExtendedEvalCtx.Planner = &plannerCopy
     planCtx.ExtendedEvalCtx.StreamManagerFactory = &plannerCopy
     planCtx.stmtType = recv.stmtType
+    planCtx.mustUseLeafTxn = atomic.LoadUint32(&params.p.atomic.innerPlansMustUseLeafTxn) == 1

-    params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRun(
-        ctx, evalCtx, planCtx, params.p.Txn(), plan.main, recv, nil, /* finishedSetupFn */
+    finishedSetupFn, cleanup := getFinishedSetupFn(&plannerCopy)
+    defer cleanup()
+    execCfg.DistSQLPlanner.PlanAndRun(
+        ctx, evalCtx, planCtx, plannerCopy.Txn(), plan.main, recv, finishedSetupFn,
     )
     return resultWriter.Err()
 }

pkg/sql/distsql/server.go

Lines changed: 5 additions & 5 deletions
@@ -549,10 +549,10 @@ type LocalState struct {
     // HasConcurrency indicates whether the local flow uses multiple goroutines.
     HasConcurrency bool

-    // ParallelCheck indicates whether the local flow is for a "check" postquery
-    // that runs in parallel with other checks and, thus, the LeafTxn must be
-    // used by this flow.
-    ParallelCheck bool
+    // MustUseLeaf indicates whether the local flow must use the LeafTxn even if
+    // there is no concurrency in the flow on its own because there would be
+    // concurrency with other flows which prohibits the usage of the RootTxn.
+    MustUseLeaf bool

     // Txn is filled in on the gateway only. It is the RootTxn that the query is running in.
     // This will be used directly by the flow if the flow has no concurrency and IsLocal is set.
@@ -567,7 +567,7 @@ type LocalState struct {
 // MustUseLeafTxn returns true if a LeafTxn must be used. It is valid to call
 // this method only after IsLocal and HasConcurrency have been set correctly.
 func (l LocalState) MustUseLeafTxn() bool {
-    return !l.IsLocal || l.HasConcurrency || l.ParallelCheck
+    return !l.IsLocal || l.HasConcurrency || l.MustUseLeaf
 }

 // SetupLocalSyncFlow sets up a synchronous flow on the current (planning) node,

pkg/sql/distsql_physical_planner.go

Lines changed: 5 additions & 6 deletions
@@ -841,12 +841,11 @@ type PlanningCtx struct {
     // query).
     subOrPostQuery bool

-    // parallelCheck, if set, indicates that this PlanningCtx is used to handle
-    // one of the checkPlans that are run in parallel. As such, the DistSQL
-    // planner will need to do a few adjustments like using the LeafTxn (even if
-    // it's not needed based on other "regular" factors) and adding
-    // synchronization between certain write operations.
-    parallelCheck bool
+    // mustUseLeafTxn, if set, indicates that this PlanningCtx is used to handle
+    // one of the plans that will run in parallel with other plans. As such, the
+    // DistSQL planner will need to use the LeafTxn (even if it's not needed
+    // based on other "regular" factors).
+    mustUseLeafTxn bool

     // onFlowCleanup contains non-nil functions that will be called after the
     // local flow finished running and is being cleaned up. It allows us to

pkg/sql/distsql_plan_changefeed.go

Lines changed: 2 additions & 1 deletion
@@ -20,6 +20,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
     "github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
+    "github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
     "github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
     "github.com/cockroachdb/cockroach/pkg/sql/parser"
     "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
@@ -206,7 +207,7 @@ func RunCDCEvaluation(
     // changes.
     cdcPlan.PlanCtx.usePlannerDescriptorsForLocalFlow = true
     p := cdcPlan.PlanCtx.planner
-    finishedSetupFn := func() {
+    finishedSetupFn := func(flowinfra.Flow) {
         p.Descriptors().ReleaseAll(ctx)
     }

pkg/sql/distsql_plan_ctas.go

Lines changed: 3 additions & 1 deletion
@@ -55,5 +55,7 @@ func PlanAndRunCTAS(
     // Make copy of evalCtx as Run might modify it.
     evalCtxCopy := planner.ExtendedEvalContextCopy()
     FinalizePlan(ctx, planCtx, physPlan)
-    dsp.Run(ctx, planCtx, txn, physPlan, recv, evalCtxCopy, nil /* finishedSetupFn */)
+    finishedSetupFn, cleanup := getFinishedSetupFn(planner)
+    defer cleanup()
+    dsp.Run(ctx, planCtx, txn, physPlan, recv, evalCtxCopy, finishedSetupFn)
 }

pkg/sql/distsql_running.go

Lines changed: 45 additions & 14 deletions
@@ -651,7 +651,8 @@ const clientRejectedMsg string = "client rejected when attempting to run DistSQL
 // to be closed.
 //
 // Args:
-// - txn is the transaction in which the plan will run. If nil, the different
+// - txn is the root transaction in which the plan will run (or will be used to
+// derive leaf transactions if the plan has concurrency). If nil, then different
 // processors are expected to manage their own internal transactions.
 // - evalCtx is the evaluation context in which the plan will run. It might be
 // mutated.
@@ -664,7 +665,7 @@ func (dsp *DistSQLPlanner) Run(
     plan *PhysicalPlan,
     recv *DistSQLReceiver,
     evalCtx *extendedEvalContext,
-    finishedSetupFn func(),
+    finishedSetupFn func(localFlow flowinfra.Flow),
 ) {
     flows := plan.GenerateFlowSpecs()
     gatewayFlowSpec, ok := flows[dsp.gatewaySQLInstanceID]
@@ -685,7 +686,7 @@ func (dsp *DistSQLPlanner) Run(
     // the line.
     localState.EvalContext = &evalCtx.Context
     localState.IsLocal = planCtx.isLocal
-    localState.ParallelCheck = planCtx.parallelCheck
+    localState.MustUseLeaf = planCtx.mustUseLeafTxn
     localState.Txn = txn
     localState.LocalProcs = plan.LocalProcessors
     // If we have access to a planner and are currently being used to plan
@@ -845,7 +846,7 @@ func (dsp *DistSQLPlanner) Run(
     }

     if finishedSetupFn != nil {
-        finishedSetupFn()
+        finishedSetupFn(flow)
     }

     // Check that flows that were forced to be planned locally and didn't need
@@ -1517,7 +1518,26 @@ func (r *DistSQLReceiver) ProducerDone() {
     r.closed = true
 }

-// PlanAndRunAll combines running the the main query, subqueries and cascades/checks.
+// getFinishedSetupFn returns a function to be passed into
+// DistSQLPlanner.PlanAndRun or DistSQLPlanner.Run when running an "outer" plan
+// that might create "inner" plans (e.g. apply join iterations). The returned
+// function updates the passed-in planner to make sure that the "inner" plans
+// use the LeafTxns if the "outer" plan happens to have concurrency. It also
+// returns a non-nil cleanup function that must be called once all plans (the
+// "outer" as well as all "inner" ones) are done.
+func getFinishedSetupFn(planner *planner) (finishedSetupFn func(flowinfra.Flow), cleanup func()) {
+    finishedSetupFn = func(localFlow flowinfra.Flow) {
+        if localFlow.GetFlowCtx().Txn.Type() == kv.LeafTxn {
+            atomic.StoreUint32(&planner.atomic.innerPlansMustUseLeafTxn, 1)
+        }
+    }
+    cleanup = func() {
+        atomic.StoreUint32(&planner.atomic.innerPlansMustUseLeafTxn, 0)
+    }
+    return finishedSetupFn, cleanup
+}
+
+// PlanAndRunAll combines running the main query, subqueries and cascades/checks.
 // If an error is returned, the connection needs to stop processing queries.
 // Query execution errors stored in recv; they are not returned.
 func (dsp *DistSQLPlanner) PlanAndRunAll(
@@ -1544,15 +1564,20 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
             &subqueryResultMemAcc,
             // Skip the diagram generation since on this "main" query path we
             // can get it via the statement bundle.
-            true, /* skipDistSQLDiagramGeneration */
+            true,  /* skipDistSQLDiagramGeneration */
+            false, /* mustUseLeafTxn */
         ) {
             return recv.commErr
         }
     }
     recv.discardRows = planner.instrumentation.ShouldDiscardRows()
-    dsp.PlanAndRun(
-        ctx, evalCtx, planCtx, planner.txn, planner.curPlan.main, recv, nil, /* finishedSetupFn */
-    )
+    func() {
+        finishedSetupFn, cleanup := getFinishedSetupFn(planner)
+        defer cleanup()
+        dsp.PlanAndRun(
+            ctx, evalCtx, planCtx, planner.txn, planner.curPlan.main, recv, finishedSetupFn,
+        )
+    }()
     if recv.commErr != nil || recv.getError() != nil {
         return recv.commErr
     }
@@ -1581,6 +1606,7 @@ func (dsp *DistSQLPlanner) PlanAndRunSubqueries(
     recv *DistSQLReceiver,
     subqueryResultMemAcc *mon.BoundAccount,
     skipDistSQLDiagramGeneration bool,
+    mustUseLeafTxn bool,
 ) bool {
     for planIdx, subqueryPlan := range subqueryPlans {
         if err := dsp.planAndRunSubquery(
@@ -1593,6 +1619,7 @@ func (dsp *DistSQLPlanner) PlanAndRunSubqueries(
             recv,
             subqueryResultMemAcc,
             skipDistSQLDiagramGeneration,
+            mustUseLeafTxn,
         ); err != nil {
             recv.SetError(err)
             return false
@@ -1616,6 +1643,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
     recv *DistSQLReceiver,
     subqueryResultMemAcc *mon.BoundAccount,
     skipDistSQLDiagramGeneration bool,
+    mustUseLeafTxn bool,
 ) error {
     subqueryMonitor := mon.NewMonitor(
         "subquery",
@@ -1640,11 +1668,11 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
     if distributeSubquery {
         distribute = DistributionTypeAlways
     }
-    subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn,
-        distribute)
+    subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute)
     subqueryPlanCtx.stmtType = tree.Rows
     subqueryPlanCtx.skipDistSQLDiagramGeneration = skipDistSQLDiagramGeneration
     subqueryPlanCtx.subOrPostQuery = true
+    subqueryPlanCtx.mustUseLeafTxn = mustUseLeafTxn
     if planner.instrumentation.ShouldSaveFlows() {
         subqueryPlanCtx.saveFlows = subqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeSubquery)
     }
@@ -1678,7 +1706,9 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
     subqueryRowReceiver := NewRowResultWriter(&rows)
     subqueryRecv.resultWriterMu.row = subqueryRowReceiver
     subqueryPlans[planIdx].started = true
-    dsp.Run(ctx, subqueryPlanCtx, planner.txn, subqueryPhysPlan, subqueryRecv, evalCtx, nil /* finishedSetupFn */)
+    finishedSetupFn, cleanup := getFinishedSetupFn(planner)
+    defer cleanup()
+    dsp.Run(ctx, subqueryPlanCtx, planner.txn, subqueryPhysPlan, subqueryRecv, evalCtx, finishedSetupFn)
     if err := subqueryRowReceiver.Err(); err != nil {
         return err
     }
@@ -1790,7 +1820,7 @@ func (dsp *DistSQLPlanner) PlanAndRun(
     txn *kv.Txn,
     plan planMaybePhysical,
     recv *DistSQLReceiver,
-    finishedSetupFn func(),
+    finishedSetupFn func(localFlow flowinfra.Flow),
 ) {
     log.VEventf(ctx, 2, "creating DistSQL plan with isLocal=%v", planCtx.isLocal)

@@ -2052,7 +2082,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
     }
     postqueryPlanCtx.associateNodeWithComponents = associateNodeWithComponents
     postqueryPlanCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
-    postqueryPlanCtx.parallelCheck = parallelCheck
+    postqueryPlanCtx.mustUseLeafTxn = parallelCheck

     postqueryPhysPlan, physPlanCleanup, err := dsp.createPhysPlan(ctx, postqueryPlanCtx, postqueryPlan)
     defer physPlanCleanup()
@@ -2067,6 +2097,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
     postqueryResultWriter := &errOnlyResultWriter{}
     postqueryRecv.resultWriterMu.row = postqueryResultWriter
     postqueryRecv.resultWriterMu.batch = postqueryResultWriter
+    // Postqueries cannot have "inner" plans, so we use nil finishedSetupFn.
     dsp.Run(ctx, postqueryPlanCtx, planner.txn, postqueryPhysPlan, postqueryRecv, evalCtx, nil /* finishedSetupFn */)
     return postqueryRecv.getError()
 }

pkg/sql/importer/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -71,6 +71,7 @@ go_library(
         "//pkg/sql/execinfrapb",
         "//pkg/sql/exprutil",
         "//pkg/sql/faketreeeval",
+        "//pkg/sql/flowinfra",
        "//pkg/sql/gcjob",
         "//pkg/sql/isql",
         "//pkg/sql/lexbase",
@@ -201,6 +202,7 @@ go_test(
         "//pkg/sql/distsql",
         "//pkg/sql/execinfra",
         "//pkg/sql/execinfrapb",
+        "//pkg/sql/flowinfra",
         "//pkg/sql/gcjob",
         "//pkg/sql/isql",
         "//pkg/sql/parser",

pkg/sql/importer/import_job.go

Lines changed: 2 additions & 1 deletion
@@ -40,6 +40,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+    "github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
     "github.com/cockroachdb/cockroach/pkg/sql/gcjob"
     "github.com/cockroachdb/cockroach/pkg/sql/isql"
     "github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
@@ -65,7 +66,7 @@ const importJobRecoveryEventType eventpb.RecoveryEventType = "import_job"
 type importTestingKnobs struct {
     afterImport            func(summary roachpb.RowCount) error
     beforeRunDSP           func() error
-    onSetupFinish          func()
+    onSetupFinish          func(flowinfra.Flow)
     alwaysFlushJobProgress bool
 }
