parent
c590cc02f4
commit
8c5c248e79
7 changed files with 100 additions and 46 deletions
|
@ -3,6 +3,8 @@ package network
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/network/capability"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -18,12 +20,18 @@ type Discoverer interface {
|
||||||
PoolCount() int
|
PoolCount() int
|
||||||
RequestRemote(int)
|
RequestRemote(int)
|
||||||
RegisterBadAddr(string)
|
RegisterBadAddr(string)
|
||||||
RegisterGoodAddr(string)
|
RegisterGoodAddr(string, capability.Capabilities)
|
||||||
RegisterConnectedAddr(string)
|
RegisterConnectedAddr(string)
|
||||||
UnregisterConnectedAddr(string)
|
UnregisterConnectedAddr(string)
|
||||||
UnconnectedPeers() []string
|
UnconnectedPeers() []string
|
||||||
BadPeers() []string
|
BadPeers() []string
|
||||||
GoodPeers() []string
|
GoodPeers() []AddressWithCapabilities
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddressWithCapabilities represents node address with its capabilities
|
||||||
|
type AddressWithCapabilities struct {
|
||||||
|
Address string
|
||||||
|
Capabilities capability.Capabilities
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultDiscovery default implementation of the Discoverer interface.
|
// DefaultDiscovery default implementation of the Discoverer interface.
|
||||||
|
@ -34,7 +42,7 @@ type DefaultDiscovery struct {
|
||||||
dialTimeout time.Duration
|
dialTimeout time.Duration
|
||||||
badAddrs map[string]bool
|
badAddrs map[string]bool
|
||||||
connectedAddrs map[string]bool
|
connectedAddrs map[string]bool
|
||||||
goodAddrs map[string]bool
|
goodAddrs map[string]capability.Capabilities
|
||||||
unconnectedAddrs map[string]int
|
unconnectedAddrs map[string]int
|
||||||
isDead bool
|
isDead bool
|
||||||
requestCh chan int
|
requestCh chan int
|
||||||
|
@ -48,7 +56,7 @@ func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery {
|
||||||
dialTimeout: dt,
|
dialTimeout: dt,
|
||||||
badAddrs: make(map[string]bool),
|
badAddrs: make(map[string]bool),
|
||||||
connectedAddrs: make(map[string]bool),
|
connectedAddrs: make(map[string]bool),
|
||||||
goodAddrs: make(map[string]bool),
|
goodAddrs: make(map[string]capability.Capabilities),
|
||||||
unconnectedAddrs: make(map[string]int),
|
unconnectedAddrs: make(map[string]int),
|
||||||
requestCh: make(chan int),
|
requestCh: make(chan int),
|
||||||
pool: make(chan string, maxPoolSize),
|
pool: make(chan string, maxPoolSize),
|
||||||
|
@ -135,11 +143,14 @@ func (d *DefaultDiscovery) BadPeers() []string {
|
||||||
|
|
||||||
// GoodPeers returns all addresses of known good peers (that at least once
|
// GoodPeers returns all addresses of known good peers (that at least once
|
||||||
// succeeded handshaking with us).
|
// succeeded handshaking with us).
|
||||||
func (d *DefaultDiscovery) GoodPeers() []string {
|
func (d *DefaultDiscovery) GoodPeers() []AddressWithCapabilities {
|
||||||
d.lock.RLock()
|
d.lock.RLock()
|
||||||
addrs := make([]string, 0, len(d.goodAddrs))
|
addrs := make([]AddressWithCapabilities, 0, len(d.goodAddrs))
|
||||||
for addr := range d.goodAddrs {
|
for addr, cap := range d.goodAddrs {
|
||||||
addrs = append(addrs, addr)
|
addrs = append(addrs, AddressWithCapabilities{
|
||||||
|
Address: addr,
|
||||||
|
Capabilities: cap,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
d.lock.RUnlock()
|
d.lock.RUnlock()
|
||||||
return addrs
|
return addrs
|
||||||
|
@ -147,9 +158,9 @@ func (d *DefaultDiscovery) GoodPeers() []string {
|
||||||
|
|
||||||
// RegisterGoodAddr registers good known connected address that passed
|
// RegisterGoodAddr registers good known connected address that passed
|
||||||
// handshake successfully.
|
// handshake successfully.
|
||||||
func (d *DefaultDiscovery) RegisterGoodAddr(s string) {
|
func (d *DefaultDiscovery) RegisterGoodAddr(s string, c capability.Capabilities) {
|
||||||
d.lock.Lock()
|
d.lock.Lock()
|
||||||
d.goodAddrs[s] = true
|
d.goodAddrs[s] = c
|
||||||
d.lock.Unlock()
|
d.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/network/capability"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -79,9 +80,24 @@ func TestDefaultDiscoverer(t *testing.T) {
|
||||||
|
|
||||||
// Registered good addresses should end up in appropriate set.
|
// Registered good addresses should end up in appropriate set.
|
||||||
for _, addr := range set1 {
|
for _, addr := range set1 {
|
||||||
d.RegisterGoodAddr(addr)
|
d.RegisterGoodAddr(addr, capability.Capabilities{
|
||||||
|
{
|
||||||
|
Type: capability.FullNode,
|
||||||
|
Data: &capability.Node{StartHeight: 123},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
gAddrWithCap := d.GoodPeers()
|
||||||
|
gAddrs := make([]string, len(gAddrWithCap))
|
||||||
|
for i, addr := range gAddrWithCap {
|
||||||
|
require.Equal(t, capability.Capabilities{
|
||||||
|
{
|
||||||
|
Type: capability.FullNode,
|
||||||
|
Data: &capability.Node{StartHeight: 123},
|
||||||
|
},
|
||||||
|
}, addr.Capabilities)
|
||||||
|
gAddrs[i] = addr.Address
|
||||||
}
|
}
|
||||||
gAddrs := d.GoodPeers()
|
|
||||||
sort.Strings(gAddrs)
|
sort.Strings(gAddrs)
|
||||||
assert.Equal(t, 0, d.PoolCount())
|
assert.Equal(t, 0, d.PoolCount())
|
||||||
assert.Equal(t, 0, len(d.UnconnectedPeers()))
|
assert.Equal(t, 0, len(d.UnconnectedPeers()))
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/network/capability"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/vm"
|
"github.com/nspcc-dev/neo-go/pkg/vm"
|
||||||
|
@ -151,17 +152,17 @@ func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error {
|
||||||
|
|
||||||
type testDiscovery struct{}
|
type testDiscovery struct{}
|
||||||
|
|
||||||
func (d testDiscovery) BackFill(addrs ...string) {}
|
func (d testDiscovery) BackFill(addrs ...string) {}
|
||||||
func (d testDiscovery) Close() {}
|
func (d testDiscovery) Close() {}
|
||||||
func (d testDiscovery) PoolCount() int { return 0 }
|
func (d testDiscovery) PoolCount() int { return 0 }
|
||||||
func (d testDiscovery) RegisterBadAddr(string) {}
|
func (d testDiscovery) RegisterBadAddr(string) {}
|
||||||
func (d testDiscovery) RegisterGoodAddr(string) {}
|
func (d testDiscovery) RegisterGoodAddr(string, capability.Capabilities) {}
|
||||||
func (d testDiscovery) RegisterConnectedAddr(string) {}
|
func (d testDiscovery) RegisterConnectedAddr(string) {}
|
||||||
func (d testDiscovery) UnregisterConnectedAddr(string) {}
|
func (d testDiscovery) UnregisterConnectedAddr(string) {}
|
||||||
func (d testDiscovery) UnconnectedPeers() []string { return []string{} }
|
func (d testDiscovery) UnconnectedPeers() []string { return []string{} }
|
||||||
func (d testDiscovery) RequestRemote(n int) {}
|
func (d testDiscovery) RequestRemote(n int) {}
|
||||||
func (d testDiscovery) BadPeers() []string { return []string{} }
|
func (d testDiscovery) BadPeers() []string { return []string{} }
|
||||||
func (d testDiscovery) GoodPeers() []string { return []string{} }
|
func (d testDiscovery) GoodPeers() []AddressWithCapabilities { return []AddressWithCapabilities{} }
|
||||||
|
|
||||||
var defaultMessageHandler = func(t *testing.T, msg *Message) {}
|
var defaultMessageHandler = func(t *testing.T, msg *Message) {}
|
||||||
|
|
||||||
|
|
|
@ -1,27 +1,27 @@
|
||||||
package payload
|
package payload
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/network/capability"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AddressAndTime payload.
|
// AddressAndTime payload.
|
||||||
type AddressAndTime struct {
|
type AddressAndTime struct {
|
||||||
Timestamp uint32
|
Timestamp uint32
|
||||||
Services uint64
|
IP [16]byte
|
||||||
IP [16]byte
|
Capabilities capability.Capabilities
|
||||||
Port uint16
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAddressAndTime creates a new AddressAndTime object.
|
// NewAddressAndTime creates a new AddressAndTime object.
|
||||||
func NewAddressAndTime(e *net.TCPAddr, t time.Time) *AddressAndTime {
|
func NewAddressAndTime(e *net.TCPAddr, t time.Time, c capability.Capabilities) *AddressAndTime {
|
||||||
aat := AddressAndTime{
|
aat := AddressAndTime{
|
||||||
Timestamp: uint32(t.UTC().Unix()),
|
Timestamp: uint32(t.UTC().Unix()),
|
||||||
Services: 1,
|
Capabilities: c,
|
||||||
Port: uint16(e.Port),
|
|
||||||
}
|
}
|
||||||
copy(aat.IP[:], e.IP)
|
copy(aat.IP[:], e.IP)
|
||||||
return &aat
|
return &aat
|
||||||
|
@ -30,26 +30,34 @@ func NewAddressAndTime(e *net.TCPAddr, t time.Time) *AddressAndTime {
|
||||||
// DecodeBinary implements Serializable interface.
|
// DecodeBinary implements Serializable interface.
|
||||||
func (p *AddressAndTime) DecodeBinary(br *io.BinReader) {
|
func (p *AddressAndTime) DecodeBinary(br *io.BinReader) {
|
||||||
p.Timestamp = br.ReadU32LE()
|
p.Timestamp = br.ReadU32LE()
|
||||||
p.Services = br.ReadU64LE()
|
|
||||||
br.ReadBytes(p.IP[:])
|
br.ReadBytes(p.IP[:])
|
||||||
p.Port = br.ReadU16BE()
|
p.Capabilities.DecodeBinary(br)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EncodeBinary implements Serializable interface.
|
// EncodeBinary implements Serializable interface.
|
||||||
func (p *AddressAndTime) EncodeBinary(bw *io.BinWriter) {
|
func (p *AddressAndTime) EncodeBinary(bw *io.BinWriter) {
|
||||||
bw.WriteU32LE(p.Timestamp)
|
bw.WriteU32LE(p.Timestamp)
|
||||||
bw.WriteU64LE(p.Services)
|
|
||||||
bw.WriteBytes(p.IP[:])
|
bw.WriteBytes(p.IP[:])
|
||||||
bw.WriteU16BE(p.Port)
|
p.Capabilities.EncodeBinary(bw)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IPPortString makes a string from IP and port specified.
|
// GetTCPAddress makes a string from IP and port specified in TCPCapability.
|
||||||
func (p *AddressAndTime) IPPortString() string {
|
// It returns an error if there's no such capability.
|
||||||
|
func (p *AddressAndTime) GetTCPAddress() (string, error) {
|
||||||
var netip = make(net.IP, 16)
|
var netip = make(net.IP, 16)
|
||||||
|
|
||||||
copy(netip, p.IP[:])
|
copy(netip, p.IP[:])
|
||||||
port := strconv.Itoa(int(p.Port))
|
port := -1
|
||||||
return netip.String() + ":" + port
|
for _, cap := range p.Capabilities {
|
||||||
|
if cap.Type == capability.TCPServer {
|
||||||
|
port = int(cap.Data.(*capability.Server).Port)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if port == -1 {
|
||||||
|
return "", errors.New("no TCP capability found")
|
||||||
|
}
|
||||||
|
return net.JoinHostPort(netip.String(), strconv.Itoa(port)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddressList is a list with AddrAndTime.
|
// AddressList is a list with AddrAndTime.
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/internal/testserdes"
|
"github.com/nspcc-dev/neo-go/pkg/internal/testserdes"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/network/capability"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -14,14 +15,23 @@ func TestEncodeDecodeAddress(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
e, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:2000")
|
e, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:2000")
|
||||||
ts = time.Now()
|
ts = time.Now()
|
||||||
addr = NewAddressAndTime(e, ts)
|
addr = NewAddressAndTime(e, ts, capability.Capabilities{
|
||||||
|
{
|
||||||
|
Type: capability.TCPServer,
|
||||||
|
Data: &capability.Server{Port: uint16(e.Port)},
|
||||||
|
},
|
||||||
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
assert.Equal(t, ts.UTC().Unix(), int64(addr.Timestamp))
|
assert.Equal(t, ts.UTC().Unix(), int64(addr.Timestamp))
|
||||||
aatip := make(net.IP, 16)
|
aatip := make(net.IP, 16)
|
||||||
copy(aatip, addr.IP[:])
|
copy(aatip, addr.IP[:])
|
||||||
assert.Equal(t, e.IP, aatip)
|
assert.Equal(t, e.IP, aatip)
|
||||||
assert.Equal(t, e.Port, int(addr.Port))
|
assert.Equal(t, 1, len(addr.Capabilities))
|
||||||
|
assert.Equal(t, capability.Capability{
|
||||||
|
Type: capability.TCPServer,
|
||||||
|
Data: &capability.Server{Port: uint16(e.Port)},
|
||||||
|
}, addr.Capabilities[0])
|
||||||
|
|
||||||
testserdes.EncodeDecodeBinary(t, addr, new(AddressAndTime))
|
testserdes.EncodeDecodeBinary(t, addr, new(AddressAndTime))
|
||||||
}
|
}
|
||||||
|
@ -31,7 +41,12 @@ func TestEncodeDecodeAddressList(t *testing.T) {
|
||||||
addrList := NewAddressList(int(lenList))
|
addrList := NewAddressList(int(lenList))
|
||||||
for i := 0; i < int(lenList); i++ {
|
for i := 0; i < int(lenList); i++ {
|
||||||
e, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("127.0.0.1:200%d", i))
|
e, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("127.0.0.1:200%d", i))
|
||||||
addrList.Addrs[i] = NewAddressAndTime(e, time.Now())
|
addrList.Addrs[i] = NewAddressAndTime(e, time.Now(), capability.Capabilities{
|
||||||
|
{
|
||||||
|
Type: capability.TCPServer,
|
||||||
|
Data: &capability.Server{Port: 123},
|
||||||
|
},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
testserdes.EncodeDecodeBinary(t, addrList, new(AddressList))
|
testserdes.EncodeDecodeBinary(t, addrList, new(AddressList))
|
||||||
|
|
|
@ -642,7 +642,10 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
|
||||||
// handleAddrCmd will process received addresses.
|
// handleAddrCmd will process received addresses.
|
||||||
func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error {
|
func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error {
|
||||||
for _, a := range addrs.Addrs {
|
for _, a := range addrs.Addrs {
|
||||||
s.discovery.BackFill(a.IPPortString())
|
addr, err := a.GetTCPAddress()
|
||||||
|
if err != nil {
|
||||||
|
s.discovery.BackFill(addr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -657,8 +660,8 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
for i, addr := range addrs {
|
for i, addr := range addrs {
|
||||||
// we know it's a good address, so it can't fail
|
// we know it's a good address, so it can't fail
|
||||||
netaddr, _ := net.ResolveTCPAddr("tcp", addr)
|
netaddr, _ := net.ResolveTCPAddr("tcp", addr.Address)
|
||||||
alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts)
|
alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts, addr.Capabilities)
|
||||||
}
|
}
|
||||||
return p.EnqueueP2PMessage(NewMessage(CMDAddr, alist))
|
return p.EnqueueP2PMessage(NewMessage(CMDAddr, alist))
|
||||||
}
|
}
|
||||||
|
|
|
@ -234,7 +234,7 @@ func (p *TCPPeer) StartProtocol() {
|
||||||
zap.Uint32("startHeight", p.lastBlockIndex),
|
zap.Uint32("startHeight", p.lastBlockIndex),
|
||||||
zap.Uint32("id", p.Version().Nonce))
|
zap.Uint32("id", p.Version().Nonce))
|
||||||
|
|
||||||
p.server.discovery.RegisterGoodAddr(p.PeerAddr().String())
|
p.server.discovery.RegisterGoodAddr(p.PeerAddr().String(), p.version.Capabilities)
|
||||||
if p.server.chain.HeaderHeight() < p.LastBlockIndex() {
|
if p.server.chain.HeaderHeight() < p.LastBlockIndex() {
|
||||||
err = p.server.requestHeaders(p)
|
err = p.server.requestHeaders(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue