refactor(p2p)!: extract MConnection from Peer#4307
Conversation
5b8543e to
6a822f1
Compare
e33815e to
ba0c537
Compare
This comment was marked as outdated.
This comment was marked as outdated.
Contributes to #4302 **It's now the job of a `Transport` implementation to emit a multiplexed connection.** Previously, `MConnection` was created inside `Peer`, and `Transport` was responsible for returning a raw `net.Conn`. With the upcoming QUIC transport, this is no longer acceptable. Now, TCP transport returns a multiplexed connection (`MConnection`). QUIC transport, likewise, will be emitting multiplexed connection too. The difference is, of course, in case of TCP it's out-of-protocol multiplexing. Note I've made an internal "queue" in `MConnection`, which stores packets and the associated "channelID" (streamID). So, `MConnection` has a `readLoop`, which pushes incoming packets onto this queue (`map[streamID]chan []byte` actually). The packets then get read in `Peer#streamReadLoop` (1 per stream), which calls `Stream#Read`. So even though there's a single TCP connection, there's an abstraction of streams of data.
f1423e9 to
5fc3c2e
Compare
|
ready for review ✅ |
|
|
||
| mtx sync.RWMutex | ||
| // streamID -> list of incoming messages | ||
| recvMsgsByStreamID map[byte]chan []byte |
There was a problem hiding this comment.
error prone + "unwanted" queue => this is why I don't think it shouldn't be merged into main. The only way is to remove it and bring back old API like onReceive(...)
There was a problem hiding this comment.
I agree that onReceive is the best solution for the current architecture.
because it stores unknown msgs in queue now
cason
left a comment
There was a problem hiding this comment.
This is a super long PR.
I didn't review the p2p/transport package yet.
cason
left a comment
There was a problem hiding this comment.
Second round of reviewing.
Left other comments. There is a very odd e2e error, apparently a full node is getting a wrong validator set for some reason. Probably not related to the p2p refactoring.
I really think that this PR should be split in multiple ones, since we are addressing a lot of stuff here. Some for which we don't have much clarity (e.g., removing the onReceive callback versus multiple consuming routines + dropping messages).
I will approve to unblock the work. But we need to fully review the changes when the feature branch is ready to go.
|
|
||
| mtx sync.RWMutex | ||
| // streamID -> list of incoming messages | ||
| recvMsgsByStreamID map[byte]chan []byte |
There was a problem hiding this comment.
I agree that onReceive is the best solution for the current architecture.
| if desc, ok := desc.(StreamDescriptor); ok { | ||
| d = desc | ||
| } | ||
| c.channelsIdx[streamID] = newChannel(c, d) |
There was a problem hiding this comment.
You have to decide between stream and channel. I particularly will stay with channel.
There was a problem hiding this comment.
I would go with stream because the channel concept is not widely used (I wasn't able to find any references). The most popular lib for TCP multiplexing, https://github.com/hashicorp/yamux/blob/master/stream.go, also uses streams.
|
@cason thanks for the review 👏 |
Contributes to #4302
It's now the job of a
Transportimplementation to emit a multiplexed connection.Previously,
MConnectionwas created insidePeer, andTransportwas responsible for returning a rawnet.Conn. With the upcoming QUIC transport, this is no longer acceptable. Now, TCP transport returns a multiplexed connection (MConnection). QUIC transport, likewise, will be emitting multiplexed connection too. The difference is, of course, in case of TCP it's out-of-protocol multiplexing.Note I've made an internal "queue" in
MConnection, which stores packets and the associated "channelID" (streamID). So,MConnectionhas areadLoop, which pushes incoming packets onto this queue (map[streamID]chan []byteactually). The packets then get read inPeer#streamReadLoop(1 per stream), which callsStream#Read. So even though there's a single TCP connection, there's an abstraction of streams of data.New interfaces
Transport:
Connection:
Copilot Summary
This pull request introduces several changes to the
cometbftcodebase, focusing on refactoring the P2P (peer-to-peer) communication layer to use a newabstractpackage. The changes include updating import statements, modifying function signatures, and adding new types to theabstractpackage.Refactoring to use
abstractpackage:internal/blocksync/reactor.go: UpdatedStreamDescriptorsfunction to return[]abstract.StreamDescriptorinstead of[]p2p.StreamDescriptor.internal/consensus/reactor.go: ModifiedStreamDescriptorsfunction to useabstract.StreamDescriptor. [1] [2]internal/evidence/reactor.go: ChangedStreamDescriptorsfunction to return[]abstract.StreamDescriptor.mempool/reactor.go: UpdatedStreamDescriptorsfunction to return[]abstract.StreamDescriptor.node/setup.go: RefactoredcreateTransportfunction to useabstract.Transportandtcpconn.DefaultMConnConfig. [1] [2]Adding new types to
abstractpackage:p2p/abstract/connection.go: Added newConnectionandStreaminterfaces.p2p/abstract/transport.go: IntroducedTransportandStreamDescriptorinterfaces.Code improvements and fixes:
internal/blocksync/pool.go: Fixed logging format inbanPeerfunction.internal/blocksync/reactor_test.go: Added timeout mechanism toTestBadBlockStopsPeerto prevent infinite loops. [1] [2] [3]node/node.go: Set logger for transport inNewNodeWithCliParamsfunction.These changes aim to improve the modularity and maintainability of the P2P communication layer by abstracting common functionalities and enhancing the overall code structure.