Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (runner *agentRunner) LoadAgent(options Options) error {
Version: fmt.Sprintf("vipnode/agent/%s", Version),
UpdateInterval: updateInterval,
NumHosts: options.Agent.MinPeers,
StrictPeers: options.Agent.StrictPeers,
}
runner.Agent = a
if options.Agent.NodeURI != "" {
Expand Down
52 changes: 43 additions & 9 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ type Agent struct {
// then store.KeepaliveInterval is used.
UpdateInterval time.Duration

// StrictPeers will disconnect any peers that don't match the active peer
// set that is returned by the pool during updates. This includes peers
// whose IP address does not match what the pool returned.
// StrictPeers is used for maintaining a specific peering set and
// overriding any other peering side effects like the node's built-in
// discovery.
StrictPeers bool

initOnce sync.Once
mu sync.Mutex
started bool
Expand Down Expand Up @@ -228,19 +236,32 @@ func (a *Agent) UpdatePeers(ctx context.Context, p pool.Pool) error {

logger.Printf("Pool update: peers=%d active=%d invalid=%d block=%d balance=%s", len(peers), len(update.ActivePeers), len(update.InvalidPeers), blockNumber, balance.String())

// Do we need more peers?
if needMore := a.NumHosts - len(update.ActivePeers); needMore > 0 {
if err := a.AddPeers(ctx, p, needMore); err == ErrNoPeers {
// We can live without more peers for now, will try again next time
} else if err != nil {
return err
if a.StrictPeers {
lookup := make(map[string]struct{}, len(update.ActivePeers))
for _, peerID := range update.ActivePeers {
lookup[peerID] = struct{}{}
}
}

if a.BlockNumberCallback != nil {
a.BlockNumberCallback(blockNumber, update.LatestBlockNumber)
// Mark any non-active peers as invalid. These should be a superset of
// the original update.InvalidPeers, so we truncate it first.
update.InvalidPeers = update.InvalidPeers[:0]
for _, p := range peers {
if _, ok := lookup[p.EnodeID()]; ok {
// FIXME: If we can get the ActivePeers' intended URI, we can
// compare the remote host address as well. Right now we just
// compare the EnodeID.

continue // Local peer matching active peer on pool
}
update.InvalidPeers = append(update.InvalidPeers, p.EnodeID())
}

if len(update.InvalidPeers) > 0 {
logger.Printf("Disconnecting from mismatched StrictPeers: %d", len(update.InvalidPeers))
}
}

// Disconnect from invalid peers
var errors []error
for _, peerID := range update.InvalidPeers {
if err := a.EthNode.RemoveTrustedPeer(ctx, peerID); err != nil {
Expand All @@ -251,6 +272,19 @@ func (a *Agent) UpdatePeers(ctx context.Context, p pool.Pool) error {
}
}

// Do we need more peers?
if needMore := a.NumHosts - len(update.ActivePeers); needMore > 0 {
if err := a.AddPeers(ctx, p, needMore); err == ErrNoPeers {
// We can live without more peers for now, will try again next time
} else if err != nil {
return err
}
}

if a.BlockNumberCallback != nil {
a.BlockNumberCallback(blockNumber, update.LatestBlockNumber)
}

if len(errors) > 0 {
return fmt.Errorf("failed to disconnect from invalid peers: %q", errors)
}
Expand Down
91 changes: 90 additions & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package agent
import (
"context"
"os"
"reflect"
"testing"

"github.com/vipnode/vipnode/ethnode"
"github.com/vipnode/vipnode/internal/fakenode"
"github.com/vipnode/vipnode/pool"
"github.com/vipnode/vipnode/pool/store"
Expand All @@ -24,6 +26,7 @@ func TestAgent(t *testing.T) {
if err := agent.Start(p); err != nil {
t.Fatal(err)
}
defer agent.Stop()

if peers, err := agent.EthNode.Peers(context.Background()); err != nil {
t.Fatal(err)
Expand All @@ -32,7 +35,7 @@ func TestAgent(t *testing.T) {
}

p.Nodes = append(p.Nodes, store.Node{
URI: "foo",
URI: "enode://bar@127.0.0.1:30303",
})

// Force update
Expand All @@ -46,3 +49,89 @@ func TestAgent(t *testing.T) {
t.Errorf("wrong number of peers: got %d; want %d", got, want)
}
}

func TestAgentStrictPeers(t *testing.T) {
SetLogger(os.Stderr)

node := fakenode.Node("foo")
agent := Agent{
EthNode: node,
NumHosts: 5,
StrictPeers: true,
}

fakepeers := fakenode.FakePeers(15)

// Prefill peers
node.FakePeers = fakepeers[:10]

// Confirm peers
if peers, err := agent.EthNode.Peers(context.Background()); err != nil {
t.Fatal(err)
} else if got, want := len(peers), 10; got != want {
t.Errorf("wrong number of peers: got %d; want %d", got, want)
}

p := &pool.StaticPool{}
p.Nodes = append(
p.Nodes,
// New node
store.Node{URI: "enode://bar@127.0.0.1:30303"},
// Include a subset of the original
store.Node{URI: fakepeers[4].EnodeURI()},
store.Node{URI: fakepeers[5].EnodeURI()},
// Mismatch host
store.Node{URI: "enode://" + fakepeers[6].EnodeID() + "@42.42.42.42:30303"},
)

expectURIs := make([]string, 0, len(p.Nodes))
for _, n := range p.Nodes {
expectURIs = append(expectURIs, n.URI)
}

if err := agent.Start(p); err != nil {
t.Fatal(err)
}
defer agent.Stop()

// Force update
if err := agent.UpdatePeers(context.Background(), p); err != nil {
t.Fatal(err)
}

// Set should match the pool set
if peers, err := agent.EthNode.Peers(context.Background()); err != nil {
t.Fatal(err)
} else if got, want := len(peers), len(expectURIs); got != want {
t.Errorf("wrong number of peers: got %d; want %d", got, want)
} else if got, want := ethnode.Peers(peers).URIs(), expectURIs; !reflect.DeepEqual(got, want) {
t.Errorf("mismatched enode URIs:\n got: %s\nwant: %s", got, want)
}

// One more time, add more peers
if err := node.ConnectPeer(context.Background(), fakepeers[11].EnodeURI()); err != nil {
t.Fatal(err)
}
if err := node.ConnectPeer(context.Background(), fakepeers[12].EnodeURI()); err != nil {
t.Fatal(err)
}
if peers, err := agent.EthNode.Peers(context.Background()); err != nil {
t.Fatal(err)
} else if got, want := len(peers), len(expectURIs)+2; got != want {
t.Errorf("wrong number of peers: got %d; want %d", got, want)
}

// Force update
if err := agent.UpdatePeers(context.Background(), p); err != nil {
t.Fatal(err)
}

// Set should match the pool set
if peers, err := agent.EthNode.Peers(context.Background()); err != nil {
t.Fatal(err)
} else if got, want := len(peers), len(expectURIs); got != want {
t.Errorf("wrong number of peers: got %d; want %d", got, want)
} else if got, want := ethnode.Peers(peers).URIs(), expectURIs; !reflect.DeepEqual(got, want) {
t.Errorf("mismatched enode URIs:\n got: %s\nwant: %s", got, want)
}
}
15 changes: 15 additions & 0 deletions ethnode/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,19 @@ type PeerInfo struct {
} `json:"network"`
}

// EnodeID returns the encoded Ethereum Node ID (public key of the node)
func (p *PeerInfo) EnodeID() string {
if len(p.Enode) <= 8+128 { // "enode://{128 ascii chars}@..."
return p.ID
}
return p.Enode[8 : 8+128]
}

// EnodeURI returns the full enode connection string: "enode://{128 ascii chars}@{remote address}:{port}"
func (p *PeerInfo) EnodeURI() string {
return "enode://" + p.EnodeID() + "@" + p.Network.RemoteAddress
}

func (p *PeerInfo) IsFullNode() bool {
if p.Protocols == nil {
return false
Expand All @@ -231,6 +237,15 @@ func (peers Peers) IDs() []string {
return r
}

// URIs returns a list of connection EnodeURIs for the peers.
func (peers Peers) URIs() []string {
r := make([]string, 0, len(peers))
for _, peer := range peers {
r = append(r, peer.EnodeURI())
}
return r
}

// EthNode is the normalized interface between different kinds of nodes.
type EthNode interface {
NodeRPC() *rpc.Client
Expand Down
19 changes: 15 additions & 4 deletions internal/fakenode/fakenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,26 @@ func (n *FakeNode) ConnectPeer(ctx context.Context, nodeURI string) error {
if err != nil {
return err
}
n.FakePeers = append(n.FakePeers, ethnode.PeerInfo{
p := ethnode.PeerInfo{
ID: uri.User.Username(),
Caps: []string{"fake/1", "eth/62", "eth/63", "les/2"},
Protocols: map[string]json.RawMessage{
"fake": json.RawMessage("{}"),
},
})
}
p.Network.RemoteAddress = uri.Host
n.FakePeers = append(n.FakePeers, p)
return nil
}
func (n *FakeNode) DisconnectPeer(ctx context.Context, nodeID string) error {
n.Calls = append(n.Calls, Call("DisconnectPeer", nodeID))
for i, p := range n.FakePeers {
if p.EnodeID() == nodeID {
// Found, remove
n.FakePeers = append(n.FakePeers[:i], n.FakePeers[i+1:]...)
}
}
// Not found, nothing to do
return nil
}
func (n *FakeNode) Peers(ctx context.Context) ([]ethnode.PeerInfo, error) {
Expand All @@ -110,9 +119,11 @@ func (n *FakeNode) BlockNumber(ctx context.Context) (uint64, error) {
func FakePeers(num int) []ethnode.PeerInfo {
peers := make([]ethnode.PeerInfo, 0, num)
for i := 0; i < num; i++ {
peers = append(peers, ethnode.PeerInfo{
p := ethnode.PeerInfo{
ID: fmt.Sprintf("%0128x", i),
})
}
p.Network.RemoteAddress = fmt.Sprintf("127.0.1.%d:30303", i)
peers = append(peers, p)
}
return peers
}
22 changes: 20 additions & 2 deletions internal/fakenode/fakenode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,34 @@ import (

func TestFakeNode(t *testing.T) {
n := Node("foo")
n.ConnectPeer(context.Background(), "abc")
if err := n.ConnectPeer(context.Background(), "enode://abc@127.0.0.1"); err != nil {
t.Fatal(err)
}

if len(n.Calls) != 1 {
t.Errorf("wrong number of calls: %d", len(n.Calls))
}

expected := Calls{
Call("ConnectPeer", "abc"),
Call("ConnectPeer", "enode://abc@127.0.0.1"),
}
if !reflect.DeepEqual(n.Calls, expected) {
t.Errorf("got: %s; want: %s", n.Calls, expected)
}

if peers, err := n.Peers(context.Background()); err != nil {
t.Fatal(err)
} else if len(peers) != 1 {
t.Errorf("wrong number of peers: %s", peers)
}

if err := n.DisconnectPeer(context.Background(), "abc"); err != nil {
t.Fatal(err)
}

if peers, err := n.Peers(context.Background()); err != nil {
t.Fatal(err)
} else if len(peers) != 0 {
t.Errorf("wrong number of peers: %s", peers)
}
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Options struct {
NodeHost string `long:"enode.host" description:"Override just the host component of reported enode:// URI. Useful for overriding network routing."`
Payout string `long:"payout" description:"Ethereum wallet address to associate pool credits."`
MinPeers int `long:"min-peers" description:"Minimum number of peers to maintain." default:"3"`
StrictPeers bool `long:"strict-peers" description:"Disconnect peers that were not provided by the pool."`
UpdateInterval string `long:"update-interval" description:"Time between updates sent to pool, should be under 120s." default:"60s"`
} `command:"agent" description:"Connect as a node to a pool or another vipnode."`

Expand Down