sql/rowflow: excessive memory allocation in setup flow #42770
Description
What is your situation?
Looking at a heap profile while running KV95, I observe that 16.12% of allocated space is due to rowflow.(*rowBasedFlow).Setup. There are a few delinquent things here.
- The first is that we seem to fail to release the underlying tableReader in flow.Cleanup.
The tableReaders are allocated from a sync.Pool and should be released with Release:
cockroach/pkg/sql/rowexec/tablereader.go
Line 180 in 78f8482
| // Release releases this tableReader back to the pool. |
The flowinfra.FlowBase generally releases its underlying processors in Cleanup:
cockroach/pkg/sql/flowinfra/flow.go
Line 438 in 78f8482
| for _, p := range f.processors { |
The problem is that we remove the head processor from f.processors in FlowBase.Run:
cockroach/pkg/sql/flowinfra/flow.go
Line 363 in 78f8482
| f.processors = f.processors[:len(f.processors)-1] |
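To make the leak concrete, here is a minimal sketch of the pattern described above. The `reader` and `flow` types, the `released` counter, and `leakedAfterRun` are hypothetical stand-ins, not the actual `tableReader` or `FlowBase` code; the only assumption carried over is that Run pops the last processor off the slice and Cleanup releases whatever remains in it.

```go
package main

import (
	"fmt"
	"sync"
)

// reader is a hypothetical stand-in for a pooled processor such as tableReader.
type reader struct {
	buf []byte
}

var readerPool = sync.Pool{New: func() interface{} { return &reader{} }}

// released counts Release calls so the leak is observable (single goroutine only).
var released int

func newReader() *reader { return readerPool.Get().(*reader) }

// Release resets the reader and returns it to the pool.
func (r *reader) Release() {
	r.buf = r.buf[:0]
	released++
	readerPool.Put(r)
}

// flow is a hypothetical, simplified stand-in for FlowBase.
type flow struct {
	processors []*reader
}

// run pops the head (last) processor off the slice and returns it.
func (f *flow) run() *reader {
	head := f.processors[len(f.processors)-1]
	f.processors = f.processors[:len(f.processors)-1]
	return head
}

// cleanup releases only the processors still present in f.processors.
func (f *flow) cleanup() {
	for _, p := range f.processors {
		p.Release()
	}
}

// leakedAfterRun builds a flow with n processors, runs it, cleans it up,
// and reports how many processors were never released.
func leakedAfterRun(n int) int {
	released = 0
	f := &flow{}
	for i := 0; i < n; i++ {
		f.processors = append(f.processors, newReader())
	}
	f.run()
	f.cleanup()
	return n - released
}

func main() {
	fmt.Println("processors never released:", leakedAfterRun(3))
}
```

In this sketch exactly one processor, the head, is never handed back to the pool per flow, so every new flow pays for a fresh allocation of it.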
Unfortunately it's not clear that the head processor can be freed with the other processors. The following patch didn't seem to work:
index 88e86df63e..02fe9c8746 100644
--- a/pkg/sql/flowinfra/flow.go
+++ b/pkg/sql/flowinfra/flow.go
@@ -284,8 +284,9 @@ func (f *FlowBase) GetLocalProcessors() []execinfra.LocalProcessor {
// listen for a cancellation on the same context.
func (f *FlowBase) startInternal(ctx context.Context, doneFn func()) (context.Context, error) {
f.doneFn = doneFn
+ processors := f.processors[1:]
log.VEventf(
- ctx, 1, "starting (%d processors, %d startables)", len(f.processors), len(f.startables),
+ ctx, 1, "starting (%d processors, %d startables)", len(processors), len(f.startables),
)
ctx, f.ctxCancel = contextutil.WithCancel(ctx)
@@ -315,14 +316,14 @@ func (f *FlowBase) startInternal(ctx context.Context, doneFn func()) (context.Co
for _, s := range f.startables {
s.Start(ctx, &f.waitGroup, f.ctxCancel)
}
- for i := 0; i < len(f.processors); i++ {
+ for i := 0; i < len(processors); i++ {
f.waitGroup.Add(1)
go func(i int) {
- f.processors[i].Run(ctx)
+ processors[i].Run(ctx)
f.waitGroup.Done()
}(i)
}
- f.startedGoroutines = len(f.startables) > 0 || len(f.processors) > 0 || !f.IsLocal()
+ f.startedGoroutines = len(f.startables) > 0 || len(processors) > 0 || !f.IsLocal()
return ctx, nil
}
@@ -360,7 +361,6 @@ func (f *FlowBase) Run(ctx context.Context, doneFn func()) error {
return errors.AssertionFailedf("no processors in flow")
}
headProc = f.processors[len(f.processors)-1]
- f.processors = f.processors[:len(f.processors)-1]
var err error
if ctx, err = f.startInternal(ctx, doneFn); err != nil {

- We always construct a new ImmutableTableDescriptor. These things are meant to be cached. I'm not sure how best to plumb a cache (or which one).
- We could pool the rowBasedFlow objects (right?).
Maybe there's something I'm missing about life cycles which make this pooling hard. Either way, it's the most obvious offender in a memory profile and it seems like mostly low hanging fruit.
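For the pooling idea, a rough sketch of what a pooled flow object could look like is below. `pooledFlow`, its fields, and `newPooledFlow` are hypothetical and only illustrate the reset-on-release pattern with `sync.Pool`; this says nothing about whether the real rowBasedFlow life cycle permits it.

```go
package main

import (
	"fmt"
	"sync"
)

// pooledFlow is a hypothetical stand-in for rowBasedFlow; fields are illustrative.
type pooledFlow struct {
	id         int
	processors []string
}

var flowPool = sync.Pool{New: func() interface{} { return &pooledFlow{} }}

// newPooledFlow takes a flow from the pool (or allocates one) and initializes it.
func newPooledFlow(id int) *pooledFlow {
	f := flowPool.Get().(*pooledFlow)
	f.id = id
	return f
}

// Release clears per-query state, keeps the slice's backing array for reuse,
// and returns the flow to the pool. The caller must not use f afterwards.
func (f *pooledFlow) Release() {
	for i := range f.processors {
		f.processors[i] = "" // drop references held by the old elements
	}
	*f = pooledFlow{processors: f.processors[:0]}
	flowPool.Put(f)
}

func main() {
	f := newPooledFlow(1)
	f.processors = append(f.processors, "tableReader", "noop")
	f.Release()

	// Whether g reuses f's memory or is freshly allocated, it starts out empty.
	g := newPooledFlow(2)
	fmt.Println(g.id, len(g.processors))
}
```

The important part is that Release zeroes everything tied to the finished query before calling Put; otherwise a later query could observe stale state from an earlier one.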

