|
15 | 15 | package remote |
16 | 16 |
|
17 | 17 | import ( |
18 | | - "context" |
19 | | - "fmt" |
20 | | - |
21 | 18 | "github.com/google/go-containerregistry/pkg/name" |
22 | | - v1 "github.com/google/go-containerregistry/pkg/v1" |
23 | | - "github.com/google/go-containerregistry/pkg/v1/partial" |
24 | | - "github.com/google/go-containerregistry/pkg/v1/types" |
25 | 19 | "golang.org/x/sync/errgroup" |
26 | 20 | ) |
27 | 21 |
|
28 | 22 | // MultiWrite writes the given Images or ImageIndexes to the given refs, as |
29 | | -// efficiently as possible, by deduping shared layer blobs and uploading layers |
30 | | -// in parallel, then uploading all manifests in parallel. |
31 | | -// |
32 | | -// Current limitations: |
33 | | -// - All refs must share the same repository. |
34 | | -// - Images cannot consist of stream.Layers. |
35 | | -func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) { |
36 | | - // Determine the repository being pushed to; if asked to push to |
37 | | - // multiple repositories, give up. |
38 | | - var repo, zero name.Repository |
39 | | - for ref := range m { |
40 | | - if repo == zero { |
41 | | - repo = ref.Context() |
42 | | - } else if ref.Context() != repo { |
43 | | - return fmt.Errorf("MultiWrite can only push to the same repository (saw %q and %q)", repo, ref.Context()) |
44 | | - } |
45 | | - } |
46 | | - |
| 23 | +// efficiently as possible, by deduping shared layer blobs while uploading them |
| 24 | +// in parallel. |
| 25 | +func MultiWrite(todo map[name.Reference]Taggable, options ...Option) (rerr error) { |
47 | 26 | o, err := makeOptions(options...) |
48 | 27 | if err != nil { |
49 | 28 | return err |
50 | 29 | } |
51 | | - |
52 | | - // Collect unique blobs (layers and config blobs). |
53 | | - blobs := map[v1.Hash]v1.Layer{} |
54 | | - newManifests := []map[name.Reference]Taggable{} |
55 | | - // Separate originally requested images and indexes, so we can push images first. |
56 | | - images, indexes := map[name.Reference]Taggable{}, map[name.Reference]Taggable{} |
57 | | - for ref, i := range m { |
58 | | - if img, ok := i.(v1.Image); ok { |
59 | | - images[ref] = i |
60 | | - if err := addImageBlobs(img, blobs, o.allowNondistributableArtifacts); err != nil { |
61 | | - return err |
62 | | - } |
63 | | - continue |
64 | | - } |
65 | | - if idx, ok := i.(v1.ImageIndex); ok { |
66 | | - indexes[ref] = i |
67 | | - newManifests, err = addIndexBlobs(idx, blobs, repo, newManifests, 0, o.allowNondistributableArtifacts) |
68 | | - if err != nil { |
69 | | - return err |
70 | | - } |
71 | | - continue |
72 | | - } |
73 | | - return fmt.Errorf("pushable resource was not Image or ImageIndex: %T", i) |
74 | | - } |
75 | | - |
76 | | - // Determine if any of the layers are Mountable, because if so we need |
77 | | - // to request Pull scope too. |
78 | | - ls := []v1.Layer{} |
79 | | - for _, l := range blobs { |
80 | | - ls = append(ls, l) |
81 | | - } |
82 | | - w, err := makeWriter(o.context, repo, ls, o) |
83 | | - if err != nil { |
84 | | - return err |
| 30 | + if o.progress != nil { |
| 31 | + defer func() { o.progress.Close(rerr) }() |
85 | 32 | } |
| 33 | + p := newPusher(o) |
86 | 34 |
|
87 | | - // Collect the total size of blobs and manifests we're about to write. |
88 | | - if w.progress != nil { |
89 | | - defer func() { w.progress.Close(rerr) }() |
90 | | - |
91 | | - for _, b := range blobs { |
92 | | - size, err := b.Size() |
93 | | - if err != nil { |
94 | | - return err |
95 | | - } |
96 | | - w.progress.total(size) |
97 | | - } |
98 | | - countManifest := func(t Taggable) error { |
99 | | - b, err := t.RawManifest() |
100 | | - if err != nil { |
101 | | - return err |
102 | | - } |
103 | | - w.progress.total(int64(len(b))) |
104 | | - return nil |
105 | | - } |
106 | | - for _, i := range images { |
107 | | - if err := countManifest(i); err != nil { |
108 | | - return err |
109 | | - } |
110 | | - } |
111 | | - for _, nm := range newManifests { |
112 | | - for _, i := range nm { |
113 | | - if err := countManifest(i); err != nil { |
114 | | - return err |
115 | | - } |
116 | | - } |
117 | | - } |
118 | | - for _, i := range indexes { |
119 | | - if err := countManifest(i); err != nil { |
120 | | - return err |
121 | | - } |
122 | | - } |
123 | | - } |
| 35 | + g, ctx := errgroup.WithContext(o.context) |
| 36 | + g.SetLimit(o.jobs) |
124 | 37 |
|
125 | | - // Upload individual blobs and collect any errors. |
126 | | - blobChan := make(chan v1.Layer, 2*o.jobs) |
127 | | - ctx := o.context |
128 | | - g, gctx := errgroup.WithContext(o.context) |
129 | | - for i := 0; i < o.jobs; i++ { |
130 | | - // Start N workers consuming blobs to upload. |
| 38 | + for ref, t := range todo { |
| 39 | + ref, t := ref, t |
131 | 40 | g.Go(func() error { |
132 | | - for b := range blobChan { |
133 | | - if err := w.uploadOne(gctx, b); err != nil { |
134 | | - return err |
135 | | - } |
136 | | - } |
137 | | - return nil |
| 41 | + return p.Push(ctx, ref, t) |
138 | 42 | }) |
139 | 43 | } |
140 | | - g.Go(func() error { |
141 | | - defer close(blobChan) |
142 | | - for _, b := range blobs { |
143 | | - select { |
144 | | - case blobChan <- b: |
145 | | - case <-gctx.Done(): |
146 | | - return gctx.Err() |
147 | | - } |
148 | | - } |
149 | | - return nil |
150 | | - }) |
151 | | - if err := g.Wait(); err != nil { |
152 | | - return err |
153 | | - } |
154 | | - |
155 | | - commitMany := func(ctx context.Context, m map[name.Reference]Taggable) error { |
156 | | - g, ctx := errgroup.WithContext(ctx) |
157 | | - // With all of the constituent elements uploaded, upload the manifests |
158 | | - // to commit the images and indexes, and collect any errors. |
159 | | - type task struct { |
160 | | - i Taggable |
161 | | - ref name.Reference |
162 | | - } |
163 | | - taskChan := make(chan task, 2*o.jobs) |
164 | | - for i := 0; i < o.jobs; i++ { |
165 | | - // Start N workers consuming tasks to upload manifests. |
166 | | - g.Go(func() error { |
167 | | - for t := range taskChan { |
168 | | - if err := w.commitManifest(ctx, t.i, t.ref); err != nil { |
169 | | - return err |
170 | | - } |
171 | | - } |
172 | | - return nil |
173 | | - }) |
174 | | - } |
175 | | - go func() { |
176 | | - for ref, i := range m { |
177 | | - taskChan <- task{i, ref} |
178 | | - } |
179 | | - close(taskChan) |
180 | | - }() |
181 | | - return g.Wait() |
182 | | - } |
183 | | - // Push originally requested image manifests. These have no |
184 | | - // dependencies. |
185 | | - if err := commitMany(ctx, images); err != nil { |
186 | | - return err |
187 | | - } |
188 | | - // Push new manifests from lowest levels up. |
189 | | - for i := len(newManifests) - 1; i >= 0; i-- { |
190 | | - if err := commitMany(ctx, newManifests[i]); err != nil { |
191 | | - return err |
192 | | - } |
193 | | - } |
194 | | - // Push originally requested index manifests, which might depend on |
195 | | - // newly discovered manifests. |
196 | | - |
197 | | - return commitMany(ctx, indexes) |
198 | | -} |
199 | 44 |
|
200 | | -// addIndexBlobs adds blobs to the set of blobs we intend to upload, and |
201 | | -// returns the latest copy of the ordered collection of manifests to upload. |
202 | | -func addIndexBlobs(idx v1.ImageIndex, blobs map[v1.Hash]v1.Layer, repo name.Repository, newManifests []map[name.Reference]Taggable, lvl int, allowNondistributableArtifacts bool) ([]map[name.Reference]Taggable, error) { |
203 | | - if lvl > len(newManifests)-1 { |
204 | | - newManifests = append(newManifests, map[name.Reference]Taggable{}) |
205 | | - } |
206 | | - |
207 | | - im, err := idx.IndexManifest() |
208 | | - if err != nil { |
209 | | - return nil, err |
210 | | - } |
211 | | - for _, desc := range im.Manifests { |
212 | | - switch desc.MediaType { |
213 | | - case types.OCIImageIndex, types.DockerManifestList: |
214 | | - idx, err := idx.ImageIndex(desc.Digest) |
215 | | - if err != nil { |
216 | | - return nil, err |
217 | | - } |
218 | | - newManifests, err = addIndexBlobs(idx, blobs, repo, newManifests, lvl+1, allowNondistributableArtifacts) |
219 | | - if err != nil { |
220 | | - return nil, err |
221 | | - } |
222 | | - |
223 | | - // Also track the sub-index manifest to upload later by digest. |
224 | | - newManifests[lvl][repo.Digest(desc.Digest.String())] = idx |
225 | | - case types.OCIManifestSchema1, types.DockerManifestSchema2: |
226 | | - img, err := idx.Image(desc.Digest) |
227 | | - if err != nil { |
228 | | - return nil, err |
229 | | - } |
230 | | - if err := addImageBlobs(img, blobs, allowNondistributableArtifacts); err != nil { |
231 | | - return nil, err |
232 | | - } |
233 | | - |
234 | | - // Also track the sub-image manifest to upload later by digest. |
235 | | - newManifests[lvl][repo.Digest(desc.Digest.String())] = img |
236 | | - default: |
237 | | - // Workaround for #819. |
238 | | - if wl, ok := idx.(withLayer); ok { |
239 | | - layer, err := wl.Layer(desc.Digest) |
240 | | - if err != nil { |
241 | | - return nil, err |
242 | | - } |
243 | | - if err := addLayerBlob(layer, blobs, allowNondistributableArtifacts); err != nil { |
244 | | - return nil, err |
245 | | - } |
246 | | - } else { |
247 | | - return nil, fmt.Errorf("unknown media type: %v", desc.MediaType) |
248 | | - } |
249 | | - } |
250 | | - } |
251 | | - return newManifests, nil |
252 | | -} |
253 | | - |
254 | | -func addLayerBlob(l v1.Layer, blobs map[v1.Hash]v1.Layer, allowNondistributableArtifacts bool) error { |
255 | | - // Ignore foreign layers. |
256 | | - mt, err := l.MediaType() |
257 | | - if err != nil { |
258 | | - return err |
259 | | - } |
260 | | - |
261 | | - if mt.IsDistributable() || allowNondistributableArtifacts { |
262 | | - d, err := l.Digest() |
263 | | - if err != nil { |
264 | | - return err |
265 | | - } |
266 | | - |
267 | | - blobs[d] = l |
268 | | - } |
269 | | - |
270 | | - return nil |
271 | | -} |
272 | | - |
273 | | -func addImageBlobs(img v1.Image, blobs map[v1.Hash]v1.Layer, allowNondistributableArtifacts bool) error { |
274 | | - ls, err := img.Layers() |
275 | | - if err != nil { |
276 | | - return err |
277 | | - } |
278 | | - // Collect all layers. |
279 | | - for _, l := range ls { |
280 | | - if err := addLayerBlob(l, blobs, allowNondistributableArtifacts); err != nil { |
281 | | - return err |
282 | | - } |
283 | | - } |
284 | | - |
285 | | - // Collect config blob. |
286 | | - cl, err := partial.ConfigLayer(img) |
287 | | - if err != nil { |
288 | | - return err |
289 | | - } |
290 | | - return addLayerBlob(cl, blobs, allowNondistributableArtifacts) |
| 45 | + return g.Wait() |
291 | 46 | } |
0 commit comments