Kingson4Wu/quick_worker

quick_worker

A high-performance, scalable, and customizable batch processing framework for Go, based on the producer-consumer model. It supports dynamic scaling of consumer goroutines, batch reporting, and flexible extension for different business scenarios.

Features

  • Batch Processing: Efficiently collects and processes data in batches, reducing system overhead.
  • Dynamic Scaling: Supports automatic and custom scaling of consumer goroutines based on queue status.
  • Customizable: Provides hooks for custom batch reporting and scaling strategies.
  • Graceful Shutdown: Ensures all data is processed before shutdown, with configurable wait times.
  • Memory Optimization: Integrates a bytes pool to reduce memory allocations.
  • Easy Integration: Simple API for adding workers and managing batch jobs.
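
To illustrate what a bytes pool buys in a batch pipeline, here is a minimal sketch built on the standard library's sync.Pool. It is not quick_worker's own pool, whose API is not shown in this README; bufPool and joinBatch are hypothetical names used only for this example.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable buffers so that each batch does not allocate
// a fresh one. Hypothetical name, not part of quick_worker.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// joinBatch serializes a batch into one newline-delimited payload using a
// pooled buffer. The result is copied out because the buffer goes back to
// the pool and may be reused by another caller.
func joinBatch(batch [][]byte) []byte {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufPool.Put(buf)

	for _, item := range batch {
		buf.Write(item)
		buf.WriteByte('\n')
	}
	out := make([]byte, buf.Len())
	copy(out, buf.Bytes())
	return out
}

func main() {
	payload := joinBatch([][]byte{[]byte("data1"), []byte("data2")})
	fmt.Printf("%q\n", payload) // "data1\ndata2\n"
}
```

A helper like this could be called from inside a ReportFunc when the batch has to be sent as a single payload.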

Architecture & Principle

  • Producer-Consumer Model: Data is produced and pushed into a buffered channel. Multiple consumer goroutines fetch and process data in batches.
  • Batch Trigger: Batch processing is triggered by either reaching a maximum batch size or a maximum wait duration.
  • Dynamic Consumer Adjustment: The number of consumer goroutines can be adjusted at runtime based on queue length or custom logic.
  • Collector: Manages multiple workers, providing unified start and wait control.

Design Overview

graph TD;
    Producer-->|Push Data|Worker;
    Worker-->|Batch|Consumer;
    Consumer-->|ReportFunc|BusinessLogic;
    Worker-->|Dynamic Scaling|Consumer;
    Collector-->|Manage|Worker;
  • Worker: Handles batch collection, reporting, and dynamic scaling.
  • Collector: Aggregates multiple workers for unified management.
  • ReportFunc: User-defined function for batch processing.
  • AdjustStrategyFunc: User-defined function for scaling consumers.
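
One way to picture how a worker could grow and shrink its set of consumers at runtime is a small pool that keeps one stop channel per goroutine. This is a sketch under stated assumptions, not quick_worker's internal code: consumerPool and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// consumerPool tracks running consumer goroutines. Hypothetical type,
// not part of quick_worker.
type consumerPool struct {
	mu    sync.Mutex
	wg    sync.WaitGroup
	stops []chan struct{} // one stop channel per tracked consumer
	work  func(stop <-chan struct{})
}

func newConsumerPool(work func(stop <-chan struct{})) *consumerPool {
	return &consumerPool{work: work}
}

// resize starts new consumers or signals existing ones to stop until n
// consumers are tracked. Negative values are treated as zero.
func (p *consumerPool) resize(n int) {
	if n < 0 {
		n = 0
	}
	p.mu.Lock()
	defer p.mu.Unlock()

	for len(p.stops) < n { // scale up
		stop := make(chan struct{})
		p.stops = append(p.stops, stop)
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			p.work(stop)
		}()
	}
	for len(p.stops) > n { // scale down: stop the most recently started
		last := len(p.stops) - 1
		close(p.stops[last])
		p.stops = p.stops[:last]
	}
}

// size reports how many consumers are currently tracked.
func (p *consumerPool) size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.stops)
}

// wait blocks until every started consumer has returned; call it after
// resize(0), otherwise it blocks on the consumers still running.
func (p *consumerPool) wait() { p.wg.Wait() }

func main() {
	pool := newConsumerPool(func(stop <-chan struct{}) {
		<-stop // a real consumer would select on stop and its data channel
	})
	pool.resize(3)
	fmt.Println(pool.size()) // 3
	pool.resize(1)
	fmt.Println(pool.size()) // 1
	pool.resize(0)
	pool.wait()
}
```

The target passed to resize would come from a scaling strategy such as AdjustStrategyFunc.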

Quick Start

package main

import (
    "time"

    "github.com/kingson4wu/quick_worker/core"
)

func main() {
    collector := core.NewCollector(&core.CollectorConf{})
    collector.AddWorker(&core.WorkerConf{
        Name:            "example",
        NumConsumers:    3,
        MaxBatchSize:    10,
        BufferSize:      100,
        MaxWaitDuration: time.Second,
        // ReportFunc is called with each collected batch.
        ReportFunc: func(clientId int, batch [][]byte, w *core.Worker) error {
            // Process batch
            return nil
        },
    })
    collector.Start()

    // Look up the worker by name and push data into it.
    worker := collector.GetWorker("example")
    worker.Produce([]byte("data1"))
    worker.Produce([]byte("data2"))
    // ...

    collector.Wait()
}

Configuration

  • WorkerConf

    • Name (string): Worker name (required)
    • NumConsumers (int): Initial number of consumers (default: 5)
    • MaxBatchSize (int): Max batch size to trigger report (default: 30)
    • BufferSize (int): Channel buffer size (default: 200)
    • MaxWaitDuration (time.Duration): Max wait time before batch report (default: 1s)
    • WaitTimeConsumerBeforeStopping (time.Duration): Wait time before stopping consumers
    • WaitTimeProducerBeforeStopping (time.Duration): Wait time before stopping producers
    • ReportFunc (func): Custom batch processing logic
    • RetryTimes (int): Number of retries for batch processing
  • CollectorConf

    • Logger (Logger): Custom logger implementation

Dynamic Scaling

You can provide a custom scaling strategy:

worker.SetAdjustStrategy(func(current, qlen, qcap int) int {
    if qlen > 10 && current < 10 {
        return current + 1 // scale up
    }
    if qlen == 0 && current > 1 {
        return current - 1 // scale down
    }
    return current
})
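
As a variation, a strategy can react to how full the queue is and clamp the result to fixed bounds. The sketch below assumes, as the parameter names and the example above suggest, that current is the running consumer count, qlen and qcap are the queue length and capacity, and the return value is the desired consumer count. fillRatioStrategy is a hypothetical helper, not part of the library.

```go
package main

import "fmt"

// fillRatioStrategy returns a function with the same shape as the one
// passed to SetAdjustStrategy above. It doubles the consumers when the
// queue is at least 75% full, removes one when the queue is empty, and
// keeps the result within [minConsumers, maxConsumers].
func fillRatioStrategy(minConsumers, maxConsumers int) func(current, qlen, qcap int) int {
	return func(current, qlen, qcap int) int {
		next := current
		switch {
		case qcap > 0 && qlen*4 >= qcap*3: // queue at least 75% full
			next = current * 2
		case qlen == 0: // queue empty
			next = current - 1
		}
		if next < minConsumers {
			next = minConsumers
		}
		if next > maxConsumers {
			next = maxConsumers
		}
		return next
	}
}

func main() {
	strategy := fillRatioStrategy(1, 16)
	fmt.Println(strategy(3, 80, 100))   // 6: scale up
	fmt.Println(strategy(3, 0, 100))    // 2: scale down
	fmt.Println(strategy(3, 40, 100))   // 3: unchanged
	fmt.Println(strategy(12, 100, 100)) // 16: clamped to the maximum
}
```

Because the strategy is a pure function, it can be unit-tested without starting a worker.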

Extension Points

  • ReportFunc: Custom batch processing logic.
  • AdjustStrategyFunc: Custom scaling logic for consumers.
  • Logger: Plug in your own logger.

Testing

  • All test cases are automated and safe for CI/CD.
  • To run tests:
go test ./...

License

MIT
