Conversation
Codecov Report
@@            Coverage Diff             @@
##           master    #5831      +/-   ##
==========================================
+ Coverage   59.91%   59.96%    +0.05%
==========================================
  Files         267      269        +2
  Lines       24094    24495      +401
==========================================
+ Hits        14435    14688      +253
- Misses       8124     8226      +102
- Partials     1535     1581       +46
alexanderbez
left a comment
Providing a preliminary review -- looks great! Looking at the Router implementation now.
alexanderbez
left a comment
Performed another review (solely the router) and it looks great -- easy to follow for the most part. I think it is just a bit confusing where, and by whom, queues are closed (i.e. it seems we close and/or remove from the map in both acceptPeers and routeChannel).
Overall, LGTM.
return nil
}
// Claim claims a peer. The caller has exclusive ownership of the peer, and must
Technically, multiple processes can have access to a storePeer, correct? The caller can pass that object around if it wanted to, but my guess is we don't really need to worry about that?
Well, that's up to the caller -- it owns it and can do whatever it wants with it. But the store will not give it to any other callers until it has been returned.
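The exclusive-ownership semantics described above can be sketched roughly as follows. This is an illustrative mock, not the actual store: the names `peerStore`, `Claim`, and `Return` and the mutex-plus-map approach are assumptions for the sake of the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// peerStore is a hypothetical sketch of a store that hands out
// exclusive ownership of peers. Names are illustrative only.
type peerStore struct {
	mtx     sync.Mutex
	claimed map[string]bool
}

func newPeerStore() *peerStore {
	return &peerStore{claimed: make(map[string]bool)}
}

// Claim gives the caller exclusive ownership of the peer. The store
// refuses further claims until Return is called.
func (s *peerStore) Claim(id string) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if s.claimed[id] {
		return errors.New("peer already claimed")
	}
	s.claimed[id] = true
	return nil
}

// Return gives the peer back to the store, making it claimable again.
func (s *peerStore) Return(id string) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	delete(s.claimed, id)
}

func main() {
	s := newPeerStore()
	fmt.Println(s.Claim("a") == nil) // true: first claim succeeds
	fmt.Println(s.Claim("a") == nil) // false: peer is already claimed
	s.Return("a")
	fmt.Println(s.Claim("a") == nil) // true: claimable again after Return
}
```

Whatever the caller does with the claimed peer internally is its own business; the store's only guarantee is that no second caller gets it until it is returned.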
case queue.enqueue() <- Envelope{channel: ChannelID(chID), From: peerID, Message: msg}:
	r.logger.Debug("received message", "peer", peerID, "message", msg)
case <-queue.closed():
	r.logger.Error("channel closed, dropping message", "peer", peerID, "channel", chID)
Suggested change:
- r.logger.Error("channel closed, dropping message", "peer", peerID, "channel", chID)
+ r.logger.Error("channel closed; dropping message", "peer", peerID, "channel", chID)
Force-pushed 57f2058 to dc6b31d
Force-pushed b3231b2 to 6711489
Force-pushed 6711489 to 202790a
TFTR @alexanderbez!
ctx := context.Background()

for _, address := range peer.Addresses {
	resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
should the timeout be configurable?
Probably, eventually. This is just a prototype, we'll be adding/integrating configurability later.
err := <-resultsCh
_ = conn.Close()
sendQueue.close()
if e := <-resultsCh; err == nil {
golang.org/x/sync/errgroup ?
Nice, wasn't aware of errgroup. However, in this case we're using conn.Close() and sendQueue.close() to signal termination of the other goroutines after one of them returns, unlike errgroup, which relies on contexts for signaling. I think introducing contexts in addition to connections and queues here just complicates things.
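The close-to-signal pattern described above can be sketched as follows. This is a simplified stand-in: a plain `done` channel plays the role of the connection/queue, since closing a channel broadcasts to every receiver at once, which is why no context is needed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers starts n goroutines that block until `done` is closed,
// then reports how many observed the shutdown signal. Closing a
// channel unblocks all receivers simultaneously -- the same mechanism
// by which conn.Close() and sendQueue.close() terminate the other
// goroutines in the PR.
func runWorkers(n int) int64 {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var shutDown int64

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // receiving on a closed channel never blocks
			atomic.AddInt64(&shutDown, 1)
		}()
	}

	close(done) // one close broadcasts termination to every goroutine
	wg.Wait()
	return shutDown
}

func main() {
	fmt.Println(runWorkers(3)) // 3
}
```

errgroup would achieve the same fan-out cancellation via a derived context; the trade-off here is that the connection and queue already carry their own closed-state signals.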
Early but functional prototype of the new
p2p.Router; see its GoDoc comment for details on how it works. Expect much of this logic to change and improve as we evolve the new P2P stack. There is a simple test that sets up an in-memory network of four routers with reactors and passes messages between them, but otherwise no exhaustive tests, since this is very much a work in progress.