Skip to content

Commit 3e8475d

Browse files
committed
WIP is working
1 parent 132d485 commit 3e8475d

4 files changed

Lines changed: 93 additions & 14 deletions

File tree

cmd/src/batch_common.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,9 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) error {
295295
}
296296
batchCompletePending(pending, fmt.Sprintf("Found %d workspaces with steps to execute", len(tasks)))
297297

298-
execOpts := executor.Opts{
298+
// EXECUTION OF TASKS
299+
300+
svc.InitExecutor(ctx, executor.Opts{
299301
CacheDir: opts.flags.cacheDir,
300302
ClearCache: opts.flags.clearCache,
301303
CleanArchives: opts.flags.cleanArchives,
@@ -304,10 +306,21 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) error {
304306
Timeout: opts.flags.timeout,
305307
KeepLogs: opts.flags.keepLogs,
306308
TempDir: opts.flags.tempDir,
309+
})
310+
311+
pending = batchCreatePending(opts.out, "Checking cache for changeset specs")
312+
uncachedTasks, cachedSpecs, err := svc.CheckCache(ctx, tasks)
313+
if err != nil {
314+
return err
315+
}
316+
if len(uncachedTasks) > 0 {
317+
batchCompletePending(pending, fmt.Sprintf("Found %d cached changeset specs. %d tasks need to be executed", len(cachedSpecs), len(uncachedTasks)))
318+
} else {
319+
batchCompletePending(pending, fmt.Sprintf("Found %d cached changeset specs. No tasks need to be executed", len(cachedSpecs)))
307320
}
308321

309322
p := newBatchProgressPrinter(opts.out, *verbose, opts.flags.parallelism)
310-
specs, logFiles, err := svc.RunExecutor(ctx, execOpts, tasks, batchSpec, p.PrintStatuses, opts.flags.skipErrors)
323+
freshSpecs, logFiles, err := svc.RunExecutor(ctx, uncachedTasks, batchSpec, p.PrintStatuses, opts.flags.skipErrors)
311324
if err != nil && !opts.flags.skipErrors {
312325
return err
313326
}
@@ -328,6 +341,8 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) error {
328341
}()
329342
}
330343

344+
specs := append(cachedSpecs, freshSpecs...)
345+
331346
err = svc.ValidateChangesetSpecs(repos, specs)
332347
if err != nil {
333348
return err

internal/batches/executor/executor.go

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ type Executor interface {
5050
Start(ctx context.Context)
5151
Wait(ctx context.Context) ([]*batches.ChangesetSpec, error)
5252

53+
CheckCache(ctx context.Context, task *Task) (specs []*batches.ChangesetSpec, found bool, err error)
54+
5355
// LockedTaskStatuses calls the given function with the current state of
5456
// the task statuses. Before calling the function, the statuses are locked
5557
// to provide a consistent view of all statuses, but that also means the
@@ -289,6 +291,43 @@ func (x *executor) Wait(ctx context.Context) ([]*batches.ChangesetSpec, error) {
289291
return x.specs, nil
290292
}
291293

294+
func (x *executor) CheckCache(ctx context.Context, task *Task) (specs []*batches.ChangesetSpec, found bool, err error) {
295+
// Check if the task is cached.
296+
cacheKey := task.cacheKey()
297+
if x.clearCache {
298+
if err = x.cache.Clear(ctx, cacheKey); err != nil {
299+
return specs, false, errors.Wrapf(err, "clearing cache for %q", task.Repository.Name)
300+
}
301+
302+
return specs, false, nil
303+
}
304+
305+
var result executionResult
306+
result, found, err = x.cache.Get(ctx, cacheKey)
307+
if err != nil {
308+
return specs, false, errors.Wrapf(err, "checking cache for %q", task.Repository.Name)
309+
}
310+
311+
if !found {
312+
return specs, false, nil
313+
}
314+
315+
// If the cached result resulted in an empty diff, we don't need to
316+
// add it to the list of specs that are displayed to the user and
317+
// send to the server. Instead, we can just report that the task is
318+
// complete and move on.
319+
if result.Diff == "" {
320+
return specs, true, nil
321+
}
322+
323+
specs, err = createChangesetSpecs(task, result, x.features)
324+
if err != nil {
325+
return specs, false, err
326+
}
327+
328+
return specs, true, nil
329+
}
330+
292331
func (x *executor) do(ctx context.Context, task *Task) (err error) {
293332
// Ensure that the status is updated when we're done.
294333
defer func() {
@@ -349,7 +388,7 @@ func (x *executor) do(ctx context.Context, task *Task) (err error) {
349388
})
350389

351390
// Add the spec to the executor's list of completed specs.
352-
if err := x.addCompletedSpecs(task.Repository, specs); err != nil {
391+
if err := x.addCompletedSpecs(specs); err != nil {
353392
return err
354393
}
355394

@@ -429,7 +468,7 @@ func (x *executor) do(ctx context.Context, task *Task) (err error) {
429468
status.ChangesetSpecs = specs
430469
})
431470

432-
if err := x.addCompletedSpecs(task.Repository, specs); err != nil {
471+
if err := x.addCompletedSpecs(specs); err != nil {
433472
return err
434473
}
435474

@@ -446,7 +485,7 @@ func (x *executor) updateTaskStatus(task *Task, update func(status *TaskStatus))
446485
}
447486
}
448487

449-
func (x *executor) addCompletedSpecs(repository *graphql.Repository, specs []*batches.ChangesetSpec) error {
488+
func (x *executor) addCompletedSpecs(specs []*batches.ChangesetSpec) error {
450489
x.specsMu.Lock()
451490
defer x.specsMu.Unlock()
452491

internal/batches/repo_fetcher.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ func (rz *repoZip) Close() error {
184184
defer rz.mu.Unlock()
185185

186186
rz.uses -= 1
187+
187188
if rz.uses == 0 && rz.checkouts == 0 && rz.deleteOnClose {
188189
for _, addFile := range rz.additionalFiles {
189190
if addFile.fetched {

internal/batches/service/service.go

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ type Service struct {
2828
client api.Client
2929
features batches.FeatureFlags
3030
imageCache *docker.ImageCache
31+
32+
// TODO(mrnugget): I don't like this state here, ugh.
33+
exec executor.Executor
3134
}
3235

3336
type Opts struct {
@@ -192,24 +195,45 @@ func (svc *Service) BuildTasks(ctx context.Context, repos []*graphql.Repository,
192195
return builder.BuildAll(ctx, repos)
193196
}
194197

195-
func (svc *Service) RunExecutor(ctx context.Context, opts executor.Opts, tasks []*executor.Task, spec *batches.BatchSpec, progress func([]*executor.TaskStatus), skipErrors bool) ([]*batches.ChangesetSpec, []string, error) {
196-
x := executor.New(opts, svc.client, svc.features)
198+
func (svc *Service) InitExecutor(ctx context.Context, opts executor.Opts) {
199+
svc.exec = executor.New(opts, svc.client, svc.features)
200+
}
201+
202+
func (svc *Service) CheckCache(ctx context.Context, tasks []*executor.Task) (uncached []*executor.Task, specs []*batches.ChangesetSpec, err error) {
203+
for _, t := range tasks {
204+
cachedSpecs, found, err := svc.exec.CheckCache(ctx, t)
205+
if err != nil {
206+
return nil, nil, err
207+
}
208+
209+
if !found {
210+
uncached = append(uncached, t)
211+
continue
212+
}
213+
214+
specs = append(specs, cachedSpecs...)
215+
}
216+
217+
return uncached, specs, nil
218+
}
219+
220+
func (svc *Service) RunExecutor(ctx context.Context, tasks []*executor.Task, spec *batches.BatchSpec, progress func([]*executor.TaskStatus), skipErrors bool) ([]*batches.ChangesetSpec, []string, error) {
197221
for _, t := range tasks {
198-
x.AddTask(t)
222+
svc.exec.AddTask(t)
199223
}
200224

201225
done := make(chan struct{})
202226
if progress != nil {
203227
go func() {
204-
x.LockedTaskStatuses(progress)
228+
svc.exec.LockedTaskStatuses(progress)
205229

206230
ticker := time.NewTicker(1 * time.Second)
207231
defer ticker.Stop()
208232

209233
for {
210234
select {
211235
case <-ticker.C:
212-
x.LockedTaskStatuses(progress)
236+
svc.exec.LockedTaskStatuses(progress)
213237

214238
case <-done:
215239
return
@@ -220,10 +244,10 @@ func (svc *Service) RunExecutor(ctx context.Context, opts executor.Opts, tasks [
220244

221245
var errs *multierror.Error
222246

223-
x.Start(ctx)
224-
specs, err := x.Wait(ctx)
247+
svc.exec.Start(ctx)
248+
specs, err := svc.exec.Wait(ctx)
225249
if progress != nil {
226-
x.LockedTaskStatuses(progress)
250+
svc.exec.LockedTaskStatuses(progress)
227251
done <- struct{}{}
228252
}
229253
if err != nil {
@@ -272,7 +296,7 @@ func (svc *Service) RunExecutor(ctx context.Context, opts executor.Opts, tasks [
272296
}
273297
}
274298

275-
return specs, x.LogFiles(), errs.ErrorOrNil()
299+
return specs, svc.exec.LogFiles(), errs.ErrorOrNil()
276300
}
277301

278302
func (svc *Service) ValidateChangesetSpecs(repos []*graphql.Repository, specs []*batches.ChangesetSpec) error {

0 commit comments

Comments
 (0)