refactor(p2p)!: extract MConnection from Peer #4307

Merged
melekes merged 23 commits into feature/quic from 2128-quic-go-tcpconn on Nov 28, 2024

@melekes melekes commented Oct 19, 2024

Contributes to #4302

It's now the job of a Transport implementation to emit a multiplexed connection.

Previously, MConnection was created inside Peer, and Transport was responsible for returning a raw net.Conn. With the upcoming QUIC transport, this is no longer acceptable. Now the TCP transport returns a multiplexed connection (MConnection), and the QUIC transport will likewise emit a multiplexed connection. The difference, of course, is that in the TCP case the multiplexing is done out of protocol.

Note that I've added an internal "queue" to MConnection, which stores packets together with the associated "channelID" (streamID). MConnection has a readLoop, which pushes incoming packets onto this queue (actually a map[streamID]chan []byte). The packets are then read in Peer#streamReadLoop (one per stream), which calls Stream#Read. So even though there is a single TCP connection, there is an abstraction of streams of data.
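To make the queue idea concrete, here is a minimal sketch of a read loop that demultiplexes packets into one channel per stream, with one reader goroutine per stream. It is not the code from this PR: the `packet`, `demux`, `open`, and `run` names are hypothetical, the packet source is a plain Go channel rather than a TCP connection, and dropping packets for unknown streams is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// packet is a hypothetical stand-in for a decoded wire packet.
type packet struct {
	streamID byte
	payload  []byte
}

// demux routes incoming packets to one buffered channel per stream.
type demux struct {
	mtx    sync.RWMutex
	queues map[byte]chan []byte // streamID -> incoming messages
}

func newDemux() *demux {
	return &demux{queues: make(map[byte]chan []byte)}
}

// open registers a stream and returns its receive queue.
func (d *demux) open(streamID byte, size int) <-chan []byte {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	q := make(chan []byte, size)
	d.queues[streamID] = q
	return q
}

// readLoop drains the shared source and pushes each payload onto the queue
// of its stream. Packets for unknown streams are dropped (an assumption of
// this sketch). When the source is closed, all queues are closed too.
func (d *demux) readLoop(src <-chan packet) {
	for p := range src {
		d.mtx.RLock()
		q, ok := d.queues[p.streamID]
		d.mtx.RUnlock()
		if !ok {
			continue
		}
		q <- p.payload // blocks while this stream's queue is full
	}
	d.mtx.Lock()
	defer d.mtx.Unlock()
	for id, q := range d.queues {
		close(q)
		delete(d.queues, id)
	}
}

// run feeds packets through a demux and returns what each stream reader saw.
func run(packets []packet, streamIDs ...byte) map[byte][]string {
	d := newDemux()
	src := make(chan packet)
	var (
		wg  sync.WaitGroup
		mtx sync.Mutex
		got = make(map[byte][]string)
	)
	for _, id := range streamIDs {
		q := d.open(id, 1)
		wg.Add(1)
		go func(id byte, q <-chan []byte) { // one reader per stream
			defer wg.Done()
			for msg := range q {
				mtx.Lock()
				got[id] = append(got[id], string(msg))
				mtx.Unlock()
			}
		}(id, q)
	}
	go d.readLoop(src)
	for _, p := range packets {
		src <- p
	}
	close(src)
	wg.Wait()
	return got
}

func main() {
	got := run([]packet{
		{0x20, []byte("vote")},
		{0x30, []byte("tx")},
		{0x20, []byte("proposal")},
	}, 0x20, 0x30)
	fmt.Println(got[0x20], got[0x30])
}
```

Note that in this sketch a full queue on one stream blocks the read loop for every stream, which is one way such a queue can become error-prone.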

[Figure: p2p diagram (drawio)]

New interfaces

Transport:

// Transport connects the local node to the rest of the network.
type Transport interface {
	// NetAddr returns the network address of the local node.
	NetAddr() na.NetAddr

	// Accept waits for and returns the next connection to the local node.
	Accept() (Connection, *na.NetAddr, error)

	// Dial dials the given address and returns a connection.
	Dial(addr na.NetAddr) (Connection, error)
}

// StreamDescriptor describes a data stream. This could be a substream within a
// multiplexed TCP connection, QUIC stream, etc.
type StreamDescriptor interface {
	// StreamID returns the ID of the stream.
	StreamID() byte
	// MessageType returns the type of the message sent/received on this stream.
	MessageType() proto.Message
}

Connection:

// Connection is a multiplexed connection that can send and receive data
// on multiple streams.
type Connection interface {
	// OpenStream opens a new stream on the connection with an optional
	// description.
	OpenStream(streamID byte, desc any) (Stream, error)

	// LocalAddr returns the local network address, if known.
	LocalAddr() net.Addr

	// RemoteAddr returns the remote network address, if known.
	RemoteAddr() net.Addr

	// Close closes the connection.
	// If the protocol supports it, a reason will be sent to the remote.
	// Any blocked Read operations will be unblocked and return errors.
	Close(reason string) error

	// FlushAndClose flushes all the pending bytes and closes the connection.
	// If the protocol supports it, a reason will be sent to the remote.
	// Any blocked Read operations will be unblocked and return errors.
	FlushAndClose(reason string) error

	// ConnectionState returns basic details about the connection.
	// Warning: This API should not be considered stable and might change soon.
	ConnectionState() any

	// ErrorCh returns a channel that will receive errors from the connection.
	ErrorCh() <-chan error
}

// Stream is the interface implemented by QUIC streams or by a multiplexed TCP connection.
type Stream interface {
	ReceiveStream
	SendStream
	// SetDeadline sets the read and write deadlines associated with the connection. It is equivalent to calling both
	// SetReadDeadline and SetWriteDeadline.
	SetDeadline(t time.Time) error
}

// A ReceiveStream is a unidirectional Receive Stream.
type ReceiveStream interface {
	// Read reads data from the stream.
	// Read can be made to time out and return a net.Error with Timeout() == true
	// after a fixed time limit; see SetDeadline and SetReadDeadline.
	// If the stream was canceled by the peer, the error is a StreamError and
	// Remote == true.
	// If the connection was closed due to a timeout, the error satisfies
	// the net.Error interface, and Timeout() will be true.
	io.Reader
	// SetReadDeadline sets the deadline for future Read calls and
	// any currently-blocked Read call.
	// A zero value for t means Read will not time out.
	SetReadDeadline(t time.Time) error
}

// A SendStream is a unidirectional Send Stream.
type SendStream interface {
	// Write writes data to the stream.
	// Write can be made to time out and return a net.Error with Timeout() == true
	// after a fixed time limit; see SetDeadline and SetWriteDeadline.
	// If the stream was canceled by the peer, the error is a StreamError and
	// Remote == true.
	// If the connection was closed due to a timeout, the error satisfies
	// the net.Error interface, and Timeout() will be true.
	io.Writer
	// Close closes the write-direction of the stream.
	// Future calls to Write are not permitted after calling Close.
	// It must not be called concurrently with Write.
	// It must not be called after calling CancelWrite.
	io.Closer
	// SetWriteDeadline sets the deadline for future Write calls
	// and any currently-blocked Write call.
	// Even if write times out, it may return n > 0, indicating that
	// some data was successfully written.
	// A zero value for t means Write will not time out.
	SetWriteDeadline(t time.Time) error
}
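As an illustration of the read-deadline contract described in the ReceiveStream comments, here is a minimal sketch of an in-memory receive stream. It is not the MConnection implementation: `memStream`, `isTimeout`, and `demo` are hypothetical names, Read is assumed to be called from a single goroutine, and, unlike the documented contract, a deadline set here only applies to Read calls that start afterwards, not to one that is already blocked.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"sync"
	"time"
)

// memStream is a hypothetical receive side of a stream: whole messages
// arrive on recv and are handed out through io.Reader semantics.
type memStream struct {
	recv chan []byte

	mtx      sync.Mutex
	deadline time.Time

	buf []byte // unread remainder of the current message
}

// SetReadDeadline sets the deadline for future Read calls.
// A zero value for t means Read will not time out.
func (s *memStream) SetReadDeadline(t time.Time) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.deadline = t
	return nil
}

// Read copies the next bytes into p, waiting for a message if none is
// buffered. It is assumed to be called from a single goroutine.
func (s *memStream) Read(p []byte) (int, error) {
	if len(s.buf) == 0 {
		s.mtx.Lock()
		deadline := s.deadline
		s.mtx.Unlock()

		var timeout <-chan time.Time // nil channel: never fires
		if !deadline.IsZero() {
			timer := time.NewTimer(time.Until(deadline))
			defer timer.Stop()
			timeout = timer.C
		}
		select {
		case msg, ok := <-s.recv:
			if !ok {
				return 0, io.EOF
			}
			s.buf = msg
		case <-timeout:
			// os.ErrDeadlineExceeded satisfies net.Error with Timeout() == true.
			return 0, os.ErrDeadlineExceeded
		}
	}
	n := copy(p, s.buf)
	s.buf = s.buf[n:]
	return n, nil
}

// isTimeout reports whether err is a net.Error whose Timeout() is true.
func isTimeout(err error) bool {
	var nerr net.Error
	return errors.As(err, &nerr) && nerr.Timeout()
}

// demo reads one message in two chunks, then lets a third Read time out.
func demo() (first, second string, timedOut bool) {
	s := &memStream{recv: make(chan []byte, 1)}
	s.recv <- []byte("hello")

	buf := make([]byte, 3)
	n, _ := s.Read(buf)
	first = string(buf[:n])
	n, _ = s.Read(buf)
	second = string(buf[:n])

	_ = s.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
	_, err := s.Read(buf)
	return first, second, isTimeout(err)
}

func main() {
	fmt.Println(demo())
}
```

Callers can then distinguish a timeout from other failures with a check like `isTimeout` instead of comparing error strings.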
Copilot Summary

This pull request introduces several changes to the cometbft codebase, focusing on refactoring the P2P (peer-to-peer) communication layer to use a new abstract package. The changes include updating import statements, modifying function signatures, and adding new types to the abstract package.

Refactoring to use abstract package:

Adding new types to abstract package:

Code improvements and fixes:

These changes aim to improve the modularity and maintainability of the P2P communication layer by abstracting common functionalities and enhancing the overall code structure.

@melekes melekes self-assigned this Oct 19, 2024
@melekes melekes added p2p hygiene Any work relating to code legibility/hygiene to make it easier to read breaking A breaking change labels Oct 19, 2024
@melekes melekes force-pushed the 2128-quic-go-nodeinfo branch from 5b8543e to 6a822f1 Compare October 24, 2024 07:37
Base automatically changed from 2128-quic-go-nodeinfo to main October 24, 2024 07:57
@melekes melekes force-pushed the 2128-quic-go-tcpconn branch from e33815e to ba0c537 Compare October 24, 2024 15:56
@melekes melekes added hygiene Any work relating to code legibility/hygiene to make it easier to read and removed hygiene Any work relating to code legibility/hygiene to make it easier to read labels Nov 11, 2024
@melekes melekes marked this pull request as ready for review November 14, 2024 12:44
@melekes melekes requested a review from a team as a code owner November 14, 2024 12:44
@melekes melekes requested a review from a team November 14, 2024 12:44
@melekes melekes force-pushed the 2128-quic-go-tcpconn branch from f1423e9 to 5fc3c2e Compare November 20, 2024 14:06
@melekes melekes changed the base branch from main to feature/quic November 20, 2024 14:07

melekes commented Nov 20, 2024

ready for review ✅


mtx sync.RWMutex
// streamID -> list of incoming messages
recvMsgsByStreamID map[byte]chan []byte

@melekes melekes Nov 20, 2024

This is error-prone and adds an "unwanted" queue, which is why I don't think it should be merged into main. The only way is to remove it and bring back the old API, like onReceive(...).

I agree that onReceive is the best solution for the current architecture.
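For contrast with the per-stream queue, a callback-style delivery could look roughly like the sketch below. The `onReceiveFunc` signature is an assumption modeled on the onReceive(...) mentioned above, not copied from the codebase, and `packet`, `readLoop`, and `deliver` are hypothetical names.

```go
package main

import "fmt"

// packet is a hypothetical stand-in for a decoded wire packet.
type packet struct {
	streamID byte
	payload  []byte
}

// onReceiveFunc is an assumed callback signature for this sketch.
type onReceiveFunc func(streamID byte, msg []byte)

// readLoop hands each packet to the callback synchronously. There is no
// intermediate queue: a slow callback delays reading the next packet.
func readLoop(src <-chan packet, onReceive onReceiveFunc) {
	for p := range src {
		onReceive(p.streamID, p.payload)
	}
}

// deliver runs readLoop over a fixed set of packets and records each call.
func deliver(packets []packet) []string {
	src := make(chan packet, len(packets))
	for _, p := range packets {
		src <- p
	}
	close(src)

	var calls []string
	readLoop(src, func(id byte, msg []byte) {
		calls = append(calls, fmt.Sprintf("%#x:%s", id, msg))
	})
	return calls
}

func main() {
	fmt.Println(deliver([]packet{
		{0x20, []byte("vote")},
		{0x30, []byte("tx")},
	}))
}
```

The trade-off in this sketch is that messages are never buffered or dropped by the connection, at the cost of the receiver's processing time directly throttling the read loop.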

@cason cason left a comment

This is a super long PR.

I didn't review the p2p/transport package yet.

@cason cason left a comment

Second round of reviewing.

I left some more comments. There is a very odd e2e error: apparently a full node is getting a wrong validator set for some reason. It is probably not related to the p2p refactoring.

I really think that this PR should be split into multiple ones, since we are addressing a lot of stuff here, some of which we don't have much clarity on (e.g., removing the onReceive callback versus multiple consuming routines + dropping messages).

I will approve to unblock the work. But we need to fully review the changes when the feature branch is ready to go.


if desc, ok := desc.(StreamDescriptor); ok {
d = desc
}
c.channelsIdx[streamID] = newChannel(c, d)

You have to decide between stream and channel. Personally, I would stay with channel.

Collaborator Author

I would go with stream because the channel concept is not widely used (I wasn't able to find any references). The most popular lib for TCP multiplexing, https://github.com/hashicorp/yamux/blob/master/stream.go, also uses streams.
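The snippet under review in this thread relies on a type assertion to treat the `desc any` argument of OpenStream as an optional StreamDescriptor. A minimal, self-contained sketch of that pattern follows. The `conn` and `descriptor` types are hypothetical, StreamDescriptor is reduced to StreamID() so the example needs only the standard library, and the "already open" check is an assumption of the sketch rather than something taken from the PR.

```go
package main

import "fmt"

// StreamDescriptor is a reduced version of the interface from the PR
// description; MessageType() is omitted to keep the sketch stdlib-only.
type StreamDescriptor interface {
	StreamID() byte
}

// descriptor is a hypothetical StreamDescriptor implementation.
type descriptor struct {
	id   byte
	name string
}

func (d descriptor) StreamID() byte { return d.id }

// conn is a hypothetical connection that only tracks stream descriptors.
type conn struct {
	streams map[byte]StreamDescriptor
}

// OpenStream registers streamID. desc is optional: it is stored only if it
// implements StreamDescriptor; nil or any other value leaves it unset.
func (c *conn) OpenStream(streamID byte, desc any) error {
	// Rejecting duplicates is an assumption of this sketch.
	if _, exists := c.streams[streamID]; exists {
		return fmt.Errorf("stream %#x is already open", streamID)
	}
	var d StreamDescriptor
	if sd, ok := desc.(StreamDescriptor); ok {
		d = sd
	}
	c.streams[streamID] = d
	return nil
}

func main() {
	c := &conn{streams: make(map[byte]StreamDescriptor)}
	_ = c.OpenStream(0x20, descriptor{id: 0x20, name: "consensus"})
	_ = c.OpenStream(0x30, nil) // no description supplied
	fmt.Println(c.streams[0x20] != nil, c.streams[0x30] != nil)
}
```

Because the parameter is `any`, a caller passing the wrong type is silently treated as passing no description; a typed parameter would surface that mistake at compile time.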


melekes commented Nov 27, 2024

@cason thanks for the review 👏

@cason cason left a comment

Checked the changes, nice work. :)

@melekes melekes merged commit 7fb0405 into feature/quic Nov 28, 2024
@melekes melekes deleted the 2128-quic-go-tcpconn branch November 28, 2024 08:13
@jmalicevic jmalicevic added this to the 2024-Q4 milestone Dec 3, 2024
github-merge-queue bot pushed a commit that referenced this pull request Dec 13, 2024
Closes #4553 

Refs #4307

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>