changroup

Version: v0.1.1
Published: Apr 6, 2024 License: MIT Imports: 1 Imported by: 0

README

changroup

changroup is a Go library for creating a group of channels (publish/subscribe pattern). Each value sent to the group is delivered to every channel in it. Channels can be acquired and released dynamically.

changroup.Group lets you acquire and release channels and send a value to all acquired channels.

changroup.AckableGroup does the same, but sends changroup.Ackable values. It calls the original ack function only after all subscribers have acked their copy of the value. This is useful when you need to know when a message has been processed.

Generics

The minimum supported Go version is 1.18 because the library uses generics.

Installation

go get github.com/maratori/changroup

Usage

Scenario 1

Create all channels before sending any values (before the publisher starts). In this case all channels are guaranteed to receive all values.

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/maratori/changroup"
)

func main() {
	group := changroup.NewGroup[int]()
	ch1, _ := group.Acquire()
	ch2, _ := group.Acquire()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group

		for i := 0; i < 100; i++ {
			group.Send(i)
			time.Sleep(1 * time.Second)
		}
	}()

	go func() {
		defer wg.Done()
		for i := range ch1 {
			fmt.Println("subscriber 1 received", i)
		}
		fmt.Println("ch1 is closed because group.ReleaseAll() is called")
	}()

	go func() {
		defer wg.Done()
		for i := range ch2 {
			fmt.Println("subscriber 2 received", i)
		}
		fmt.Println("ch2 is closed because group.ReleaseAll() is called")
	}()

	wg.Wait()
}

Scenario 2

Create channels dynamically (after the publisher has started). In this case some values may be dropped because no channel is subscribed at the moment they are sent.

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/maratori/changroup"
)

func main() {
	group := changroup.NewGroup[int]()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group

		for i := 0; i < 100; i++ {
			group.Send(i)
			time.Sleep(1 * time.Second)
		}
	}()

	// subscribe, wait for a specific value, then unsubscribe
	go func() {
		defer wg.Done()
		ch, release := group.Acquire()
		defer release() // close channel and remove from group

		for i := range ch {
			fmt.Println("subscriber 1 received", i)
			if i == 20 {
				// do something
				return
			}
		}
	}()

	// read all values
	go func() {
		defer wg.Done()
		ch, _ := group.Acquire()
		for i := range ch {
			fmt.Println("subscriber 2 received", i)
		}
		fmt.Println("ch is closed because group.ReleaseAll() is called")
	}()

	wg.Wait()
}

Scenario 3

Run a callback in the publisher after all subscribers have acked a value.

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/maratori/changroup"
)

func main() {
	group := changroup.NewAckableGroup[int]()
	ch1, _ := group.Acquire()
	ch2, _ := group.Acquire()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group

		for i := 0; i < 100; i++ {
			i := i // per-iteration copy: before Go 1.22 the loop variable is shared by all closures
			group.Send(changroup.NewAckable(i, func() {
				fmt.Println("publisher received acks from all subscribers for i =", i)
			}))
			time.Sleep(1 * time.Second)
		}
	}()

	go func() {
		defer wg.Done()
		for a := range ch1 {
			fmt.Println("subscriber 1 received", a.Value)
			a.Ack() // value is processed
		}
		fmt.Println("ch1 is closed because group.ReleaseAll() is called")
	}()

	go func() {
		defer wg.Done()
		for a := range ch2 {
			fmt.Println("subscriber 2 received", a.Value)
			a.Ack() // value is processed
		}
		fmt.Println("ch2 is closed because group.ReleaseAll() is called")
	}()

	wg.Wait()
}

Contribution

You are welcome to create an issue or pull request with improvements and fixes. See guide.

License

MIT License

Documentation

Overview

Package changroup implements the publish/subscribe pattern with a group of channels.

Each value sent to the group is delivered to every channel in it. Channels can be acquired and released dynamically.

Group lets you acquire and release channels and send a value to all acquired channels.

AckableGroup does the same, but sends Ackable values. It calls the original ack function only after all subscribers have acked their copy of the value. This is useful when you need to know when a message has been processed.

Types

type Ackable

type Ackable[T any] struct {
	Value T
	Ack   func()
}

Ackable holds a Value and an Ack func that must be called after the value is processed.

func NewAckable

func NewAckable[T any](value T, ack func()) Ackable[T]

type AckableGroup

type AckableGroup[T any] struct {
	// contains filtered or unexported fields
}

AckableGroup provides pub-sub model working with channels.

Each acquired channel will receive a copy of an Ackable value provided to AckableGroup.Send. The original Ackable.Ack will be called after all copies are acked.

Example
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/maratori/changroup"
)

func main() {
	group := changroup.NewAckableGroup[int]()
	ch1, _ := group.Acquire()
	ch2, _ := group.Acquire()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group

		for i := 0; i < 100; i++ {
			i := i // per-iteration copy: before Go 1.22 the loop variable is shared by all closures
			group.Send(changroup.NewAckable(i, func() {
				fmt.Println("publisher received acks from all subscribers for i =", i)
			}))
			time.Sleep(1 * time.Second)
		}
	}()

	go func() {
		defer wg.Done()
		for a := range ch1 {
			fmt.Println("subscriber 1 received", a.Value)
			a.Ack() // value is processed
		}
		fmt.Println("ch1 is closed because group.ReleaseAll() is called")
	}()

	go func() {
		defer wg.Done()
		for a := range ch2 {
			fmt.Println("subscriber 2 received", a.Value)
			a.Ack() // value is processed
		}
		fmt.Println("ch2 is closed because group.ReleaseAll() is called")
	}()

	wg.Wait()
}

func NewAckableGroup

func NewAckableGroup[T any]() *AckableGroup[T]

func (*AckableGroup[T]) Acquire

func (g *AckableGroup[T]) Acquire() (<-chan Ackable[T], ReleaseFunc)

Acquire creates a new channel and adds it to the group.

ReleaseFunc is returned as the second value. It should be called to remove the channel from the group and close it. It's safe to call ReleaseFunc several times as well as in parallel with AckableGroup.ReleaseAll.

func (*AckableGroup[T]) ReleaseAll

func (g *AckableGroup[T]) ReleaseAll()

ReleaseAll releases all acquired channels and closes them. It's safe to call AckableGroup.ReleaseAll several times as well as in parallel with ReleaseFunc.

func (*AckableGroup[T]) Send

func (g *AckableGroup[T]) Send(value Ackable[T])

Send sends a copy of the Ackable value to each acquired channel.

Each copy has its own Ackable.Ack. The original Ackable.Ack will be called after all copies are acked. AckableGroup.Send doesn't wait for acks.

It guarantees that all channels receive the values in the same order, and that this order matches the order of AckableGroup.Send calls.

It waits for all channels to receive the value or to be released.
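
The "original ack after all copies are acked" behaviour can be pictured with a shared countdown. The following is a minimal sketch using only the standard library. It is not the library's implementation; the names ackable and fanOutAcks are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// ackable mirrors the shape of changroup.Ackable for illustration only.
type ackable[T any] struct {
	Value T
	Ack   func()
}

// fanOutAcks (hypothetical helper) returns n copies of a. Each copy has its
// own Ack, and the original a.Ack runs once, when the last copy is acked.
// This sketch does not guard against acking the same copy twice.
func fanOutAcks[T any](a ackable[T], n int) []ackable[T] {
	remaining := int64(n) // shared by all copies, decremented atomically
	copies := make([]ackable[T], n)
	for i := range copies {
		copies[i] = ackable[T]{
			Value: a.Value,
			Ack: func() {
				if atomic.AddInt64(&remaining, -1) == 0 {
					a.Ack()
				}
			},
		}
	}
	return copies
}

func main() {
	original := ackable[string]{
		Value: "hello",
		Ack:   func() { fmt.Println("all copies acked") },
	}
	for i, c := range fanOutAcks(original, 3) {
		fmt.Println("acking copy", i)
		c.Ack()
	}
}
```

The atomic counter lets subscribers ack from different goroutines without a lock. Only the ack that brings the counter to zero triggers the original callback.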

func (*AckableGroup[T]) SendAsync

func (g *AckableGroup[T]) SendAsync(value Ackable[T])

SendAsync sends a value to each acquired channel but, unlike AckableGroup.Send, doesn't block. It also doesn't preserve the order of values!

type Group

type Group[T any] struct {
	// contains filtered or unexported fields
}

Group provides pub-sub model working with channels.

Each acquired channel will receive a copy of a value provided to Group.Send.

Example (SubscribeBeforePublish)
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/maratori/changroup"
)

func main() {
	group := changroup.NewGroup[int]()
	ch1, _ := group.Acquire()
	ch2, _ := group.Acquire()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group

		for i := 0; i < 100; i++ {
			group.Send(i)
			time.Sleep(1 * time.Second)
		}
	}()

	go func() {
		defer wg.Done()
		for i := range ch1 {
			fmt.Println("subscriber 1 received", i)
		}
		fmt.Println("ch1 is closed because group.ReleaseAll() is called")
	}()

	go func() {
		defer wg.Done()
		for i := range ch2 {
			fmt.Println("subscriber 2 received", i)
		}
		fmt.Println("ch2 is closed because group.ReleaseAll() is called")
	}()

	wg.Wait()
}

Example (SubscribeDynamically)
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/maratori/changroup"
)

func main() {
	group := changroup.NewGroup[int]()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group

		for i := 0; i < 100; i++ {
			group.Send(i)
			time.Sleep(1 * time.Second)
		}
	}()

	// subscribe, wait for a specific value, then unsubscribe
	go func() {
		defer wg.Done()
		ch, release := group.Acquire()
		defer release() // close channel and remove from group

		for i := range ch {
			fmt.Println("subscriber 1 received", i)
			if i == 20 {
				// do something
				return
			}
		}
	}()

	// read all values
	go func() {
		defer wg.Done()
		ch, _ := group.Acquire()
		for i := range ch {
			fmt.Println("subscriber 2 received", i)
		}
		fmt.Println("ch is closed because group.ReleaseAll() is called")
	}()

	wg.Wait()
}

func NewGroup

func NewGroup[T any]() *Group[T]

func (*Group[T]) Acquire

func (g *Group[T]) Acquire() (<-chan T, ReleaseFunc)

Acquire creates a new channel and adds it to the group.

ReleaseFunc is returned as the second value. It should be called to remove the channel from the group and close it. It's safe to call ReleaseFunc several times as well as in parallel with Group.ReleaseAll.

func (*Group[T]) ReleaseAll

func (g *Group[T]) ReleaseAll()

ReleaseAll releases all acquired channels and closes them. It's safe to call Group.ReleaseAll several times as well as in parallel with ReleaseFunc.

func (*Group[T]) Send

func (g *Group[T]) Send(value T)

Send sends a value to each acquired channel.

It guarantees that all channels receive the values in the same order, and that this order matches the order of Group.Send calls.

It waits for all channels to receive the value or to be released.

func (*Group[T]) SendAsync

func (g *Group[T]) SendAsync(value T)

SendAsync sends a value to each acquired channel but, unlike Group.Send, doesn't block. It also doesn't preserve the order of values!

type ReleaseFunc

type ReleaseFunc func()

ReleaseFunc is called to remove the channel from the group and close it.
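
The documentation above says a ReleaseFunc may be called several times and concurrently. A common way to make a close operation safe under those conditions is sync.Once. The sketch below shows that technique with the standard library only. The names releaseFunc and newRelease are hypothetical, and whether changroup itself uses sync.Once is an assumption this page does not confirm.

```go
package main

import (
	"fmt"
	"sync"
)

// releaseFunc mirrors the shape of changroup.ReleaseFunc for illustration.
type releaseFunc func()

// newRelease (hypothetical helper) returns a function that closes ch at most
// once, no matter how many times or from how many goroutines it is called.
func newRelease[T any](ch chan T) releaseFunc {
	var once sync.Once
	return func() {
		once.Do(func() { close(ch) })
	}
}

func main() {
	ch := make(chan int)
	release := newRelease(ch)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			release() // repeated and concurrent calls do not panic
		}()
	}
	wg.Wait()

	_, ok := <-ch
	fmt.Println("channel open:", ok)
}
```

Without the sync.Once guard, the second close(ch) would panic, which is why idempotent release matters when a subscriber's deferred release can race with a group-wide release.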
