Skip to content

Commit 19d8355

Browse files
committed
Merge branch 'Lee-Minjea-feat/throttle'
2 parents d587677 + 5cd3266 commit 19d8355

File tree

4 files changed

+378
-1
lines changed

4 files changed

+378
-1
lines changed

README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
# lo - Iterate over slices, maps, channels...
23

34
[![tag](https://img.shields.io/github/tag/samber/lo.svg)](https://github.com/samber/lo/releases)
@@ -297,6 +298,7 @@ Concurrency helpers:
297298
- [AttemptWhileWithDelay](#attemptwhilewithdelay)
298299
- [Debounce](#debounce)
299300
- [DebounceBy](#debounceby)
301+
- [Throttle](#throttle)
300302
- [Synchronize](#synchronize)
301303
- [Async](#async)
302304
- [Transaction](#transaction)
@@ -3417,6 +3419,64 @@ cancel("second key")
34173419

34183420
[[play](https://go.dev/play/p/d3Vpt6pxhY8)]
34193421

3422+
### Throttle
3423+
3424+
Creates a throttled instance that invokes given functions only once in every interval.
3425+
3426+
This returns 2 functions, First one is throttled function and Second one is a function to reset interval.
3427+
3428+
```go
3429+
f := func() {
3430+
println("Called once in every 100ms")
3431+
}
3432+
3433+
throttle, reset := lo.NewThrottle(100 * time.Millisecond, f)
3434+
3435+
for j := 0; j < 10; j++ {
3436+
throttle()
3437+
time.Sleep(30 * time.Millisecond)
3438+
}
3439+
3440+
reset()
3441+
throttle()
3442+
```
3443+
3444+
`NewThrottleWithCount` is NewThrottle with count limit, throttled function will be invoked count times in every interval.
3445+
3446+
```go
3447+
f := func() {
3448+
println("Called three times in every 100ms")
3449+
}
3450+
3451+
throttle, reset := lo.NewThrottleWithCount(100 * time.Millisecond, f)
3452+
3453+
for j := 0; j < 10; j++ {
3454+
throttle()
3455+
time.Sleep(30 * time.Millisecond)
3456+
}
3457+
3458+
reset()
3459+
throttle()
3460+
```
3461+
3462+
`NewThrottleBy` and `NewThrottleByWithCount` are NewThrottle with sharding key, throttled function will be invoked count times in every interval.
3463+
3464+
```go
3465+
f := func(key string) {
3466+
println(key, "Called three times in every 100ms")
3467+
}
3468+
3469+
throttle, reset := lo.NewThrottleByWithCount(100 * time.Millisecond, f)
3470+
3471+
for j := 0; j < 10; j++ {
3472+
throttle("foo")
3473+
time.Sleep(30 * time.Millisecond)
3474+
}
3475+
3476+
reset()
3477+
throttle()
3478+
```
3479+
34203480
### Synchronize
34213481

34223482
Wraps the underlying callback in a mutex. It receives an optional mutex.

retry.go

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,4 +287,89 @@ func (t *Transaction[T]) Process(state T) (T, error) {
287287
return state, err
288288
}
289289

290-
// throttle ?
290+
// @TODO: single mutex per key ?
291+
type throttleBy[T comparable] struct {
292+
mu *sync.Mutex
293+
timer *time.Timer
294+
interval time.Duration
295+
callbacks []func(key T)
296+
countLimit int
297+
count map[T]int
298+
}
299+
300+
func (th *throttleBy[T]) throttledFunc(key T) {
301+
th.mu.Lock()
302+
defer th.mu.Unlock()
303+
304+
if _, ok := th.count[key]; !ok {
305+
th.count[key] = 0
306+
}
307+
308+
if th.count[key] < th.countLimit {
309+
th.count[key]++
310+
311+
for _, f := range th.callbacks {
312+
f(key)
313+
}
314+
315+
}
316+
if th.timer == nil {
317+
th.timer = time.AfterFunc(th.interval, func() {
318+
th.reset()
319+
})
320+
}
321+
}
322+
323+
func (th *throttleBy[T]) reset() {
324+
th.mu.Lock()
325+
defer th.mu.Unlock()
326+
327+
if th.timer != nil {
328+
th.timer.Stop()
329+
}
330+
331+
th.count = map[T]int{}
332+
th.timer = nil
333+
}
334+
335+
// NewThrottle creates a throttled instance that invokes given functions only once in every interval.
336+
// This returns 2 functions, First one is throttled function and Second one is a function to reset interval
337+
func NewThrottle(interval time.Duration, f ...func()) (throttle func(), reset func()) {
338+
return NewThrottleWithCount(interval, 1, f...)
339+
}
340+
341+
// NewThrottleWithCount is NewThrottle with count limit, throttled function will be invoked count times in every interval.
342+
func NewThrottleWithCount(interval time.Duration, count int, f ...func()) (throttle func(), reset func()) {
343+
callbacks := Map(f, func(item func(), _ int) func(struct{}) {
344+
return func(struct{}) {
345+
item()
346+
}
347+
})
348+
349+
throttleFn, reset := NewThrottleByWithCount[struct{}](interval, count, callbacks...)
350+
return func() {
351+
throttleFn(struct{}{})
352+
}, reset
353+
}
354+
355+
// NewThrottleBy creates a throttled instance that invokes given functions only once in every interval.
356+
// This returns 2 functions, First one is throttled function and Second one is a function to reset interval
357+
func NewThrottleBy[T comparable](interval time.Duration, f ...func(key T)) (throttle func(key T), reset func()) {
358+
return NewThrottleByWithCount[T](interval, 1, f...)
359+
}
360+
361+
// NewThrottleByWithCount is NewThrottleBy with count limit, throttled function will be invoked count times in every interval.
362+
func NewThrottleByWithCount[T comparable](interval time.Duration, count int, f ...func(key T)) (throttle func(key T), reset func()) {
363+
if count <= 0 {
364+
count = 1
365+
}
366+
367+
th := &throttleBy[T]{
368+
mu: new(sync.Mutex),
369+
interval: interval,
370+
callbacks: f,
371+
countLimit: count,
372+
count: map[T]int{},
373+
}
374+
return th.throttledFunc, th.reset
375+
}

retry_example_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,3 +249,92 @@ func ExampleTransaction_error() {
249249
// -5
250250
// error
251251
}
252+
253+
func ExampleNewThrottle() {
254+
throttle, reset := NewThrottle(100*time.Millisecond, func() {
255+
fmt.Println("Called once in every 100ms")
256+
})
257+
258+
for j := 0; j < 10; j++ {
259+
throttle()
260+
time.Sleep(30 * time.Millisecond)
261+
}
262+
263+
reset()
264+
265+
// Output:
266+
// Called once in every 100ms
267+
// Called once in every 100ms
268+
// Called once in every 100ms
269+
}
270+
271+
func ExampleNewThrottleWithCount() {
272+
throttle, reset := NewThrottleWithCount(100*time.Millisecond, 2, func() {
273+
fmt.Println("Called once in every 100ms")
274+
})
275+
276+
for j := 0; j < 10; j++ {
277+
throttle()
278+
time.Sleep(30 * time.Millisecond)
279+
}
280+
281+
reset()
282+
283+
// Output:
284+
// Called once in every 100ms
285+
// Called once in every 100ms
286+
// Called once in every 100ms
287+
// Called once in every 100ms
288+
// Called once in every 100ms
289+
// Called once in every 100ms
290+
}
291+
292+
func ExampleNewThrottleBy() {
293+
throttle, reset := NewThrottleBy(100*time.Millisecond, func(key string) {
294+
fmt.Println(key, "Called once in every 100ms")
295+
})
296+
297+
for j := 0; j < 10; j++ {
298+
throttle("foo")
299+
throttle("bar")
300+
time.Sleep(30 * time.Millisecond)
301+
}
302+
303+
reset()
304+
305+
// Output:
306+
// foo Called once in every 100ms
307+
// bar Called once in every 100ms
308+
// foo Called once in every 100ms
309+
// bar Called once in every 100ms
310+
// foo Called once in every 100ms
311+
// bar Called once in every 100ms
312+
}
313+
314+
func ExampleNewThrottleByWithCount() {
315+
throttle, reset := NewThrottleByWithCount(100*time.Millisecond, 2, func(key string) {
316+
fmt.Println(key, "Called once in every 100ms")
317+
})
318+
319+
for j := 0; j < 10; j++ {
320+
throttle("foo")
321+
throttle("bar")
322+
time.Sleep(30 * time.Millisecond)
323+
}
324+
325+
reset()
326+
327+
// Output:
328+
// foo Called once in every 100ms
329+
// bar Called once in every 100ms
330+
// foo Called once in every 100ms
331+
// bar Called once in every 100ms
332+
// foo Called once in every 100ms
333+
// bar Called once in every 100ms
334+
// foo Called once in every 100ms
335+
// bar Called once in every 100ms
336+
// foo Called once in every 100ms
337+
// bar Called once in every 100ms
338+
// foo Called once in every 100ms
339+
// bar Called once in every 100ms
340+
}

0 commit comments

Comments
 (0)