Skip to content

Commit 6743ec9

Browse files
authored
Implement remote.Pusher (#1633)
* Implement remote.Pusher The Pusher handles multiplexing across multiple repositories and deduplicating in-flight uploads. This removes the need to do an awkward dance around re-using auth handshakes by passing custom transports. This also fixes limitations in remote.MultiWrite around streaming layers and multiple repositories. * Use remote.Pusher in crane edit This will be faster than doing the handshake twice.
1 parent 0962e29 commit 6743ec9

File tree

7 files changed

+674
-318
lines changed

7 files changed

+674
-318
lines changed

internal/cmd/edit.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package cmd
1717
import (
1818
"archive/tar"
1919
"bytes"
20+
"context"
2021
"encoding/json"
2122
"errors"
2223
"fmt"
@@ -70,7 +71,7 @@ func NewCmdEditConfig(options *[]crane.Option) *cobra.Command {
7071
echo '{}' | crane edit config ubuntu`,
7172
Args: cobra.ExactArgs(1),
7273
RunE: func(cmd *cobra.Command, args []string) error {
73-
ref, err := editConfig(cmd.InOrStdin(), cmd.OutOrStdout(), args[0], dst, *options...)
74+
ref, err := editConfig(cmd.Context(), cmd.InOrStdin(), cmd.OutOrStdout(), args[0], dst, *options...)
7475
if err != nil {
7576
return fmt.Errorf("editing config: %w", err)
7677
}
@@ -154,7 +155,7 @@ func interactiveFile(i any) bool {
154155
return (stat.Mode() & os.ModeCharDevice) != 0
155156
}
156157

157-
func editConfig(in io.Reader, out io.Writer, src, dst string, options ...crane.Option) (name.Reference, error) {
158+
func editConfig(ctx context.Context, in io.Reader, out io.Writer, src, dst string, options ...crane.Option) (name.Reference, error) {
158159
o := crane.GetOptions(options...)
159160

160161
img, err := crane.Pull(src, options...)
@@ -255,11 +256,16 @@ func editConfig(in io.Reader, out io.Writer, src, dst string, options ...crane.O
255256
return nil, err
256257
}
257258

258-
if err := remote.WriteLayer(dstRef.Context(), l, o.Remote...); err != nil {
259+
pusher, err := remote.NewPusher(o.Remote...)
260+
if err != nil {
259261
return nil, err
260262
}
261263

262-
if err := remote.Put(dstRef, rm, o.Remote...); err != nil {
264+
if err := pusher.Upload(ctx, dstRef.Context(), l); err != nil {
265+
return nil, err
266+
}
267+
268+
if err := pusher.Push(ctx, dstRef, rm); err != nil {
263269
return nil, err
264270
}
265271

pkg/v1/remote/descriptor.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ type Descriptor struct {
7373
platform v1.Platform
7474
}
7575

76+
func (d *Descriptor) toDesc() v1.Descriptor {
77+
return d.Descriptor
78+
}
79+
7680
// RawManifest exists to satisfy the Taggable interface.
7781
func (d *Descriptor) RawManifest() ([]byte, error) {
7882
return d.Manifest, nil
@@ -117,7 +121,11 @@ func get(ref name.Reference, acceptable []types.MediaType, options ...Option) (*
117121
if err != nil {
118122
return nil, err
119123
}
120-
b, desc, err := f.fetchManifest(o.context, ref, acceptable)
124+
return f.get(o.context, ref, acceptable)
125+
}
126+
127+
func (f *fetcher) get(ctx context.Context, ref name.Reference, acceptable []types.MediaType) (*Descriptor, error) {
128+
b, desc, err := f.fetchManifest(ctx, ref, acceptable)
121129
if err != nil {
122130
return nil, err
123131
}
@@ -126,7 +134,7 @@ func get(ref name.Reference, acceptable []types.MediaType, options ...Option) (*
126134
ref: ref,
127135
Manifest: b,
128136
Descriptor: *desc,
129-
platform: o.platform,
137+
platform: f.platform,
130138
}, nil
131139
}
132140

@@ -237,9 +245,10 @@ type resource interface {
237245

238246
// fetcher implements methods for reading from a registry.
239247
type fetcher struct {
240-
target resource
241-
client *http.Client
242-
context context.Context
248+
target resource
249+
client *http.Client
250+
context context.Context
251+
platform v1.Platform
243252
}
244253

245254
func makeFetcher(ctx context.Context, target resource, o *options) (*fetcher, error) {
@@ -266,9 +275,10 @@ func makeFetcher(ctx context.Context, target resource, o *options) (*fetcher, er
266275
return nil, err
267276
}
268277
return &fetcher{
269-
target: target,
270-
client: &http.Client{Transport: tr},
271-
context: ctx,
278+
target: target,
279+
client: &http.Client{Transport: tr},
280+
context: ctx,
281+
platform: o.platform,
272282
}, nil
273283
}
274284

pkg/v1/remote/multi_write.go

Lines changed: 12 additions & 257 deletions
Original file line numberDiff line numberDiff line change
@@ -15,277 +15,32 @@
1515
package remote
1616

1717
import (
18-
"context"
19-
"fmt"
20-
2118
"github.com/google/go-containerregistry/pkg/name"
22-
v1 "github.com/google/go-containerregistry/pkg/v1"
23-
"github.com/google/go-containerregistry/pkg/v1/partial"
24-
"github.com/google/go-containerregistry/pkg/v1/types"
2519
"golang.org/x/sync/errgroup"
2620
)
2721

2822
// MultiWrite writes the given Images or ImageIndexes to the given refs, as
29-
// efficiently as possible, by deduping shared layer blobs and uploading layers
30-
// in parallel, then uploading all manifests in parallel.
31-
//
32-
// Current limitations:
33-
// - All refs must share the same repository.
34-
// - Images cannot consist of stream.Layers.
35-
func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
36-
// Determine the repository being pushed to; if asked to push to
37-
// multiple repositories, give up.
38-
var repo, zero name.Repository
39-
for ref := range m {
40-
if repo == zero {
41-
repo = ref.Context()
42-
} else if ref.Context() != repo {
43-
return fmt.Errorf("MultiWrite can only push to the same repository (saw %q and %q)", repo, ref.Context())
44-
}
45-
}
46-
23+
// efficiently as possible, by deduping shared layer blobs while uploading them
24+
// in parallel.
25+
func MultiWrite(todo map[name.Reference]Taggable, options ...Option) (rerr error) {
4726
o, err := makeOptions(options...)
4827
if err != nil {
4928
return err
5029
}
51-
52-
// Collect unique blobs (layers and config blobs).
53-
blobs := map[v1.Hash]v1.Layer{}
54-
newManifests := []map[name.Reference]Taggable{}
55-
// Separate originally requested images and indexes, so we can push images first.
56-
images, indexes := map[name.Reference]Taggable{}, map[name.Reference]Taggable{}
57-
for ref, i := range m {
58-
if img, ok := i.(v1.Image); ok {
59-
images[ref] = i
60-
if err := addImageBlobs(img, blobs, o.allowNondistributableArtifacts); err != nil {
61-
return err
62-
}
63-
continue
64-
}
65-
if idx, ok := i.(v1.ImageIndex); ok {
66-
indexes[ref] = i
67-
newManifests, err = addIndexBlobs(idx, blobs, repo, newManifests, 0, o.allowNondistributableArtifacts)
68-
if err != nil {
69-
return err
70-
}
71-
continue
72-
}
73-
return fmt.Errorf("pushable resource was not Image or ImageIndex: %T", i)
74-
}
75-
76-
// Determine if any of the layers are Mountable, because if so we need
77-
// to request Pull scope too.
78-
ls := []v1.Layer{}
79-
for _, l := range blobs {
80-
ls = append(ls, l)
81-
}
82-
w, err := makeWriter(o.context, repo, ls, o)
83-
if err != nil {
84-
return err
30+
if o.progress != nil {
31+
defer func() { o.progress.Close(rerr) }()
8532
}
33+
p := newPusher(o)
8634

87-
// Collect the total size of blobs and manifests we're about to write.
88-
if w.progress != nil {
89-
defer func() { w.progress.Close(rerr) }()
90-
91-
for _, b := range blobs {
92-
size, err := b.Size()
93-
if err != nil {
94-
return err
95-
}
96-
w.progress.total(size)
97-
}
98-
countManifest := func(t Taggable) error {
99-
b, err := t.RawManifest()
100-
if err != nil {
101-
return err
102-
}
103-
w.progress.total(int64(len(b)))
104-
return nil
105-
}
106-
for _, i := range images {
107-
if err := countManifest(i); err != nil {
108-
return err
109-
}
110-
}
111-
for _, nm := range newManifests {
112-
for _, i := range nm {
113-
if err := countManifest(i); err != nil {
114-
return err
115-
}
116-
}
117-
}
118-
for _, i := range indexes {
119-
if err := countManifest(i); err != nil {
120-
return err
121-
}
122-
}
123-
}
35+
g, ctx := errgroup.WithContext(o.context)
36+
g.SetLimit(o.jobs)
12437

125-
// Upload individual blobs and collect any errors.
126-
blobChan := make(chan v1.Layer, 2*o.jobs)
127-
ctx := o.context
128-
g, gctx := errgroup.WithContext(o.context)
129-
for i := 0; i < o.jobs; i++ {
130-
// Start N workers consuming blobs to upload.
38+
for ref, t := range todo {
39+
ref, t := ref, t
13140
g.Go(func() error {
132-
for b := range blobChan {
133-
if err := w.uploadOne(gctx, b); err != nil {
134-
return err
135-
}
136-
}
137-
return nil
41+
return p.Push(ctx, ref, t)
13842
})
13943
}
140-
g.Go(func() error {
141-
defer close(blobChan)
142-
for _, b := range blobs {
143-
select {
144-
case blobChan <- b:
145-
case <-gctx.Done():
146-
return gctx.Err()
147-
}
148-
}
149-
return nil
150-
})
151-
if err := g.Wait(); err != nil {
152-
return err
153-
}
154-
155-
commitMany := func(ctx context.Context, m map[name.Reference]Taggable) error {
156-
g, ctx := errgroup.WithContext(ctx)
157-
// With all of the constituent elements uploaded, upload the manifests
158-
// to commit the images and indexes, and collect any errors.
159-
type task struct {
160-
i Taggable
161-
ref name.Reference
162-
}
163-
taskChan := make(chan task, 2*o.jobs)
164-
for i := 0; i < o.jobs; i++ {
165-
// Start N workers consuming tasks to upload manifests.
166-
g.Go(func() error {
167-
for t := range taskChan {
168-
if err := w.commitManifest(ctx, t.i, t.ref); err != nil {
169-
return err
170-
}
171-
}
172-
return nil
173-
})
174-
}
175-
go func() {
176-
for ref, i := range m {
177-
taskChan <- task{i, ref}
178-
}
179-
close(taskChan)
180-
}()
181-
return g.Wait()
182-
}
183-
// Push originally requested image manifests. These have no
184-
// dependencies.
185-
if err := commitMany(ctx, images); err != nil {
186-
return err
187-
}
188-
// Push new manifests from lowest levels up.
189-
for i := len(newManifests) - 1; i >= 0; i-- {
190-
if err := commitMany(ctx, newManifests[i]); err != nil {
191-
return err
192-
}
193-
}
194-
// Push originally requested index manifests, which might depend on
195-
// newly discovered manifests.
196-
197-
return commitMany(ctx, indexes)
198-
}
19944

200-
// addIndexBlobs adds blobs to the set of blobs we intend to upload, and
201-
// returns the latest copy of the ordered collection of manifests to upload.
202-
func addIndexBlobs(idx v1.ImageIndex, blobs map[v1.Hash]v1.Layer, repo name.Repository, newManifests []map[name.Reference]Taggable, lvl int, allowNondistributableArtifacts bool) ([]map[name.Reference]Taggable, error) {
203-
if lvl > len(newManifests)-1 {
204-
newManifests = append(newManifests, map[name.Reference]Taggable{})
205-
}
206-
207-
im, err := idx.IndexManifest()
208-
if err != nil {
209-
return nil, err
210-
}
211-
for _, desc := range im.Manifests {
212-
switch desc.MediaType {
213-
case types.OCIImageIndex, types.DockerManifestList:
214-
idx, err := idx.ImageIndex(desc.Digest)
215-
if err != nil {
216-
return nil, err
217-
}
218-
newManifests, err = addIndexBlobs(idx, blobs, repo, newManifests, lvl+1, allowNondistributableArtifacts)
219-
if err != nil {
220-
return nil, err
221-
}
222-
223-
// Also track the sub-index manifest to upload later by digest.
224-
newManifests[lvl][repo.Digest(desc.Digest.String())] = idx
225-
case types.OCIManifestSchema1, types.DockerManifestSchema2:
226-
img, err := idx.Image(desc.Digest)
227-
if err != nil {
228-
return nil, err
229-
}
230-
if err := addImageBlobs(img, blobs, allowNondistributableArtifacts); err != nil {
231-
return nil, err
232-
}
233-
234-
// Also track the sub-image manifest to upload later by digest.
235-
newManifests[lvl][repo.Digest(desc.Digest.String())] = img
236-
default:
237-
// Workaround for #819.
238-
if wl, ok := idx.(withLayer); ok {
239-
layer, err := wl.Layer(desc.Digest)
240-
if err != nil {
241-
return nil, err
242-
}
243-
if err := addLayerBlob(layer, blobs, allowNondistributableArtifacts); err != nil {
244-
return nil, err
245-
}
246-
} else {
247-
return nil, fmt.Errorf("unknown media type: %v", desc.MediaType)
248-
}
249-
}
250-
}
251-
return newManifests, nil
252-
}
253-
254-
func addLayerBlob(l v1.Layer, blobs map[v1.Hash]v1.Layer, allowNondistributableArtifacts bool) error {
255-
// Ignore foreign layers.
256-
mt, err := l.MediaType()
257-
if err != nil {
258-
return err
259-
}
260-
261-
if mt.IsDistributable() || allowNondistributableArtifacts {
262-
d, err := l.Digest()
263-
if err != nil {
264-
return err
265-
}
266-
267-
blobs[d] = l
268-
}
269-
270-
return nil
271-
}
272-
273-
func addImageBlobs(img v1.Image, blobs map[v1.Hash]v1.Layer, allowNondistributableArtifacts bool) error {
274-
ls, err := img.Layers()
275-
if err != nil {
276-
return err
277-
}
278-
// Collect all layers.
279-
for _, l := range ls {
280-
if err := addLayerBlob(l, blobs, allowNondistributableArtifacts); err != nil {
281-
return err
282-
}
283-
}
284-
285-
// Collect config blob.
286-
cl, err := partial.ConfigLayer(img)
287-
if err != nil {
288-
return err
289-
}
290-
return addLayerBlob(cl, blobs, allowNondistributableArtifacts)
45+
return g.Wait()
29146
}

0 commit comments

Comments
 (0)