Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions p2p/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ func (e ErrTransportClosed) Error() string {
return "transport has been closed"
}

// ErrPeerRemoval is raised when attempting to remove a peer results in an error.
type ErrPeerRemoval struct{}

func (e ErrPeerRemoval) Error() string {
return "peer removal failed"
}

//-------------------------------------------------------------------

type ErrNetAddressNoID struct {
Expand Down
2 changes: 2 additions & 0 deletions p2p/mock/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,5 @@ func (mp *Peer) RemoteIP() net.IP { return mp.ip }
func (mp *Peer) SocketAddr() *p2p.NetAddress { return mp.addr }
func (mp *Peer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *Peer) CloseConn() error { return nil }
func (mp *Peer) SetRemovalFailed() {}
func (mp *Peer) GetRemovalFailed() bool { return false }
19 changes: 19 additions & 0 deletions p2p/mocks/peer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type Peer interface {

Set(string, interface{})
Get(string) interface{}

SetRemovalFailed()
GetRemovalFailed() bool
}

//----------------------------------------------------------
Expand Down Expand Up @@ -117,6 +120,9 @@ type peer struct {

metrics *Metrics
metricsTicker *time.Ticker

// When removal of a peer fails, we set this flag
removalAttemptFailed bool
}

type PeerOption func(*peer)
Expand Down Expand Up @@ -316,6 +322,14 @@ func (p *peer) CloseConn() error {
return p.peerConn.conn.Close()
}

func (p *peer) SetRemovalFailed() {
p.removalAttemptFailed = true
}

func (p *peer) GetRemovalFailed() bool {
return p.removalAttemptFailed
}

//---------------------------------------------------
// methods only used for testing
// TODO: can we remove these?
Expand Down
9 changes: 9 additions & 0 deletions p2p/peer_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func (ps *PeerSet) Add(peer Peer) error {
if ps.lookup[peer.ID()] != nil {
return ErrSwitchDuplicatePeerID{peer.ID()}
}
if peer.GetRemovalFailed() {
return ErrPeerRemoval{}
}
Comment on lines +50 to +52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this check before calling PeerSet.Add here. I would prefer to do it on line 813 of switch.go before initializing the peer to the individual reactors

Copy link
Contributor

@thanethomson thanethomson Sep 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will reordering this as you've recommended @cmwaters not change the effectiveness of the fix?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by effectiveness? As in it will no longer fix it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this check happens anywhere earlier we introduce the exact same race condition we already heaving.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can absolutely do it before and if this flag happens to be set we don't do anything but we also do have to do it here as well to be sure we are not in the same position as w/o the fix.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this check happens anywhere earlier we introduce the exact same race condition we already heaving.

I see. I wasn't aware of that. Need to look in more closely then


index := len(ps.list)
// Appending is safe even with other goroutines
Expand Down Expand Up @@ -107,6 +110,12 @@ func (ps *PeerSet) Remove(peer Peer) bool {

item := ps.lookup[peer.ID()]
if item == nil {
// Removing the peer has failed so we set a flag to mark that a removal was attempted.
// This can happen when the peer add routine from the switch is running in
// parallel to the receive routine of MConn.
// There is an error within MConn but the switch has not actually added the peer to the peer set yet.
// Setting this flag will prevent a peer from being added to a node's peer set afterwards.
peer.SetRemovalFailed()
return false
}

Expand Down
2 changes: 2 additions & 0 deletions p2p/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
func (mp *mockPeer) SocketAddr() *NetAddress { return nil }
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *mockPeer) CloseConn() error { return nil }
func (mp *mockPeer) SetRemovalFailed() {}
func (mp *mockPeer) GetRemovalFailed() bool { return false }

// Returns a mock peer
func newMockPeer(ip net.IP) *mockPeer {
Expand Down
10 changes: 10 additions & 0 deletions p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
// https://github.com/tendermint/tendermint/issues/3338
if sw.peers.Remove(peer) {
sw.metrics.Peers.Add(float64(-1))
} else {
// Removal of the peer has failed. The function above sets a flag within the peer to mark this.
// We keep this message here as information to the developer.
sw.Logger.Debug("error on peer removal", ",", "peer", peer.ID())
}
}

Expand Down Expand Up @@ -824,6 +828,12 @@ func (sw *Switch) addPeer(p Peer) error {
// so that if Receive errors, we will find the peer and remove it.
// Add should not err since we already checked peers.Has().
if err := sw.peers.Add(p); err != nil {
switch err.(type) {
case ErrPeerRemoval:
sw.Logger.Error("Error starting peer ",
" err ", "Peer has already errored and removal was attempted.",
"peer", p.ID())
}
return err
}
sw.metrics.Peers.Add(float64(1))
Expand Down
13 changes: 13 additions & 0 deletions p2p/switch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,3 +836,16 @@ func BenchmarkSwitchBroadcast(b *testing.B) {

b.Logf("success: %v, failure: %v", numSuccess, numFailure)
}

func TestSwitchRemovalErr(t *testing.T) {

sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch {
return initSwitchFunc(i, sw)
})
assert.Equal(t, len(sw1.Peers().List()), 1)
p := sw1.Peers().List()[0]

sw2.StopPeerForError(p, fmt.Errorf("peer should error"))

assert.Equal(t, sw2.peers.Add(p).Error(), ErrPeerRemoval{}.Error())
}