Skip to content

Commit f2dd085

Browse files
smiralaurazard
andcommitted
feat: report image pull progress in the console
This reports image pull progress in the console for images pulled by Talos: * etcd * kubelet * installer This work was mostly done by @laurazard, I just wrapped it for the console with Laura's help. (see #12932) Co-authored-by: Laura Brehm <laurabrehm@hey.com> Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
1 parent 72fe98a commit f2dd085

File tree

7 files changed

+502
-7
lines changed

7 files changed

+502
-7
lines changed

internal/app/machined/pkg/system/services/etcd.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/siderolabs/talos/internal/app/machined/pkg/system/runner/containerd"
3939
"github.com/siderolabs/talos/internal/app/machined/pkg/system/runner/restart"
4040
"github.com/siderolabs/talos/internal/pkg/containers/image"
41+
"github.com/siderolabs/talos/internal/pkg/containers/image/console"
4142
"github.com/siderolabs/talos/internal/pkg/environment"
4243
"github.com/siderolabs/talos/internal/pkg/etcd"
4344
"github.com/siderolabs/talos/pkg/argsbuilder"
@@ -101,7 +102,10 @@ func (e *Etcd) PreFunc(ctx context.Context, r runtime.Runtime) error {
101102
return fmt.Errorf("failed to get etcd spec: %w", err)
102103
}
103104

104-
img, err := image.Pull(containerdctx, cri.RegistryBuilder(r.State().V1Alpha2().Resources()), client, spec.TypedSpec().Image, image.WithSkipIfAlreadyPulled())
105+
img, err := image.Pull(containerdctx, cri.RegistryBuilder(r.State().V1Alpha2().Resources()), client, spec.TypedSpec().Image,
106+
image.WithSkipIfAlreadyPulled(),
107+
image.WithProgressReporter(console.NewProgressReporter),
108+
)
105109
if err != nil {
106110
return fmt.Errorf("failed to pull image %q: %w", spec.TypedSpec().Image, err)
107111
}

internal/app/machined/pkg/system/services/kubelet.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/siderolabs/talos/internal/app/machined/pkg/system/runner/restart"
3030
"github.com/siderolabs/talos/internal/pkg/capability"
3131
"github.com/siderolabs/talos/internal/pkg/containers/image"
32+
"github.com/siderolabs/talos/internal/pkg/containers/image/console"
3233
"github.com/siderolabs/talos/internal/pkg/environment"
3334
"github.com/siderolabs/talos/pkg/conditions"
3435
"github.com/siderolabs/talos/pkg/machinery/config/machine"
@@ -71,7 +72,10 @@ func (k *Kubelet) PreFunc(ctx context.Context, r runtime.Runtime) error {
7172
// Pull the image and unpack it.
7273
containerdctx := namespaces.WithNamespace(ctx, constants.SystemContainerdNamespace)
7374

74-
img, err := image.Pull(containerdctx, cri.RegistryBuilder(r.State().V1Alpha2().Resources()), client, spec.Image, image.WithSkipIfAlreadyPulled())
75+
img, err := image.Pull(containerdctx, cri.RegistryBuilder(r.State().V1Alpha2().Resources()), client, spec.Image,
76+
image.WithSkipIfAlreadyPulled(),
77+
image.WithProgressReporter(console.NewProgressReporter),
78+
)
7579
if err != nil {
7680
return err
7781
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
// Package console provides a console-based implementation of image pull progress reporting.
6+
package console
7+
8+
import (
9+
"log"
10+
"sync"
11+
"time"
12+
13+
"github.com/dustin/go-humanize"
14+
15+
"github.com/siderolabs/talos/internal/pkg/containers/image"
16+
"github.com/siderolabs/talos/internal/pkg/containers/image/progress"
17+
)
18+
19+
// ReportInterval is the interval between progress reports.
20+
const ReportInterval = 15 * time.Second
21+
22+
type layerProgress struct {
23+
status progress.LayerPullStatus
24+
offset int64
25+
total int64
26+
}
27+
28+
// ProgressReporter reports image pull progress to the console.
29+
type ProgressReporter struct {
30+
imageRef string
31+
32+
mu sync.Mutex
33+
layers map[string]*layerProgress
34+
stopCh chan struct{}
35+
}
36+
37+
// NewProgressReporter creates a new ProgressReporter.
38+
func NewProgressReporter(imageRef string) image.ProgressReporter {
39+
return &ProgressReporter{
40+
imageRef: imageRef,
41+
}
42+
}
43+
44+
// Update implements UpdateFn interface.
45+
func (c *ProgressReporter) Update(upd progress.LayerPullProgress) {
46+
c.mu.Lock()
47+
defer c.mu.Unlock()
48+
49+
if c.layers == nil {
50+
c.layers = make(map[string]*layerProgress)
51+
}
52+
53+
lp, ok := c.layers[upd.LayerID]
54+
if !ok {
55+
lp = &layerProgress{}
56+
c.layers[upd.LayerID] = lp
57+
}
58+
59+
if upd.Status == progress.LayerPullStatusDownloading {
60+
lp.total = upd.Total
61+
lp.offset = upd.Offset
62+
} else {
63+
lp.offset = lp.total
64+
}
65+
66+
lp.status = upd.Status
67+
}
68+
69+
// Start implements ProgressReporter interface.
70+
func (c *ProgressReporter) Start() {
71+
c.stopCh = make(chan struct{})
72+
73+
go func() {
74+
ticker := time.NewTicker(ReportInterval)
75+
defer ticker.Stop()
76+
77+
c.reportProgress()
78+
79+
for {
80+
select {
81+
case <-ticker.C:
82+
c.reportProgress()
83+
case <-c.stopCh:
84+
return
85+
}
86+
}
87+
}()
88+
}
89+
90+
// Stop implements ProgressReporter interface.
91+
func (c *ProgressReporter) Stop() {
92+
close(c.stopCh)
93+
}
94+
95+
func (c *ProgressReporter) reportProgress() {
96+
c.mu.Lock()
97+
defer c.mu.Unlock()
98+
99+
if len(c.layers) == 0 {
100+
log.Printf("pulling image %s: starting...", c.imageRef)
101+
102+
return
103+
}
104+
105+
var (
106+
anyDownloading bool
107+
overallOffset int64
108+
overallTotal int64
109+
)
110+
111+
for _, l := range c.layers {
112+
if l.status == progress.LayerPullStatusDownloading {
113+
anyDownloading = true
114+
}
115+
116+
overallOffset += l.offset
117+
overallTotal += l.total
118+
}
119+
120+
if !anyDownloading {
121+
log.Printf("pulling image %s: extracting...", c.imageRef)
122+
123+
return
124+
}
125+
126+
var percentage float64
127+
128+
if overallTotal > 0 {
129+
percentage = float64(overallOffset) / float64(overallTotal) * 100.0
130+
}
131+
132+
log.Printf("pulling image %s: downloading %d layers (%s/%s) (%.2f%%)...",
133+
c.imageRef, len(c.layers),
134+
humanize.IBytes(uint64(overallOffset)),
135+
humanize.IBytes(uint64(overallTotal)),
136+
percentage,
137+
)
138+
}

internal/pkg/containers/image/image.go

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
containerd "github.com/containerd/containerd/v2/client"
1515
"github.com/containerd/containerd/v2/core/images"
16+
"github.com/containerd/containerd/v2/pkg/snapshotters"
1617
"github.com/containerd/errdefs"
1718
"github.com/containerd/log"
1819
"github.com/containerd/platforms"
@@ -21,6 +22,7 @@ import (
2122
"github.com/siderolabs/go-retry/retry"
2223
"github.com/sirupsen/logrus"
2324

25+
"github.com/siderolabs/talos/internal/pkg/containers/image/progress"
2426
"github.com/siderolabs/talos/pkg/machinery/resources/cri"
2527
)
2628

@@ -37,6 +39,7 @@ type PullOption func(*PullOptions)
3739
type PullOptions struct {
3840
SkipIfAlreadyPulled bool
3941
MaxNotFoundRetries int
42+
NewProgressReporter NewProgressReporter
4043
}
4144

4245
// DefaultPullOptions returns default options for Pull function.
@@ -61,6 +64,23 @@ func WithMaxNotFoundRetries(maxRetries int) PullOption {
6164
}
6265
}
6366

67+
// WithProgressReporter enables reporting pull progress.
68+
func WithProgressReporter(newReporter NewProgressReporter) PullOption {
69+
return func(opts *PullOptions) {
70+
opts.NewProgressReporter = newReporter
71+
}
72+
}
73+
74+
// ProgressReporter is an interface for reporting image pull progress.
75+
type ProgressReporter interface {
76+
Start()
77+
Stop()
78+
Update(progress.LayerPullProgress)
79+
}
80+
81+
// NewProgressReporter creates a new progress reporter.
82+
type NewProgressReporter func(imageRef string) ProgressReporter
83+
6484
// RegistriesBuilder is a function that returns registries configuration.
6585
type RegistriesBuilder = func(context.Context) (cri.Registries, error)
6686

@@ -117,13 +137,48 @@ func Pull(ctx context.Context, registryBuilder RegistriesBuilder, client *contai
117137

118138
resolver := NewResolver(registriesConfig)
119139

120-
if img, err = client.Pull(
121-
ctx,
122-
ref,
140+
containerdRemoteOpts := []containerd.RemoteOpt{
123141
containerd.WithPullUnpack,
124-
containerd.WithResolver(resolver),
125142
containerd.WithChildLabelMap(images.ChildGCLabelsFilterLayers),
126143
containerd.WithPlatformMatcher(platforms.OnlyStrict(platforms.DefaultSpec())),
144+
containerd.WithResolver(resolver),
145+
}
146+
147+
if opts.NewProgressReporter != nil {
148+
reporter := opts.NewProgressReporter(ref)
149+
150+
reporter.Start()
151+
defer reporter.Stop()
152+
153+
pp := progress.NewPullProgress(
154+
client.ContentStore(),
155+
client.SnapshotService("overlayfs"),
156+
reporter.Update,
157+
)
158+
159+
finishProgress := pp.ShowProgress(ctx)
160+
defer finishProgress()
161+
162+
containerdRemoteOpts = append(containerdRemoteOpts,
163+
containerd.WithImageHandler(
164+
images.HandlerFunc(
165+
func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
166+
if images.IsLayerType(desc.MediaType) {
167+
pp.Add(desc)
168+
}
169+
170+
return nil, nil
171+
},
172+
),
173+
),
174+
containerd.WithImageHandlerWrapper(snapshotters.AppendInfoHandlerWrapper(ref)),
175+
)
176+
}
177+
178+
if img, err = client.Pull(
179+
ctx,
180+
ref,
181+
containerdRemoteOpts...,
127182
); err != nil {
128183
err = fmt.Errorf("failed to pull image %q: %w", ref, err)
129184
if errors.Is(err, errdefs.ErrNotFound) {

0 commit comments

Comments
 (0)