
sql/rowflow: excessive memory allocation in setup flow #42770

@ajwerner

Description


What is your situation?

Looking at a heap profile while running KV95, I observe that 16.12% of allocated space is due to rowflow.(*rowBasedFlow).Setup. There are a few delinquent allocation paths here.

[Screenshot: heap profile, 2019-11-25 22-13-18]

  1. The first is that we seem to fail to release the underlying tableReader in flow.Cleanup.

The tableReaders are allocated from a sync.Pool and should be released with Release:

// Release releases this tableReader back to the pool.

The flowinfra.FlowBase generally releases its underlying processors in Cleanup:

for _, p := range f.processors {

The problem is that we remove the head processor from f.processors in FlowBase.Run:

f.processors = f.processors[:len(f.processors)-1]

Unfortunately, it's not clear that the head processor can be freed alongside the other processors. The following patch didn't seem to work:

index 88e86df63e..02fe9c8746 100644
--- a/pkg/sql/flowinfra/flow.go
+++ b/pkg/sql/flowinfra/flow.go
@@ -284,8 +284,9 @@ func (f *FlowBase) GetLocalProcessors() []execinfra.LocalProcessor {
 // listen for a cancellation on the same context.
 func (f *FlowBase) startInternal(ctx context.Context, doneFn func()) (context.Context, error) {
        f.doneFn = doneFn
+       processors := f.processors[1:]
        log.VEventf(
-               ctx, 1, "starting (%d processors, %d startables)", len(f.processors), len(f.startables),
+               ctx, 1, "starting (%d processors, %d startables)", len(processors), len(f.startables),
        )

        ctx, f.ctxCancel = contextutil.WithCancel(ctx)
@@ -315,14 +316,14 @@ func (f *FlowBase) startInternal(ctx context.Context, doneFn func()) (context.Co
        for _, s := range f.startables {
                s.Start(ctx, &f.waitGroup, f.ctxCancel)
        }
-       for i := 0; i < len(f.processors); i++ {
+       for i := 0; i < len(processors); i++ {
                f.waitGroup.Add(1)
                go func(i int) {
-                       f.processors[i].Run(ctx)
+                       processors[i].Run(ctx)
                        f.waitGroup.Done()
                }(i)
        }
-       f.startedGoroutines = len(f.startables) > 0 || len(f.processors) > 0 || !f.IsLocal()
+       f.startedGoroutines = len(f.startables) > 0 || len(processors) > 0 || !f.IsLocal()
        return ctx, nil
 }

@@ -360,7 +361,6 @@ func (f *FlowBase) Run(ctx context.Context, doneFn func()) error {
                return errors.AssertionFailedf("no processors in flow")
        }
        headProc = f.processors[len(f.processors)-1]
-       f.processors = f.processors[:len(f.processors)-1]

        var err error
        if ctx, err = f.startInternal(ctx, doneFn); err != nil {
  2. We always construct a new ImmutableTableDescriptor. These objects are meant to be cached. I'm not sure how best to plumb a cache (or which one to use).

  3. We could pool the rowBasedFlow objects (right?).

Maybe there's something I'm missing about life cycles which makes this pooling hard. Either way, this is the most obvious offender in a memory profile, and it seems like mostly low-hanging fruit.

[Screenshot: heap profile, 2019-11-25 22-33-51]
