client: stream containers serially to conserve memory#12846
client: stream containers serially to conserve memory#12846ningmingxiao wants to merge 1 commit into
Conversation
ad0d6c5 to
e65d34f
Compare
074c60f to
9b5eedf
Compare
4a47c4b to
89bcba9
Compare
21641e1 to
d821378
Compare
|
Thoughts: Swapping speed for memory, and vice verse, is a time old issue best left to config option IMO. These types of performance changes are fought over with each performance change to the codebase with some contributors optimizing for speed, some for memory, some for a combination of with resource restrictions. Thus, this preference should not a this or that decision, it should be a choice, possibly even dynamic based on SLA definitions driven by the user/client. |
0ba7106 to
f7bb162
Compare
76c14a3 to
074c60f
Compare
2df43c6 to
dbe301f
Compare
dbe301f to
1fe488f
Compare
|
Here is the newest benchmark test test 10 times. if let b.Run("use client.Containers", benchmarkGetContainers(ctx, false, client)) first the first test use more resources on my computer(looks strange). let -benchtime=10s The difference is not as obvious as when tested with "time -v". |
1fe488f to
a56f712
Compare
|
done @mxpv |
|
ping @mxpv can this pr be merged? |
|
@mikebrow or @thaJeztah PTAL? |
|
ping @mikebrow @thaJeztah |
|
@thaJeztah can you take a look? |
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds a streaming container listing API (iterator-based) and wires it into ctr plus a benchmark to compare iterator vs slice listing performance under heavy spec/env metadata.
Changes:
- Introduces
Client.ContainersIterand a streaming iterator implementation backed by the remote container store. - Adds
ctr containers list --streamto print containers as they arrive instead of buffering the full list. - Adds an integration benchmark (
BenchmarkContainerGet) to compare allocation/runtime characteristics of iterator vs slice listing.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| integration/client/benchmark_test.go | Adds a benchmark that creates many containers with large envs and compares ContainersIter vs Containers. |
| cmd/ctr/commands/containers/containers.go | Adds --stream flag and updates listing to consume an iter.Seq2 source. |
| client/containerstore.go | Refactors streaming list logic into a reusable listIter sequence. |
| client/client.go | Exposes Client.ContainersIter that maps remote records to client.Container. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| session, err := r.client.ListStream(ctx, &containersapi.ListContainersRequest{ | ||
| Filters: filters, | ||
| }) | ||
| if err != nil { |
| func (c *Client) ContainersIter(ctx context.Context, filters ...string) (iter.Seq2[Container, error], error) { | ||
| r, ok := c.ContainerService().(*remoteContainers) | ||
| if !ok { | ||
| return nil, errgrpc.ToNative(fmt.Errorf("c.ContainerService() is not a *remoteContainers")) | ||
| } |
| var containers iter.Seq2[containerd.Container, error] | ||
| if stream { | ||
| containers, err = client.ContainersIter(ctx, filters...) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } else { | ||
| ctrs, err := client.Containers(ctx, filters...) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| containers = sliceIter(ctrs) | ||
| } | ||
| if err != nil { | ||
| return err | ||
| } |
| w := tabwriter.NewWriter(os.Stdout, 4, 8, 4, ' ', 0) | ||
| fmt.Fprintln(w, "CONTAINER\tIMAGE\tRUNTIME\t") | ||
| for _, c := range containers { | ||
|
|
||
| for c, err := range containers { | ||
| if err != nil { | ||
| return err | ||
| } | ||
| info, err := c.Info(ctx, containerd.WithoutRefreshedMetadata) | ||
| if err != nil { | ||
| return err |
| for i := 0; i < 500; i++ { | ||
| id := fmt.Sprintf("%s-%d", b.Name(), i) | ||
| container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), oci.WithEnv(envs), withExitStatus(7))) | ||
| if err != nil { | ||
| b.Error(err) | ||
| return | ||
| } | ||
| b.Logf("create container %s", container.ID()) | ||
| containers = append(containers, container) |
|
I change stream API name to ContainersIter can you take a look ? thanks ping @thaJeztah |
Co-authored-by: Sebastiaan van Stijn <github@gone.nl> Signed-off-by: ningmingxiao <ning.mingxiao@zte.com.cn>
|
|
||
| // ContainersIter streams containers from the underlying container store. | ||
| // This is only supported when the container service is backed by a remote store | ||
| // that implements ListIter (streaming). If not supported, it returns an error. |
| func (r *remoteContainers) listIter(ctx context.Context, filters ...string) iter.Seq2[containers.Container, error] { | ||
| return func(yield func(containers.Container, error) bool) { | ||
| session, err := r.client.ListStream(ctx, &containersapi.ListContainersRequest{ | ||
| Filters: filters, | ||
| }) | ||
| if err != nil { | ||
| yield(containers.Container{}, errgrpc.ToNative(err)) | ||
| return | ||
| } | ||
| for { | ||
| con, err := session.Recv() | ||
| if err != nil { | ||
| if err == io.EOF { | ||
| return | ||
| } | ||
| if s, ok := status.FromError(err); ok { | ||
| if s.Code() == codes.Unimplemented { | ||
| yield(containers.Container{}, errStreamNotAvailable) | ||
| return | ||
| } | ||
| } | ||
| yield(containers.Container{}, errgrpc.ToNative(err)) | ||
| return | ||
| } | ||
| if !yield(containerFromProto(con.Container), nil) { | ||
| return | ||
| } | ||
| return nil, errgrpc.ToNative(err) | ||
| } | ||
| select { | ||
| case <-ctx.Done(): | ||
| return containers, ctx.Err() | ||
| default: | ||
| containers = append(containers, containerFromProto(c.Container)) | ||
| } | ||
| } | ||
| } |
| func (c *Client) ContainersIter(ctx context.Context, filters ...string) (iter.Seq2[Container, error], error) { | ||
| r, ok := c.ContainerService().(*remoteContainers) | ||
| if !ok { | ||
| return nil, errgrpc.ToNative(fmt.Errorf("c.ContainerService() is not a *remoteContainers")) |
| b.ResetTimer() | ||
| b.ReportAllocs() | ||
| for i := 0; i < b.N; i++ { | ||
| var filter []string | ||
| var containers iter.Seq2[Container, error] | ||
| var err error | ||
| if useIter { | ||
| containers, err = client.ContainersIter(ctx, filter...) | ||
| if err != nil { | ||
| b.Fatal(err) | ||
| } | ||
| } else { | ||
| ctrs, err := client.Containers(ctx, filter...) | ||
| if err != nil { | ||
| b.Fatal(err) | ||
| } | ||
| containers = sliceIter(ctrs) | ||
| } | ||
| for con, err := range containers { |
| func sliceIter(ctrs []containerd.Container) iter.Seq2[containerd.Container, error] { | ||
| return func(yield func(containerd.Container, error) bool) { | ||
| for _, c := range ctrs { | ||
| if !yield(c, nil) { | ||
| return | ||
| } | ||
| } | ||
| } | ||
| } |
| import ( | ||
| "context" | ||
| "fmt" | ||
| "iter" |
stream containers serially to conserve memory
ctr/nerdctl use
https://github.com/containerd/containerd/blob/v2.2.1/client/containerstore.go#L107 to get all containers list
if every container spec size is big we have to use much memory to store the container list.
what I did:
Process one item from the container at a time, without adding it to any list.
fix #12858
use /usr/bin/time -v ctr -n k8s.io c ls
after this pr
Maximum resident set size from 338328 reduce to 45556
@fuweid @mikebrow @mxpv @AkihiroSuda