pending

pending is a minimalist, context-aware deferred task scheduler for Go.

pending is designed for in-memory, ID-based deferred actions. It fits use cases like debouncing user input, handling hardware delays, or managing state-dependent timeouts.

Why pending?

  • Pure Go: Built entirely on the standard library.
  • Simple API: Core lifecycle methods plus optional introspection helpers.
  • Debouncing by ID: Scheduling the same ID replaces the previous task.
  • Concurrency Limits: Choose blocking or dropping behavior when at capacity.
  • Graceful Shutdown: Cancel timers and wait for active tasks to finish.
  • Runtime Stats: Read pending/running/status state via Stats().
  • Task Introspection: Check IsPending() and TimeRemaining() per task ID.
  • Pluggable Telemetry: Attach your own metrics/logging hooks.

Installation

go get github.com/kahoon/pending

Quick Start

mgr := pending.NewManager(
    pending.WithLimit(5, pending.StrategyDrop),
)

defer mgr.Shutdown(context.Background())

mgr.Schedule("user:42:email", 2*time.Second, func(ctx context.Context) {
    // send email reminder
})

// reschedule with same ID (debounce)
mgr.Schedule("user:42:email", 2*time.Second, func(ctx context.Context) {
    // send latest reminder payload
})

Scheduling APIs

Simple path: Schedule

Use Schedule for straightforward delayed execution with ID-based debouncing:

mgr.Schedule("invoice:42:reminder", 10*time.Minute, func(ctx context.Context) {
    if ctx.Err() != nil {
        return
    }
    sendReminder("42")
})
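
To make "scheduling the same ID replaces the previous task" concrete, here is a minimal standard-library sketch of the idea. It is not the pending implementation: the `debouncer` type, its `schedule` method, and `debounceDemo` are hypothetical names invented for this illustration, and the sketch assumes one `time.AfterFunc` timer per ID.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// debouncer is a hypothetical stand-in that illustrates replace-on-same-ID
// scheduling. It is not how pending is implemented.
type debouncer struct {
	mu     sync.Mutex
	timers map[string]*time.Timer
}

func newDebouncer() *debouncer {
	return &debouncer{timers: make(map[string]*time.Timer)}
}

// schedule arms a timer for id. A timer already waiting under the same id is
// stopped first, so only the most recent call gets to run.
func (d *debouncer) schedule(id string, delay time.Duration, fn func()) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if old, ok := d.timers[id]; ok {
		old.Stop()
	}
	var t *time.Timer
	t = time.AfterFunc(delay, func() {
		d.mu.Lock()
		// Run only if this timer is still the current one for id. This covers
		// the case where Stop came too late to prevent the timer from firing.
		current := d.timers[id] == t
		if current {
			delete(d.timers, id)
		}
		d.mu.Unlock()
		if current {
			fn()
		}
	})
	d.timers[id] = t
}

// debounceDemo simulates three rapid keystrokes under one ID and returns the
// queries that actually ran.
func debounceDemo() []string {
	d := newDebouncer()
	var mu sync.Mutex
	var ran []string
	for _, q := range []string{"g", "go", "gopher"} {
		q := q
		d.schedule("search:42", 50*time.Millisecond, func() {
			mu.Lock()
			ran = append(ran, q)
			mu.Unlock()
		})
	}
	time.Sleep(200 * time.Millisecond) // give the surviving timer time to fire
	mu.Lock()
	defer mu.Unlock()
	return append([]string(nil), ran...)
}

func main() {
	fmt.Println(debounceDemo())
}
```

Only the last of the three calls survives, which is the behavior the Schedule examples rely on.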

Advanced path: ScheduleWith

Use ScheduleWith when you need option-driven behavior such as absolute-time scheduling and IfAbsent semantics:

at := time.Now().Add(30 * time.Second)

scheduled, err := mgr.ScheduleWith(
    "report:nightly",
    func(ctx context.Context) error {
        return generateReport(ctx)
    },
    pending.ScheduleOptions{
        At:       at,
        IfAbsent: true,
    },
)
if err != nil {
    switch {
    case errors.Is(err, pending.ErrInvalidScheduleOptions):
        log.Printf("invalid scheduling options: %v", err)
    case errors.Is(err, pending.ErrManagerNotAccepting):
        log.Printf("manager is shutting down, task rejected")
    default:
        log.Printf("schedule failed: %v", err)
    }
} else if !scheduled {
    // Only meaningful when no error occurred.
    log.Printf("task already exists and IfAbsent prevented replacement")
}
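
The difference between replacing and IfAbsent can be shown without timers at all. The sketch below is a hedged illustration using only the standard library: the `table` type, `put`, `deadline`, and `ifAbsentDemo` are hypothetical names, and it assumes a scheduler keeps one absolute deadline per ID.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// table is a hypothetical stand-in for a scheduler's ID-to-deadline map.
type table struct {
	mu        sync.Mutex
	deadlines map[string]time.Time
}

// put records the absolute time at which id should fire. With ifAbsent set,
// an existing entry is left untouched and put reports false. Without it, the
// entry is replaced.
func (t *table) put(id string, at time.Time, ifAbsent bool) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, exists := t.deadlines[id]; exists && ifAbsent {
		return false
	}
	t.deadlines[id] = at
	return true
}

// deadline returns the stored fire time for id, if any.
func (t *table) deadline(id string) (time.Time, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	at, ok := t.deadlines[id]
	return at, ok
}

// ifAbsentDemo walks through the three cases: first insert, refused insert,
// and plain replacement.
func ifAbsentDemo() (first, second, keptOriginal, replaced bool) {
	t := &table{deadlines: make(map[string]time.Time)}
	early := time.Date(2024, 1, 1, 2, 0, 0, 0, time.UTC)
	late := early.Add(time.Hour)

	first = t.put("report:nightly", early, true) // no entry yet: stored
	second = t.put("report:nightly", late, true) // entry exists: refused
	got, _ := t.deadline("report:nightly")
	keptOriginal = got.Equal(early)

	t.put("report:nightly", late, false) // without ifAbsent: replaced
	got, _ = t.deadline("report:nightly")
	replaced = got.Equal(late)
	return
}

func main() {
	fmt.Println(ifAbsentDemo())
}
```

The second call leaves the original deadline in place, which corresponds to the `scheduled == false` branch in the example above.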

Cookbook

Debouncing User Events

var mgr = pending.NewManager()

func onSearchInput(userID, query string) {
    key := "search:" + userID
    mgr.Schedule(key, 300*time.Millisecond, func(ctx context.Context) {
        if ctx.Err() != nil {
            return
        }
        runSearch(query)
    })
}

Resettable State Timeout

var mgr = pending.NewManager()

func onSessionActivity(sessionID string) {
    key := "session-timeout:" + sessionID
    mgr.Schedule(key, 15*time.Minute, func(ctx context.Context) {
        if ctx.Err() != nil {
            return
        }
        expireSession(sessionID)
    })
}

Concurrency Limits (Drop)

mgr := pending.NewManager(
    pending.WithLimit(5, pending.StrategyDrop),
)

for i := 0; i < 100; i++ {
    id := fmt.Sprintf("task_%d", i)
    mgr.Schedule(id, 1*time.Second, heavyDatabaseQuery)
}

When a task is dropped under StrategyDrop, your telemetry handler receives pending.ErrTaskDropped via OnFailed, so you can match it with errors.Is.
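
For intuition on blocking versus dropping at capacity, here is a minimal sketch built on a buffered channel used as a semaphore. It is an assumption-laden illustration, not pending's internals: `limiter`, `acquire`, `release`, `dropDemo`, and the `errDropped` sentinel are hypothetical names, with `errDropped` playing the role that `pending.ErrTaskDropped` plays in the text above.

```go
package main

import (
	"errors"
	"fmt"
)

// errDropped is a hypothetical sentinel error for this sketch.
var errDropped = errors.New("task dropped: at capacity")

// limiter caps concurrent work using a buffered channel as a semaphore.
type limiter struct {
	slots chan struct{}
	drop  bool
}

func newLimiter(n int, drop bool) *limiter {
	return &limiter{slots: make(chan struct{}, n), drop: drop}
}

// acquire takes a slot. In drop mode it fails fast when no slot is free.
// In block mode it waits until release frees one.
func (l *limiter) acquire() error {
	if l.drop {
		select {
		case l.slots <- struct{}{}:
			return nil
		default:
			return fmt.Errorf("acquire: %w", errDropped)
		}
	}
	l.slots <- struct{}{}
	return nil
}

// release frees a slot taken by acquire.
func (l *limiter) release() { <-l.slots }

// dropDemo tries to start total tasks against limit slots and never releases
// any, so every attempt past the limit is rejected.
func dropDemo(limit, total int) (started, dropped int) {
	l := newLimiter(limit, true)
	for i := 0; i < total; i++ {
		if err := l.acquire(); errors.Is(err, errDropped) {
			dropped++
			continue
		}
		started++
	}
	return started, dropped
}

func main() {
	started, dropped := dropDemo(5, 100)
	fmt.Printf("started=%d dropped=%d\n", started, dropped)
}
```

Wrapping the sentinel with `%w` keeps `errors.Is` working for callers, which is the same matching style the OnFailed note describes.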

Delayed Retry with Cancellation

var mgr = pending.NewManager()

func scheduleRetry(jobID string, attempt int) {
    key := "retry:" + jobID
    delay := time.Duration(attempt) * time.Second

    mgr.Schedule(key, delay, func(ctx context.Context) {
        if ctx.Err() != nil {
            return
        }
        if err := sendWebhook(jobID); err != nil {
            scheduleRetry(jobID, attempt+1)
        }
    })
}

// Stop any pending retry if the job succeeds elsewhere.
func onJobSucceeded(jobID string) {
    mgr.Cancel("retry:" + jobID)
}
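
The delay in `scheduleRetry` grows linearly with no upper bound, and an attempt of 0 yields no delay at all. One possible way to bound it is a small helper like the sketch below. `retryDelay` and `retryDemo` are hypothetical names for this illustration, and the one-second step and 30-second ceiling are arbitrary choices.

```go
package main

import (
	"fmt"
	"time"
)

// retryDelay is a hypothetical helper: linear backoff of attempt*step, with
// attempts below 1 treated as 1 and the result capped at ceiling.
func retryDelay(attempt int, step, ceiling time.Duration) time.Duration {
	if step <= 0 || ceiling <= 0 {
		return 0
	}
	if attempt < 1 {
		attempt = 1
	}
	// Compare before multiplying so a huge attempt cannot overflow.
	if time.Duration(attempt) > ceiling/step {
		return ceiling
	}
	return time.Duration(attempt) * step
}

// retryDemo returns the delay in whole seconds for a few sample attempts.
func retryDemo() []int {
	attempts := []int{0, 1, 3, 100}
	out := make([]int, 0, len(attempts))
	for _, a := range attempts {
		d := retryDelay(a, time.Second, 30*time.Second)
		out = append(out, int(d/time.Second))
	}
	return out
}

func main() {
	fmt.Println(retryDemo())
}
```

In the cookbook snippet, `delay := retryDelay(attempt, time.Second, 30*time.Second)` would be a drop-in replacement for the unbounded expression.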

Safe Service Shutdown Wiring in main()

func main() {
    mgr := pending.NewManager()
    defer func() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := mgr.Shutdown(ctx); err != nil {
            log.Printf("shutdown did not complete cleanly: %v", err)
        }
    }()

    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    defer signal.Stop(sig)

    go runHTTPServer(mgr)

    <-sig
    log.Println("signal received, draining pending tasks")
}
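
The shutdown wiring above gives running tasks a five-second budget. The general pattern behind "wait for active tasks, but not forever" can be sketched with a `sync.WaitGroup` and a context. This is a hedged illustration rather than pending's code: `waitWithContext`, `drain`, and `shutdownDemo` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// waitWithContext blocks until wg is done or ctx ends, whichever comes first.
// On timeout the helper goroutine stays alive until the tasks finish.
func waitWithContext(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// drain starts one task that runs for work and waits at most budget for it.
func drain(work, budget time.Duration) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(work)
	}()
	ctx, cancel := context.WithTimeout(context.Background(), budget)
	defer cancel()
	return waitWithContext(ctx, &wg)
}

// shutdownDemo runs one drain that finishes in time and one that does not.
func shutdownDemo() (drained, timedOut bool) {
	drained = drain(10*time.Millisecond, time.Second) == nil
	err := drain(500*time.Millisecond, 20*time.Millisecond)
	timedOut = errors.Is(err, context.DeadlineExceeded)
	return
}

func main() {
	fmt.Println(shutdownDemo())
}
```

A non-nil result here means the budget ran out while work was still in flight, which is the case worth logging during a real shutdown.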

Manual Cancellation

mgr := pending.NewManager()

mgr.Schedule("user_123_unlock", 30*time.Minute, unlockTask)

// User was unlocked manually, no need to run delayed task.
mgr.Cancel("user_123_unlock")

Runtime Stats

s := mgr.Stats()
log.Printf("pending=%d running=%d status=%s", s.Pending, s.Running, s.Status)

Task Introspection

if mgr.IsPending("retry:order-42") {
    remaining := mgr.TimeRemaining("retry:order-42")
    log.Printf("retry is still pending, timer fires in %s", remaining)
}

TimeRemaining reports time until the task's timer fires. It returns 0 if the task is missing or already started.
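
The contract described above (time left, or 0 when there is nothing to wait for) can be sketched as a pure function over stored deadlines. `remaining` and `remainingDemo` are hypothetical names, and the sketch assumes one deadline per task ID. It treats an overdue deadline as 0, which only approximates "already started".

```go
package main

import (
	"fmt"
	"time"
)

// remaining is a hypothetical helper: the time left until id's deadline,
// or 0 when id is unknown or its deadline has already passed.
func remaining(deadlines map[string]time.Time, id string, now time.Time) time.Duration {
	deadline, ok := deadlines[id]
	if !ok {
		return 0
	}
	if d := deadline.Sub(now); d > 0 {
		return d
	}
	return 0
}

// remainingDemo reports whole seconds left for a waiting, an overdue, and an
// unknown task, using a fixed clock so the result is deterministic.
func remainingDemo() []int {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	deadlines := map[string]time.Time{
		"retry:order-42": now.Add(90 * time.Second), // still waiting
		"retry:order-7":  now.Add(-5 * time.Second), // already due
	}
	ids := []string{"retry:order-42", "retry:order-7", "retry:unknown"}
	out := make([]int, 0, len(ids))
	for _, id := range ids {
		out = append(out, int(remaining(deadlines, id, now)/time.Second))
	}
	return out
}

func main() {
	fmt.Println(remainingDemo())
}
```

Because 0 is ambiguous between "missing" and "due now", pairing the call with IsPending, as the example above does, removes the guesswork.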

Benchmarks

Run benchmarks:

go test -run '^$' -bench 'BenchmarkManager_' -benchmem ./...

Sample output (darwin/arm64, Apple M4):

BenchmarkManager_Schedule-10                     	  969188	       293.8 ns/op	     473 B/op	       6 allocs/op
BenchmarkManager_RescheduleSameID-10             	 1502884	       158.6 ns/op	     304 B/op	       5 allocs/op
BenchmarkManager_Cancel-10                       	 1280494	       188.9 ns/op	     311 B/op	       5 allocs/op
BenchmarkManager_Shutdown_NoRunningTasks-10      	   75316	      3226 ns/op	      16 B/op	       1 allocs/op
BenchmarkManager_Shutdown_WithRunningTasks-10    	   44101	      5451 ns/op	      16 B/op	       1 allocs/op

Results will vary by hardware, OS, and Go version.

Scope

pending is not a cron replacement. It is intentionally focused on in-process deferred work with ID-based replacement and cancellation.

License

MIT. See LICENSE.
