Skip to content

[blockchain] v2 Routines#3878

Merged
brapse merged 25 commits intomasterfrom
brapse/blockchain-v2-riri-routine
Sep 18, 2019
Merged

[blockchain] v2 Routines#3878
brapse merged 25 commits intomasterfrom
brapse/blockchain-v2-riri-routine

Conversation

@brapse
Copy link
Contributor

@brapse brapse commented Aug 3, 2019

The architecture outlined in ADR-043 describes the interaction between concurrent routines including the scheduler and processor. These routines are aimed to allow better encapsulation of component business logic as well as provide consistent expectations around lifecycle management and concurrent message delivery. This PR includes an implementation of the Routine which is inspired by BaseService with certain key differences but shaped to fulfil some of the capabilities of the v1 fsm

  • Instead of a Start method which is expected to launch internal goroutines, Routines expose a Start method which are intended to be executed in a goroutine coordinates externally by the caller.

  • Routines are characterized by stateless func(event Event) (Events, error) which specify message schemes in the style of ADR-30

  • Routine are trySend()message and return false to indicate back pressure

  • Routines can be stopped gracefully, completing all sent message, or terminated, where all unprocessed messages are dropped.

  • Referenced an issue explaining the need for the change

  • Updated all relevant documentation in docs

  • Updated all code comments where relevant

  • Wrote tests

  • Updated CHANGELOG_PENDING.md

    + Include an implementaiton of the routines specified in ADR-43
    along with a demuxer and some dummy reactor code
@brapse brapse requested a review from ancazamfir August 3, 2019 07:21
brapse added 2 commits August 6, 2019 13:27
    + `routine.send` returns false when routine is not running
    + this will prevent panics sending to channels which have been
    closed
    + Make output channels routine specific removing the risk of someone
    writting to a channel which was closed by another touine.
    + consistency changes between the routines and the demuxer
    + ensure that we stop accepting messages once `stop` has been called
      to avoid the case in which we attempt to write to a channel which
      has already been closed
@codecov-io
Copy link

codecov-io commented Aug 6, 2019

Codecov Report

Merging #3878 into master will decrease coverage by 0.14%.
The diff coverage is 48.03%.

@@            Coverage Diff             @@
##           master    #3878      +/-   ##
==========================================
- Coverage   66.86%   66.71%   -0.15%     
==========================================
  Files         221      225       +4     
  Lines       18510    18727     +217     
==========================================
+ Hits        12376    12493     +117     
- Misses       5211     5309      +98     
- Partials      923      925       +2
Impacted Files Coverage Δ
blockchain/v2/schedule.go 67.01% <ø> (ø) ⬆️
blockchain/v2/metrics.go 15.58% <15.58%> (ø)
blockchain/v2/reactor.go 49.15% <49.15%> (ø)
blockchain/v2/types.go 55.55% <55.55%> (ø)
blockchain/v2/routine.go 81.81% <81.81%> (ø)
privval/signer_server.go 95.65% <0%> (-4.35%) ⬇️
privval/signer_endpoint.go 81.33% <0%> (-2.67%) ⬇️
privval/signer_dialer_endpoint.go 100% <0%> (ø) ⬆️
... and 8 more

    + use `trySend` the replicate peer sending
    + expose `next()` as a chan of events as output
    + expose `final()` as a chan of error, for the final error
    + add `ready()` as chan struct when routine is ready
@brapse brapse marked this pull request as ready for review August 8, 2019 15:44
@brapse brapse requested review from ebuchman, melekes and xla as code owners August 8, 2019 15:44
@brapse brapse changed the title [blockchain] Routines [blockchain] v2 Routines Aug 8, 2019
fin chan error
stopped chan struct{}
rdy chan struct{}
running *uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you seen the common/BaseService struct https://godoc.org/github.com/tendermint/tendermint/libs/common#BaseService ? feels like you're doing exactly the same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also thought routines could implement the service interface. I spent a bit of time trying trying to make it work but it felt wrong for a few reasons.

Routines expose methods associated with the lifecycle of a finite state machine while services don't. Implementing OnStart and OnReset for FSM didn't really make sense to me as we don't embed the state in the struct itself but instead close over it. This restricts how state in routines is managed but I would suggest this limitation is a good thing. We want
one routine to manage one state and the transition between states to happen from a serialized stream of events.

Routines will ideally terminate of their own accord and need to communicate their final event. This is an important requirement for the blockchain reactor as it needs to communicate when to switch to consensus.

While it would most likely be possible to modify the routine to adhere to the Service interface, i'm not sure it's worth it since I cannot envision any cases in which we would want either a Service or a Routine and not care about which.

go r.processor.start()
go r.demuxer.start()

<-r.scheduler.ready()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we use WaitGroup or https://godoc.org/golang.org/x/sync/errgroup here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely could use wait groups here but didn't want to include inter routine coordination in the routine. I updated the the code s/t the rdy is now closed as soon as the routine in ready, this way <-ready() will return immediately if the routine is "ready" as the name implies.


type handleFunc = func(event Event) (Events, error)

type Routine struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain the rational for this struct?

Copy link
Contributor Author

@brapse brapse Aug 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention for routines is to provide lifecycle management (start/stop) behaviour to adr-30 style finite state machines characterized by functions of the form:

type handleFunc = func(event Event) (Events, error)

The abstraction allows us to assert the fulfilment of event delivery guarantees without domain specific logic. It also allows us to assert the correctness of domain specific logic without coupling message delivery concerns.

brapse added 3 commits August 13, 2019 17:57
    + close `rdy` channel to ensure that calls to `<-ready()` will
    always return if the routine is ready
Copy link
Contributor

@ebuchman ebuchman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High level thoughts from review:

  • Channel Capacity
  • Routine API
  • should start() panic if its already started ?

// * audit log levels
// * Convert routine to an interface with concrete implmentation

type handleFunc = func(event Event) (Events, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need multiple events in the output?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right, switching it to single event output made it simpler.

handle handleFunc
logger log.Logger
metrics *Metrics
stopping *uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better grouping

rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents)))
rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents)))
for _, event := range oEvents {
rt.logger.Info(fmt.Sprintln("writing back to output"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug here.

Also just add the name to the logger itself.

}
rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents)))
for _, event := range oEvents {
rt.out <- event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging?

time.Sleep(10 * time.Millisecond)
}
}()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert the above go routine is already running before we trySend the error ?

    * Routines will now use a priority queue instead of channels to
    iterate over events
var done = fmt.Errorf("done")

func simpleHandler(event Event) (Event, error) {
switch event.(type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

singleCaseSwitch: should rewrite switch statement to if statement (from gocritic)

    + Simplify the design by demuxing events directly in the reactor
}

func schedulerHandle(event Event) (Event, error) {
switch event.(type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

singleCaseSwitch: should rewrite switch statement to if statement (from gocritic)

}

func processorHandle(event Event) (Event, error) {
switch event.(type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

singleCaseSwitch: should rewrite switch statement to if statement (from gocritic)

// XXX: check for backpressure
r.scheduler.trySend(event)
r.processor.trySend(event)
case _ = <-r.stopDemux:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S1005: '_ = <-ch' can be simplified to '<-ch' (from gosimple)

}
}

func (rt *Routine) setLogger(logger log.Logger) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

U1000: func (*Routine).setLogger is unused (from unused)

"github.com/Workiva/go-datastructures/queue"
)

type Event queue.Item
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event redeclared in this block (from typecheck)

if !rt.isRunning() {
return false
}
err := rt.queue.Put(event)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cannot use event (variable of type Event) as queue.Item value in argument to rt.queue.Put: missing method Compare (from typecheck)

Copy link
Contributor

@ebuchman ebuchman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks sean. Merging for now, we'll keep iterating.

@brapse brapse merged commit abab490 into master Sep 18, 2019
@brapse brapse deleted the brapse/blockchain-v2-riri-routine branch September 18, 2019 20:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants