A high-performance, scalable, and customizable batch processing framework for Go, based on the producer-consumer model. Supports dynamic scaling of consumers (goroutines), batch reporting, and flexible extension for various business scenarios.
- Batch Processing: Efficiently collects and processes data in batches, reducing system overhead.
- Dynamic Scaling: Supports automatic and custom scaling of consumer goroutines based on queue status.
- Customizable: Provides hooks for custom batch reporting and scaling strategies.
- Graceful Shutdown: Ensures all data is processed before shutdown, with configurable wait times.
- Memory Optimization: Integrates a bytes pool to reduce memory allocations.
- Easy Integration: Simple API for adding workers and managing batch jobs.
- Producer-Consumer Model: Data is produced and pushed into a buffered channel. Multiple consumer goroutines fetch and process data in batches.
- Batch Trigger: Batch processing is triggered by either reaching a maximum batch size or a maximum wait duration.
- Dynamic Consumer Adjustment: The number of consumer goroutines can be adjusted at runtime based on queue length or custom logic.
- Collector: Manages multiple workers, providing unified start and wait control.
```mermaid
graph TD;
    Producer-->|Push Data|Worker;
    Worker-->|Batch|Consumer;
    Consumer-->|ReportFunc|BusinessLogic;
    Worker-->|Dynamic Scaling|Consumer;
    Collector-->|Manage|Worker;
```
- Worker: Handles batch collection, reporting, and dynamic scaling.
- Collector: Aggregates multiple workers for unified management.
- ReportFunc: User-defined function for batch processing.
- AdjustStrategyFunc: User-defined function for scaling consumers.
```go
package main

import (
	"time"

	"github.com/kingson4wu/quick_worker/core"
)

func main() {
	collector := core.NewCollector(&core.CollectorConf{})
	collector.AddWorker(&core.WorkerConf{
		Name:            "example",
		NumConsumers:    3,
		MaxBatchSize:    10,
		BufferSize:      100,
		MaxWaitDuration: time.Second,
		ReportFunc: func(clientId int, batch [][]byte, w *core.Worker) error {
			// Process the batch here.
			return nil
		},
	})
	collector.Start()

	worker := collector.GetWorker("example")
	worker.Produce([]byte("data1"))
	worker.Produce([]byte("data2"))
	// ...
	collector.Wait()
}
```
### WorkerConf

- `Name` (string): Worker name (required)
- `NumConsumers` (int): Initial number of consumers (default: 5)
- `MaxBatchSize` (int): Max batch size to trigger a report (default: 30)
- `BufferSize` (int): Channel buffer size (default: 200)
- `MaxWaitDuration` (time.Duration): Max wait time before a batch report (default: 1s)
- `WaitTimeConsumerBeforeStopping` (time.Duration): Wait time before stopping consumers
- `WaitTimeProducerBeforeStopping` (time.Duration): Wait time before stopping producers
- `ReportFunc` (func): Custom batch processing logic
- `RetryTimes` (int): Number of retries for batch processing
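For reference, a fully populated configuration might look like the following; the field names come from the list above, while the worker name and the specific values are illustrative, not recommendations:

```go
conf := &core.WorkerConf{
	Name:                           "metrics", // required; "metrics" is a made-up example name
	NumConsumers:                   5,
	MaxBatchSize:                   30,
	BufferSize:                     200,
	MaxWaitDuration:                time.Second,
	WaitTimeConsumerBeforeStopping: 2 * time.Second,
	WaitTimeProducerBeforeStopping: time.Second,
	RetryTimes:                     3,
	ReportFunc: func(clientId int, batch [][]byte, w *core.Worker) error {
		// Returning a non-nil error lets the worker retry,
		// up to RetryTimes attempts.
		return nil
	},
}
```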
### CollectorConf

- `Logger` (Logger): Custom logger implementation
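A custom logger can be plugged in when the collector is created. The exact method set of the `Logger` interface is not listed here, so the `Infof`/`Errorf` method names below are assumptions — check the interface definition in the `core` package before copying this sketch:

```go
// stdLogger adapts the standard library's log package.
// The Infof/Errorf method names are assumed; verify them
// against core's Logger interface.
type stdLogger struct{}

func (stdLogger) Infof(format string, args ...interface{})  { log.Printf("INFO  "+format, args...) }
func (stdLogger) Errorf(format string, args ...interface{}) { log.Printf("ERROR "+format, args...) }

func newCollector() *core.Collector {
	return core.NewCollector(&core.CollectorConf{Logger: stdLogger{}})
}
```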
You can provide a custom scaling strategy:

```go
worker.SetAdjustStrategy(func(current, qlen, qcap int) int {
	if qlen > 10 && current < 10 {
		return current + 1 // scale up
	}
	if qlen == 0 && current > 1 {
		return current - 1 // scale down
	}
	return current
})
```

- ReportFunc: Custom batch processing logic.
- AdjustStrategyFunc: Custom scaling logic for consumers.
- Logger: Plug in your own logger.
- All test cases are automated and safe for CI/CD.
- To run tests:

```shell
go test ./...
```

## License

MIT