+ Include an implementation of the routines specified in ADR-43
along with a demuxer and some dummy reactor code
+ `routine.send` returns false when routine is not running
+ this will prevent panics sending to channels which have been
closed
+ Make output channels routine specific, removing the risk of someone
writing to a channel which was closed by another routine.
+ consistency changes between the routines and the demuxer
+ ensure that we stop accepting messages once `stop` has been called
to avoid the case in which we attempt to write to a channel which
has already been closed
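The behaviour described in these commit notes (a send that reports `false` once the routine is no longer running, rather than panicking on a closed channel) can be sketched as follows. This is a minimal illustration, not the PR's code: the `routine` type, its fields, and `newRoutine` are hypothetical, and the sketch sidesteps the close race by never closing the input channel at all.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Event is a placeholder for whatever the routine carries.
type Event interface{}

// routine is a hypothetical, stripped-down stand-in for the PR's Routine.
type routine struct {
	running uint32     // 1 while the routine accepts events
	input   chan Event // buffered; never closed in this sketch
}

func newRoutine(capacity int) *routine {
	return &routine{input: make(chan Event, capacity)}
}

// start marks the routine as accepting events.
func (rt *routine) start() { atomic.StoreUint32(&rt.running, 1) }

// stop clears the flag so that later trySend calls are rejected.
func (rt *routine) stop() { atomic.StoreUint32(&rt.running, 0) }

// trySend reports whether the event was accepted. It never blocks.
func (rt *routine) trySend(e Event) bool {
	if atomic.LoadUint32(&rt.running) == 0 {
		return false // not started, or already stopped
	}
	select {
	case rt.input <- e:
		return true
	default:
		return false // buffer full: the caller sees back pressure
	}
}

func main() {
	rt := newRoutine(1)
	fmt.Println(rt.trySend("early")) // false: not started
	rt.start()
	fmt.Println(rt.trySend("a")) // true
	fmt.Println(rt.trySend("b")) // false: buffer of 1 is full
	rt.stop()
	fmt.Println(rt.trySend("late")) // false: stopped
}
```

Checking a flag and then sending is not atomic, so a design that does close the channel would still need extra synchronization between `stop` and `trySend`.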
Codecov Report
    @@            Coverage Diff             @@
    ##           master    #3878      +/-   ##
    ==========================================
    - Coverage   66.86%   66.71%   -0.15%
    ==========================================
      Files         221      225       +4
      Lines       18510    18727     +217
    ==========================================
    + Hits        12376    12493     +117
    - Misses       5211     5309      +98
    - Partials      923      925       +2
+ use `trySend` to replicate peer sending
+ expose `next()` as a chan of events as output
+ expose `final()` as a chan of error, for the final error
+ add `ready()` as chan struct when routine is ready
blockchain/v2/demuxer.go
Outdated
    fin chan error
    stopped chan struct{}
    rdy chan struct{}
    running *uint32
Have you seen the common/BaseService struct (https://godoc.org/github.com/tendermint/tendermint/libs/common#BaseService)? It feels like you're doing exactly the same thing.
I also thought routines could implement the service interface. I spent a bit of time trying to make it work but it felt wrong for a few reasons.
Routines expose methods associated with the lifecycle of a finite state machine while services don't. Implementing OnStart and OnReset for an FSM didn't really make sense to me as we don't embed the state in the struct itself but instead close over it. This restricts how state in routines is managed, but I would suggest this limitation is a good thing. We want one routine to manage one state and the transition between states to happen from a serialized stream of events.
Routines will ideally terminate of their own accord and need to communicate their final event. This is an important requirement for the blockchain reactor as it needs to communicate when to switch to consensus.
While it would most likely be possible to modify the routine to adhere to the Service interface, I'm not sure it's worth it since I cannot envision any cases in which we would want either a Service or a Routine and not care about which.
    go r.processor.start()
    go r.demuxer.start()

    <-r.scheduler.ready()
Can't we use WaitGroup or https://godoc.org/golang.org/x/sync/errgroup here?
Definitely could use wait groups here, but I didn't want to include inter-routine coordination in the routine. I updated the code so that `rdy` is now closed as soon as the routine is ready; this way `<-ready()` will return immediately if the routine is "ready", as the name implies.
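The closed-channel approach mentioned in this reply relies on a property of Go channels: a receive from a closed channel returns immediately, any number of times, for any number of waiters. A minimal sketch, assuming a hypothetical `routine` type with an `rdy` field (the `isReady` probe is also illustrative and not part of the PR):

```go
package main

import (
	"fmt"
	"sync"
)

// routine is a hypothetical stand-in that only models readiness.
type routine struct {
	rdy  chan struct{}
	once sync.Once // guards against closing rdy twice
}

func newRoutine() *routine { return &routine{rdy: make(chan struct{})} }

// ready exposes rdy as a receive-only channel.
func (rt *routine) ready() <-chan struct{} { return rt.rdy }

// start signals readiness by closing rdy; the event loop would follow.
func (rt *routine) start() {
	rt.once.Do(func() { close(rt.rdy) })
}

// isReady is a non-blocking probe of the ready channel.
func isReady(rt *routine) bool {
	select {
	case <-rt.ready():
		return true
	default:
		return false
	}
}

func main() {
	rt := newRoutine()
	fmt.Println(isReady(rt)) // false
	go rt.start()
	<-rt.ready() // blocks until start has closed rdy
	<-rt.ready() // returns immediately: rdy stays closed
	fmt.Println(isReady(rt)) // true
}
```

Closing rather than sending one value means late callers of `ready()` are not left blocked, which is the trade-off against `sync.WaitGroup` raised in the question above.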
    type handleFunc = func(event Event) (Events, error)

    type Routine struct {
Could you explain the rationale for this struct?
The intention for routines is to provide lifecycle management (start/stop) behaviour to adr-30 style finite state machines characterized by functions of the form:
`type handleFunc = func(event Event) (Events, error)`
The abstraction allows us to assert the fulfilment of event delivery guarantees without domain specific logic. It also allows us to assert the correctness of domain specific logic without coupling message delivery concerns.
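To illustrate the separation described above, here is a rough sketch of a handler that closes over its own state and can be exercised with no delivery machinery involved. Only the `handleFunc` signature comes from the PR; the event types, `newHandler`, and the `run` driver are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

type Event interface{}
type Events []Event

type handleFunc = func(event Event) (Events, error)

// Hypothetical domain events, used only for this sketch.
type blockRequest struct{ height int64 }
type blockResponse struct {
	height int64
	seq    int // how many requests the handler has served so far
}

var errUnknownEvent = errors.New("unknown event")

// newHandler returns a handler that closes over its state (served)
// instead of storing it in a struct.
func newHandler() handleFunc {
	served := 0
	return func(event Event) (Events, error) {
		req, ok := event.(blockRequest)
		if !ok {
			return nil, errUnknownEvent
		}
		served++
		return Events{blockResponse{height: req.height, seq: served}}, nil
	}
}

// run feeds events to the handler in order and collects the outputs,
// stopping at the first error. It knows nothing about the domain.
func run(handle handleFunc, in Events) (Events, error) {
	var out Events
	for _, e := range in {
		res, err := handle(e)
		if err != nil {
			return out, err
		}
		out = append(out, res...)
	}
	return out, nil
}

func main() {
	out, err := run(newHandler(), Events{blockRequest{1}, blockRequest{2}})
	fmt.Println(len(out), err) // 2 <nil>
	_, err = run(newHandler(), Events{"bogus"})
	fmt.Println(err) // unknown event
}
```

Because `run` only sees `handleFunc`, delivery guarantees can be tested with a trivial handler, and the handler can be tested with a plain slice of events.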
+ close `rdy` channel to ensure that calls to `<-ready()` will
always return if the routine is ready
ebuchman
left a comment
High level thoughts from review:
- Channel Capacity
- Routine API
- should start() panic if it's already started?
blockchain/v2/routine.go
Outdated
    // * audit log levels
    // * Convert routine to an interface with concrete implmentation

    type handleFunc = func(event Event) (Events, error)
Do we need multiple events in the output?
I think you're right, switching it to single event output made it simpler.
blockchain/v2/routine.go
Outdated
    handle handleFunc
    logger log.Logger
    metrics *Metrics
    stopping *uint32
blockchain/v2/routine.go
Outdated
    rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents)))
    rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents)))
    for _, event := range oEvents {
        rt.logger.Info(fmt.Sprintln("writing back to output"))
Debug here.
Also just add the name to the logger itself.
blockchain/v2/routine.go
Outdated
    }
    rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents)))
    for _, event := range oEvents {
        rt.out <- event
        time.Sleep(10 * time.Millisecond)
    }
    }()
Can we assert that the above goroutine is already running before we trySend the error?
* Routines will now use a priority queue instead of channels to
iterate over events
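As a rough illustration of iterating over events by priority rather than in channel order, the sketch below uses the standard library's `container/heap`. This is a swapped-in technique: the PR itself imports `github.com/Workiva/go-datastructures/queue`. The `event` fields, the "lower value first" convention, and the helper names are assumptions made for the example.

```go
package main

import (
	"container/heap"
	"fmt"
)

// event is a hypothetical prioritized event.
type event struct {
	name     string
	priority int // lower value is handled first (assumption)
	seq      int // insertion order, used to break ties
}

// eventQueue implements heap.Interface over a slice of events.
type eventQueue []event

func (q eventQueue) Len() int { return len(q) }
func (q eventQueue) Less(i, j int) bool {
	if q[i].priority != q[j].priority {
		return q[i].priority < q[j].priority
	}
	return q[i].seq < q[j].seq
}
func (q eventQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *eventQueue) Push(x interface{}) { *q = append(*q, x.(event)) }
func (q *eventQueue) Pop() interface{} {
	old := *q
	n := len(old)
	item := old[n-1]
	*q = old[:n-1]
	return item
}

// enqueue adds an event while keeping the heap ordering intact.
func enqueue(q *eventQueue, e event) { heap.Push(q, e) }

// drain pops every event in priority order and returns the names.
func drain(q *eventQueue) []string {
	var names []string
	for q.Len() > 0 {
		names = append(names, heap.Pop(q).(event).name)
	}
	return names
}

func main() {
	q := &eventQueue{}
	enqueue(q, event{"block", 2, 0})
	enqueue(q, event{"stop", 0, 1})
	enqueue(q, event{"status", 2, 2})
	fmt.Println(drain(q)) // [stop block status]
}
```

Unlike a channel, a heap is not safe for concurrent use on its own, so a routine built this way would need a mutex or a dedicated owner goroutine around it.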
    var done = fmt.Errorf("done")

    func simpleHandler(event Event) (Event, error) {
        switch event.(type) {
singleCaseSwitch: should rewrite switch statement to if statement (from gocritic)
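The rewrite that gocritic asks for can be sketched like this. The handler bodies are not visible in the diff excerpt, so `eventA`, `errDone`, and both function bodies below are hypothetical; the point is only the shape of the change from a one-case type switch to a type assertion.

```go
package main

import (
	"errors"
	"fmt"
)

type Event interface{}

// eventA is a hypothetical event type for this sketch.
type eventA struct{}

var errDone = errors.New("done")

// handleSwitch uses the form that singleCaseSwitch flags.
func handleSwitch(event Event) (Event, error) {
	switch event.(type) {
	case eventA:
		return nil, errDone
	}
	return nil, nil
}

// handleIf is the equivalent form using a type assertion.
func handleIf(event Event) (Event, error) {
	if _, ok := event.(eventA); ok {
		return nil, errDone
	}
	return nil, nil
}

func main() {
	_, err := handleIf(eventA{})
	fmt.Println(err) // done
	_, err = handleIf("something else")
	fmt.Println(err) // <nil>
}
```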
+ Simplify the design by demuxing events directly in the reactor
    }

    func schedulerHandle(event Event) (Event, error) {
        switch event.(type) {
singleCaseSwitch: should rewrite switch statement to if statement (from gocritic)
    }

    func processorHandle(event Event) (Event, error) {
        switch event.(type) {
singleCaseSwitch: should rewrite switch statement to if statement (from gocritic)
blockchain/v2/reactor.go
Outdated
    // XXX: check for backpressure
    r.scheduler.trySend(event)
    r.processor.trySend(event)
    case _ = <-r.stopDemux:
S1005: '_ = <-ch' can be simplified to '<-ch' (from gosimple)
    }
    }

    func (rt *Routine) setLogger(logger log.Logger) {
U1000: func (*Routine).setLogger is unused (from unused)
    "github.com/Workiva/go-datastructures/queue"
    )

    type Event queue.Item
Event redeclared in this block (from typecheck)
    if !rt.isRunning() {
        return false
    }
    err := rt.queue.Put(event)
cannot use event (variable of type Event) as queue.Item value in argument to rt.queue.Put: missing method Compare (from typecheck)
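The typecheck error above says the value passed to `Put` lacks a `Compare` method. A minimal sketch of a type that would satisfy such an interface is shown below. The local `Item` interface is written to mirror the shape of `queue.Item` as I understand it (a single `Compare(other Item) int` method); it is declared here so the sketch compiles without the dependency, and `priorityEvent` is hypothetical.

```go
package main

import "fmt"

// Item is a local mirror of the interface shape assumed for queue.Item.
type Item interface {
	Compare(other Item) int
}

// priorityEvent is a hypothetical event carrying an explicit priority.
type priorityEvent struct {
	name     string
	priority int
}

// Compare returns -1, 0 or 1 depending on how e orders against other.
// Items of a different concrete type are treated as equal here.
func (e priorityEvent) Compare(other Item) int {
	o, ok := other.(priorityEvent)
	if !ok {
		return 0
	}
	switch {
	case e.priority < o.priority:
		return -1
	case e.priority > o.priority:
		return 1
	default:
		return 0
	}
}

// Compile-time check that priorityEvent satisfies Item.
var _ Item = priorityEvent{}

func main() {
	a := priorityEvent{name: "status", priority: 1}
	b := priorityEvent{name: "block", priority: 5}
	fmt.Println(a.Compare(b), b.Compare(a), a.Compare(a)) // -1 1 0
}
```

Declaring `Event` as an interface that embeds the comparison method would let every concrete event be queued, at the cost of each event type having to define its own ordering.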
ebuchman
left a comment
Thanks Sean. Merging for now, we'll keep iterating.
The architecture outlined in ADR-043 describes the interaction between concurrent routines including the scheduler and processor. These routines aim to provide better encapsulation of component business logic as well as consistent expectations around lifecycle management and concurrent message delivery. This PR includes an implementation of the `Routine`, which is inspired by BaseService with certain key differences but shaped to fulfil some of the capabilities of the v1 fsm:
- Instead of a Start method which is expected to launch internal goroutines, `Routines` expose a `Start` method which is intended to be executed in a goroutine coordinated externally by the caller.
- Routines are characterized by a stateless `func(event Event) (Events, error)` which specifies message schemes in the style of ADR-30.
- Routines receive messages via `trySend()`, which returns `false` to indicate back pressure.
- Routines can be stopped gracefully, completing all `sent` messages, or terminated, where all unprocessed messages are dropped.
Referenced an issue explaining the need for the change
Updated all relevant documentation in docs
Updated all code comments where relevant
Wrote tests
Updated CHANGELOG_PENDING.md
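The difference the PR description draws between stopping gracefully and terminating can be sketched as follows. This is an illustration under assumptions, not the PR's implementation: the `routine` type, its channels, and the method names are hypothetical, and callers must not send after `stop` (guarding that is out of scope here).

```go
package main

import "fmt"

// routine is a hypothetical stand-in with a buffered input queue.
type routine struct {
	input   chan string   // events that have been sent
	quit    chan struct{} // closed by terminate
	done    chan struct{} // closed when run returns
	handled []string      // read only after done is closed
}

func newRoutine(capacity int) *routine {
	return &routine{
		input: make(chan string, capacity),
		quit:  make(chan struct{}),
		done:  make(chan struct{}),
	}
}

// run processes events until the input is drained or quit is closed.
func (rt *routine) run() {
	defer close(rt.done)
	for {
		// Check for termination first so it wins over pending events.
		select {
		case <-rt.quit:
			return
		default:
		}
		select {
		case <-rt.quit:
			return
		case e, ok := <-rt.input:
			if !ok {
				return // graceful stop: everything sent was handled
			}
			rt.handled = append(rt.handled, e)
		}
	}
}

// stop lets run finish the events already sent, then exit.
func (rt *routine) stop() { close(rt.input) }

// terminate makes run exit without handling pending events.
func (rt *routine) terminate() { close(rt.quit) }

func main() {
	graceful := newRoutine(4)
	graceful.input <- "a"
	graceful.input <- "b"
	graceful.stop()
	go graceful.run()
	<-graceful.done
	fmt.Println(graceful.handled) // [a b]

	dropped := newRoutine(4)
	dropped.input <- "a"
	dropped.terminate()
	go dropped.run()
	<-dropped.done
	fmt.Println(len(dropped.handled)) // 0
}
```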