pending

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2026 License: MIT Imports: 5 Imported by: 0

README

pending

Go Reference Go Report Card codecov

pending is a minimalist, context-aware deferred task scheduler for Go.

pending is designed for in-memory, ID-based deferred actions. It fits use cases like debouncing user input, handling hardware delays, or managing state-dependent timeouts.

pending overview

Why pending?

  • Pure Go: Built entirely on the standard library.
  • Simple API: Core lifecycle methods plus optional introspection helpers.
  • Debouncing by ID: Scheduling the same ID replaces the previous task.
  • Concurrency Limits: Choose blocking or dropping behavior when at capacity.
  • Graceful Shutdown: Cancel timers and wait for active tasks to finish.
  • Runtime Stats: Read pending/running/status state via Stats().
  • Task Introspection: Check IsPending() and TimeRemaining() per task ID.
  • Pluggable Telemetry: Attach your own metrics/logging hooks.

Installation

go get github.com/kahoon/pending

Quick Start

mgr := pending.NewManager(
    pending.WithLimit(5, pending.StrategyDrop),
)

defer mgr.Shutdown(context.Background())

mgr.Schedule("user:42:email", 2*time.Second, func(ctx context.Context) {
    // send email reminder
})

// reschedule with same ID (debounce)
mgr.Schedule("user:42:email", 2*time.Second, func(ctx context.Context) {
    // send latest reminder payload
})

Cookbook

Debouncing User Events
mgr := pending.NewManager()

func onSearchInput(userID, query string) {
    key := "search:" + userID
    mgr.Schedule(key, 300*time.Millisecond, func(ctx context.Context) {
        if ctx.Err() != nil {
            return
        }
        runSearch(query)
    })
}
Resettable State Timeout
mgr := pending.NewManager()

func onSessionActivity(sessionID string) {
    key := "session-timeout:" + sessionID
    mgr.Schedule(key, 15*time.Minute, func(ctx context.Context) {
        if ctx.Err() != nil {
            return
        }
        expireSession(sessionID)
    })
}
Concurrency Limits (Drop)
mgr := pending.NewManager(
    pending.WithLimit(5, pending.StrategyDrop),
)

for i := 0; i < 100; i++ {
    id := fmt.Sprintf("task_%d", i)
    mgr.Schedule(id, 1*time.Second, heavyDatabaseQuery)
}

When a task is dropped under StrategyDrop, your telemetry handler receives pending.ErrTaskDropped via OnFailed, so you can match it with errors.Is.

Delayed Retry with Cancellation
mgr := pending.NewManager()

func scheduleRetry(jobID string, attempt int) {
    key := "retry:" + jobID
    delay := time.Duration(attempt) * time.Second

    mgr.Schedule(key, delay, func(ctx context.Context) {
        if ctx.Err() != nil {
            return
        }
        if err := sendWebhook(jobID); err != nil {
            scheduleRetry(jobID, attempt+1)
        }
    })
}

// Stop any pending retry if the job succeeds elsewhere.
func onJobSucceeded(jobID string) {
    mgr.Cancel("retry:" + jobID)
}
Safe Service Shutdown Wiring in main()
func main() {
    mgr := pending.NewManager()
    defer func() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        _ = mgr.Shutdown(ctx)
    }()

    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    defer signal.Stop(sig)

    go runHTTPServer(mgr)

    <-sig
    log.Println("signal received, draining pending tasks")
}
Manual Cancellation
mgr := pending.NewManager()

mgr.Schedule("user_123_unlock", 30*time.Minute, unlockTask)

// User was unlocked manually, no need to run delayed task.
mgr.Cancel("user_123_unlock")
Runtime Stats
s := mgr.Stats()
log.Printf("pending=%d running=%d status=%s", s.Pending, s.Running, s.Status)
Task Introspection
if mgr.IsPending("retry:order-42") {
    remaining := mgr.TimeRemaining("retry:order-42")
    log.Printf("retry is still pending, timer fires in %s", remaining)
}

TimeRemaining reports time until the task's timer fires. It returns 0 if the task is missing or already started.

Benchmarks

Run benchmarks:

go test -run ^$ -bench BenchmarkManager_ -benchmem ./...

Sample output (darwin/arm64, Apple M4):

BenchmarkManager_Schedule-10                     	  969188	       293.8 ns/op	     473 B/op	       6 allocs/op
BenchmarkManager_RescheduleSameID-10             	 1502884	       158.6 ns/op	     304 B/op	       5 allocs/op
BenchmarkManager_Cancel-10                       	 1280494	       188.9 ns/op	     311 B/op	       5 allocs/op
BenchmarkManager_Shutdown_NoRunningTasks-10      	   75316	      3226 ns/op	      16 B/op	       1 allocs/op
BenchmarkManager_Shutdown_WithRunningTasks-10    	   44101	      5451 ns/op	      16 B/op	       1 allocs/op

Results will vary by hardware, OS, and Go version.

Scope

pending is not a cron replacement. It is intentionally focused on in-process deferred work with ID-based replacement and cancellation.

Community

License

MIT. See LICENSE.

Documentation

Overview

Package pending provides a small in-memory scheduler for deferred tasks.

Tasks are keyed by ID, so scheduling with the same ID replaces the previous task (debouncing). The manager supports cancellation, graceful shutdown, and optional concurrency limits with block or drop strategies.

Runtime state can be inspected with Stats(), while IsPending() and TimeRemaining() provide per-task introspection.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrTaskDropped = errors.New("pending: task dropped due to concurrency limit")

ErrTaskDropped is reported to TelemetryHandler.OnFailed when StrategyDrop rejects a task because no concurrency slot is available.

Functions

This section is empty.

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates the lifecycle of delayed tasks, ensuring thread-safety and providing concurrency control via semaphores.

func NewManager

func NewManager(opts ...Option) *Manager

NewManager initializes a new Manager with the provided options.

Example
package main

import (
	"context"
	"errors"
	"time"
)

type exampleTelemetry struct{}

func (exampleTelemetry) OnScheduled(id string, d time.Duration)       {}
func (exampleTelemetry) OnRescheduled(id string)                      {}
func (exampleTelemetry) OnExecuted(id string, duration time.Duration) {}
func (exampleTelemetry) OnCancelled(id string)                        {}
func (exampleTelemetry) OnFailed(id string, err error) {
	_ = errors.Is(err, ErrTaskDropped)
}

func main() {
	mgr := NewManager(
		WithLimit(1, StrategyDrop),
		WithLogger(exampleTelemetry{}),
	)
	defer mgr.Shutdown(context.Background())

	mgr.Schedule("email:user-42", 2*time.Second, func(ctx context.Context) {})
}

func (*Manager) Cancel

func (m *Manager) Cancel(id string)

Cancel immediately stops a pending task by its ID and prevents it from running. If the task is already running, its context is cancelled.

func (*Manager) IsPending added in v0.3.0

func (m *Manager) IsPending(id string) bool

IsPending reports whether a task exists and has not started execution yet.

func (*Manager) Schedule

func (m *Manager) Schedule(id string, d time.Duration, task Task)

Schedule plans a task for execution after duration d. If a task with the same id already exists, the previous one is cancelled and replaced (debouncing). If the manager is not accepting new tasks, Schedule does nothing.

func (*Manager) Shutdown

func (m *Manager) Shutdown(ctx context.Context) error

Shutdown stops the manager, cancels all pending timers, and waits for currently executing tasks to complete or for the context to time out.

func (*Manager) Stats added in v0.1.1

func (m *Manager) Stats() Stats

Stats returns a lock-safe snapshot of manager state.

func (*Manager) TimeRemaining added in v0.3.0

func (m *Manager) TimeRemaining(id string) time.Duration

TimeRemaining returns the remaining time until a pending task's timer fires. If the task does not exist or is no longer pending, it returns zero.

type Option

type Option func(*Manager)

Option configures a Manager.

func WithLimit

func WithLimit(limit int, strategy Strategy) Option

WithLimit sets the maximum number of concurrent tasks.

func WithLogger

func WithLogger(logger TelemetryHandler) Option

WithLogger attaches a custom TelemetryHandler.

type Stats added in v0.1.1

type Stats struct {
	// Pending is the number of scheduled tasks that are not currently executing.
	Pending int
	// Running is the number of tasks currently executing.
	Running int
	// Status indicates whether the manager is accepting new tasks, draining existing ones, or fully closed.
	Status Status
}

Stats is a point-in-time snapshot of manager state.

type Status added in v0.2.0

type Status int

Status represents the current lifecycle state of a Manager.

const (
	// StatusAccepting means the manager accepts new schedules.
	StatusAccepting Status = iota
	// StatusDraining means shutdown has started and running tasks are draining.
	StatusDraining
	// StatusClosed means shutdown has completed.
	StatusClosed
)

func (Status) String added in v0.2.0

func (s Status) String() string

type Strategy

type Strategy int

Strategy defines how the manager behaves when the concurrency limit is reached.

const (
	// StrategyBlock waits for a concurrency slot to become available.
	StrategyBlock Strategy = iota
	// StrategyDrop ignores the execution if the concurrency limit is reached.
	StrategyDrop
)

type Task

type Task func(ctx context.Context)

Task defines the function signature for a scheduled action. The provided context is cancelled if the manager shuts down or the task is replaced.

type TelemetryHandler

type TelemetryHandler interface {
	OnScheduled(id string, d time.Duration)
	OnRescheduled(id string)
	OnExecuted(id string, duration time.Duration)
	OnCancelled(id string)
	OnFailed(id string, err error)
}

TelemetryHandler receives lifecycle events for scheduled tasks.

Implementations can be used to emit logs, metrics, or traces.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL