Documentation ¶
Overview ¶
Package changroup implements the publish/subscribe pattern with a group of channels.
A value is sent to each channel in the group. Channels can be acquired and released dynamically.
Group lets you acquire and release channels and send a value to all acquired channels.
AckableGroup does the same, but sends an Ackable value. It calls the original ack function only after all subscribers have acked their copy of the value, which is useful when you need to know that a message has been processed.
Types ¶
type Ackable ¶
type Ackable[T any] struct {
	Value T
	Ack   func()
}
Ackable holds a Value and an Ack function, which must be called after the value is processed.
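The contract above can be sketched as follows. This is only an illustration: `ackable` is a local stand-in that mirrors the documented fields so the snippet compiles without the package, and `process` is a hypothetical consumer, not part of changroup.

```go
package main

import "fmt"

// ackable is a local stand-in mirroring the documented Ackable[T] fields.
type ackable[T any] struct {
	Value T
	Ack   func()
}

// process is a hypothetical consumer: it handles the value first and
// calls Ack only once the work is finished.
func process(a ackable[int]) int {
	doubled := a.Value * 2
	a.Ack()
	return doubled
}

func main() {
	acked := false
	a := ackable[int]{Value: 21, Ack: func() { acked = true }}
	fmt.Println(process(a), acked)
}
```

Calling Ack last means the publisher is notified only after the consumer has actually finished with the value.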
func NewAckable ¶
type AckableGroup ¶
type AckableGroup[T any] struct {
	// contains filtered or unexported fields
}
AckableGroup provides a pub-sub model built on channels.
Each acquired channel receives a copy of the Ackable value passed to AckableGroup.Send. The original Ackable.Ack is called after all copies are acked.
Example ¶
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/maratori/changroup"
)

func main() {
	group := changroup.NewAckableGroup[int]()
	ch1, _ := group.Acquire()
	ch2, _ := group.Acquire()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group
		for i := 0; i < 100; i++ {
			group.Send(changroup.NewAckable(i, func() {
				fmt.Println("publisher received acks from all subscribers for i =", i)
			}))
			time.Sleep(1 * time.Second)
		}
	}()

	go func() {
		defer wg.Done()
		for a := range ch1 {
			fmt.Println("subscriber 1 received", a.Value)
			a.Ack() // value is processed
		}
		fmt.Println("ch1 is closed because group.ReleaseAll() is called")
	}()

	go func() {
		defer wg.Done()
		for a := range ch2 {
			fmt.Println("subscriber 2 received", a.Value)
			a.Ack() // value is processed
		}
		fmt.Println("ch2 is closed because group.ReleaseAll() is called")
	}()

	wg.Wait()
}
func NewAckableGroup ¶
func NewAckableGroup[T any]() *AckableGroup[T]
func (*AckableGroup[T]) Acquire ¶
func (g *AckableGroup[T]) Acquire() (<-chan Ackable[T], ReleaseFunc)
Acquire creates a new channel and adds it to the group.
A ReleaseFunc is returned as the second value. It should be called to remove the channel from the group and close it. It's safe to call the ReleaseFunc several times, as well as in parallel with AckableGroup.ReleaseAll.
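One common way to make a release function safe to call repeatedly and concurrently is to guard the close with sync.Once. The sketch below illustrates that idea only; `newRelease` and `releaseIsIdempotent` are hypothetical names, and nothing here claims to be how changroup implements ReleaseFunc.

```go
package main

import (
	"fmt"
	"sync"
)

// newRelease is a hypothetical helper: it returns the receive side of a
// fresh channel and a release function that closes it at most once.
func newRelease() (<-chan int, func()) {
	ch := make(chan int)
	var once sync.Once
	return ch, func() { once.Do(func() { close(ch) }) }
}

// releaseIsIdempotent calls release from several goroutines and once more
// afterwards, then reports whether the channel ended up closed.
func releaseIsIdempotent() bool {
	ch, release := newRelease()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			release()
		}()
	}
	wg.Wait()
	release() // extra call is a no-op, not a "close of closed channel" panic
	_, open := <-ch
	return !open
}

func main() {
	fmt.Println(releaseIsIdempotent())
}
```

With this property, `defer release()` is safe even when the group may also be torn down elsewhere.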
func (*AckableGroup[T]) ReleaseAll ¶
func (g *AckableGroup[T]) ReleaseAll()
ReleaseAll releases all acquired channels and closes them. It's safe to call AckableGroup.ReleaseAll several times as well as in parallel with ReleaseFunc.
func (*AckableGroup[T]) Send ¶
func (g *AckableGroup[T]) Send(value Ackable[T])
Send sends a copy of the Ackable value to each acquired channel.
Each copy has its own Ackable.Ack. The original Ackable.Ack is called after all copies are acked. AckableGroup.Send doesn't wait for the acks.
It guarantees that all channels receive the values in the same order, and that this order matches the order of the AckableGroup.Send calls.
It blocks until every channel has either received the value or been released.
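The "original Ack fires after all copies are acked" behavior can be pictured as a countdown. The following is a minimal sketch under stated assumptions: `splitAck` is a hypothetical helper, each copy is assumed to be acked exactly once, and the package's real implementation may differ.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// splitAck is a hypothetical helper: it returns n ack functions and
// invokes original exactly once, after every one of them has been called.
// It assumes each returned function is called exactly once.
func splitAck(n int, original func()) []func() {
	var remaining atomic.Int64
	remaining.Store(int64(n))
	acks := make([]func(), n)
	for i := range acks {
		acks[i] = func() {
			if remaining.Add(-1) == 0 {
				original()
			}
		}
	}
	return acks
}

// originalCallsAfterEachAck acks the copies one by one and records how
// many times original has fired after each ack.
func originalCallsAfterEachAck(n int) []int {
	calls := 0
	acks := splitAck(n, func() { calls++ })
	seen := make([]int, 0, n)
	for _, ack := range acks {
		ack()
		seen = append(seen, calls)
	}
	return seen
}

func main() {
	fmt.Println(originalCallsAfterEachAck(3)) // prints "[0 0 1]"
}
```

The atomic counter lets subscribers ack from different goroutines without extra locking.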
func (*AckableGroup[T]) SendAsync ¶
func (g *AckableGroup[T]) SendAsync(value Ackable[T])
SendAsync sends a value to each acquired channel but, unlike AckableGroup.Send, doesn't block. It also doesn't preserve the order of values!
type Group ¶
type Group[T any] struct {
	// contains filtered or unexported fields
}
Group provides a pub-sub model built on channels.
Each acquired channel receives a copy of the value passed to Group.Send.
Example (SubscribeBeforePublish) ¶
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/maratori/changroup"
)

func main() {
	group := changroup.NewGroup[int]()
	ch1, _ := group.Acquire()
	ch2, _ := group.Acquire()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group
		for i := 0; i < 100; i++ {
			group.Send(i)
			time.Sleep(1 * time.Second)
		}
	}()

	go func() {
		defer wg.Done()
		for i := range ch1 {
			fmt.Println("subscriber 1 received", i)
		}
		fmt.Println("ch1 is closed because group.ReleaseAll() is called")
	}()

	go func() {
		defer wg.Done()
		for i := range ch2 {
			fmt.Println("subscriber 2 received", i)
		}
		fmt.Println("ch2 is closed because group.ReleaseAll() is called")
	}()

	wg.Wait()
}
Example (SubscribeDynamically) ¶
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/maratori/changroup"
)

func main() {
	group := changroup.NewGroup[int]()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group
		for i := 0; i < 100; i++ {
			group.Send(i)
			time.Sleep(1 * time.Second)
		}
	}()

	// subscribe, wait for a specific value, then unsubscribe
	go func() {
		defer wg.Done()
		ch, release := group.Acquire()
		defer release() // close channel and remove from group
		for i := range ch {
			fmt.Println("subscriber 1 received", i)
			if i == 20 {
				// do something
				return
			}
		}
	}()

	// read all values
	go func() {
		defer wg.Done()
		ch, _ := group.Acquire()
		for i := range ch {
			fmt.Println("subscriber 2 received", i)
		}
		fmt.Println("ch is closed because group.ReleaseAll() is called")
	}()

	wg.Wait()
}
func (*Group[T]) Acquire ¶
func (g *Group[T]) Acquire() (<-chan T, ReleaseFunc)
Acquire creates a new channel and adds it to the group.
A ReleaseFunc is returned as the second value. It should be called to remove the channel from the group and close it. It's safe to call the ReleaseFunc several times, as well as in parallel with Group.ReleaseAll.
func (*Group[T]) ReleaseAll ¶
func (g *Group[T]) ReleaseAll()
ReleaseAll releases all acquired channels and closes them. It's safe to call Group.ReleaseAll several times as well as in parallel with ReleaseFunc.
func (*Group[T]) Send ¶
func (g *Group[T]) Send(value T)
Send sends a value to each acquired channel.
It guarantees that all channels receive the values in the same order, and that this order matches the order of the Group.Send calls.
It blocks until every channel has either received the value or been released.
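To make the ordering and blocking guarantees concrete, here is a simplified sketch of a blocking fan-out. The `broadcaster` type and `collect` function are hypothetical, releasing a channel during a send is not modelled, and this is not presented as the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster is a hypothetical stand-in for a group of subscriber channels.
type broadcaster struct {
	mu   sync.Mutex
	subs []chan int
}

// send delivers v to every subscriber and returns only after all of them
// have received it. Holding mu for the whole call serializes sends, so
// every subscriber observes values in call order.
func (b *broadcaster) send(v int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs {
		ch <- v
	}
}

// collect runs two subscribers, sends 0..n-1, and returns what each received.
func collect(n int) [][]int {
	b := &broadcaster{subs: []chan int{make(chan int), make(chan int)}}
	got := make([][]int, len(b.subs))
	var wg sync.WaitGroup
	for i, ch := range b.subs {
		wg.Add(1)
		go func(i int, ch chan int) {
			defer wg.Done()
			for v := range ch {
				got[i] = append(got[i], v)
			}
		}(i, ch)
	}
	for v := 0; v < n; v++ {
		b.send(v)
	}
	for _, ch := range b.subs {
		close(ch)
	}
	wg.Wait()
	return got
}

func main() {
	fmt.Println(collect(3)) // prints "[[0 1 2] [0 1 2]]"
}
```

The cost of a blocking send is that one slow subscriber delays the publisher, which is the trade-off against the non-blocking variant.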
func (*Group[T]) SendAsync ¶
func (g *Group[T]) SendAsync(value T)
SendAsync sends a value to each acquired channel but, unlike Group.Send, doesn't block. It also doesn't preserve the order of values!
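A rough illustration of why a non-blocking send gives up ordering: if each delivery runs in its own goroutine, nothing sequences those goroutines. The helper names below (`sendAsync`, `receiveSorted`) are hypothetical and the sketch makes no claim about how the package implements SendAsync.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// sendAsync is a hypothetical helper: it hands the delivery to its own
// goroutine and returns immediately. Nothing orders those goroutines,
// so the receiver may see values in any order.
func sendAsync(wg *sync.WaitGroup, ch chan<- int, v int) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		ch <- v
	}()
}

// receiveSorted sends 0..n-1 asynchronously and returns the received
// values sorted, because arrival order is not deterministic.
func receiveSorted(n int) []int {
	ch := make(chan int)
	var wg sync.WaitGroup
	for v := 0; v < n; v++ {
		sendAsync(&wg, ch, v)
	}
	// Close the channel once every pending delivery has completed.
	go func() {
		wg.Wait()
		close(ch)
	}()
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(receiveSorted(5))
}
```

Every value still arrives exactly once; only the arrival order is unspecified, so consumers that depend on sequence should use the blocking send.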
type ReleaseFunc ¶
type ReleaseFunc func()
ReleaseFunc is called to remove a channel from the group and close it.