Fixed some networking issues (#68)

* Faster persist timer

* fixed networking issues.
This commit is contained in:
Anthony De Meulemeester 2018-04-13 12:14:08 +02:00 committed by GitHub
parent 4bd5b2812e
commit ab2568cc51
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 69 additions and 206 deletions

View file

@ -19,7 +19,7 @@ push-tag:
git push origin ${VERSION} git push origin ${VERSION}
run: build run: build
./bin/neo-go node -config-path ./config -${NETMODE} --debug ./bin/neo-go node -config-path ./config -${NETMODE}
test: test:
@go test ./... -cover @go test ./... -cover

View file

@ -1 +1 @@
0.40.0 0.40.1

View file

@ -24,7 +24,7 @@ const (
var ( var (
genAmount = []int{8, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} genAmount = []int{8, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
decrementInterval = 2000000 decrementInterval = 2000000
persistInterval = 5 * time.Second persistInterval = 1 * time.Second
) )
// Blockchain represents the blockchain. // Blockchain represents the blockchain.
@ -164,13 +164,6 @@ func (bc *Blockchain) run() {
} }
} }
// For now this will return a hardcoded hash of the NEO governing token.
func (bc *Blockchain) governingToken() util.Uint256 {
neoNativeAsset := "c56f33fc6ecfcd0c225c4ab356fee59390af8560be0e930faebe74a6daff7c9b"
val, _ := util.Uint256DecodeString(neoNativeAsset)
return val
}
// AddBlock processes the given block and will add it to the cache so it // AddBlock processes the given block and will add it to the cache so it
// can be persisted. // can be persisted.
func (bc *Blockchain) AddBlock(block *Block) error { func (bc *Blockchain) AddBlock(block *Block) error {
@ -299,10 +292,6 @@ func (bc *Blockchain) persistBlock(block *Block) error {
} else { } else {
account.Balances[output.AssetID] = output.Amount account.Balances[output.AssetID] = output.Amount
} }
if output.AssetID.Equals(bc.governingToken()) && len(account.Votes) > 0 {
// TODO
}
} }
// Process TX inputs that are grouped by previous hash. // Process TX inputs that are grouped by previous hash.
@ -324,10 +313,6 @@ func (bc *Blockchain) persistBlock(block *Block) error {
return err return err
} }
if prevTXOutput.AssetID.Equals(bc.governingToken()) {
// TODO
}
account.Balances[prevTXOutput.AssetID] -= prevTXOutput.Amount account.Balances[prevTXOutput.AssetID] -= prevTXOutput.Amount
} }
} }
@ -367,6 +352,10 @@ func (bc *Blockchain) persist() (err error) {
lenCache = bc.blockCache.Len() lenCache = bc.blockCache.Len()
) )
if lenCache == 0 {
return nil
}
bc.headersOp <- func(headerList *HeaderHashList) { bc.headersOp <- func(headerList *HeaderHashList) {
for i := 0; i < lenCache; i++ { for i := 0; i < lenCache; i++ {
if uint32(headerList.Len()) <= bc.BlockHeight() { if uint32(headerList.Len()) <= bc.BlockHeight() {

View file

@ -53,10 +53,6 @@ func (d testDiscovery) BadPeers() []string { return []string{} }
type localTransport struct{} type localTransport struct{}
func (t localTransport) Consumer() <-chan protoTuple {
ch := make(chan protoTuple)
return ch
}
func (t localTransport) Dial(addr string, timeout time.Duration) error { func (t localTransport) Dial(addr string, timeout time.Duration) error {
return nil return nil
} }
@ -85,8 +81,9 @@ func (p *localPeer) Endpoint() util.Endpoint {
return p.endpoint return p.endpoint
} }
func (p *localPeer) Disconnect(err error) {} func (p *localPeer) Disconnect(err error) {}
func (p *localPeer) Send(msg *Message) { func (p *localPeer) WriteMsg(msg *Message) error {
p.messageHandler(p.t, msg) p.messageHandler(p.t, msg)
return nil
} }
func (p *localPeer) Done() chan error { func (p *localPeer) Done() chan error {
done := make(chan error) done := make(chan error)
@ -95,6 +92,9 @@ func (p *localPeer) Done() chan error {
func (p *localPeer) Version() *payload.Version { func (p *localPeer) Version() *payload.Version {
return p.version return p.version
} }
func (p *localPeer) SetVersion(v *payload.Version) {
p.version = v
}
func newTestServer() *Server { func newTestServer() *Server {
return &Server{ return &Server{

View file

@ -8,7 +8,8 @@ import (
type Peer interface { type Peer interface {
Endpoint() util.Endpoint Endpoint() util.Endpoint
Disconnect(error) Disconnect(error)
Send(msg *Message) WriteMsg(msg *Message) error
Done() chan error Done() chan error
Version() *payload.Version Version() *payload.Version
SetVersion(*payload.Version)
} }

View file

@ -47,13 +47,6 @@ type (
register chan Peer register chan Peer
unregister chan peerDrop unregister chan peerDrop
quit chan struct{} quit chan struct{}
proto <-chan protoTuple
}
protoTuple struct {
msg *Message
peer Peer
} }
peerDrop struct { peerDrop struct {
@ -75,7 +68,6 @@ func NewServer(config ServerConfig, chain *core.Blockchain) *Server {
} }
s.transport = NewTCPTransport(s, fmt.Sprintf(":%d", config.ListenTCP)) s.transport = NewTCPTransport(s, fmt.Sprintf(":%d", config.ListenTCP))
s.proto = s.transport.Consumer()
s.discovery = NewDefaultDiscovery( s.discovery = NewDefaultDiscovery(
s.DialTimeout, s.DialTimeout,
s.transport, s.transport,
@ -96,8 +88,14 @@ func (s *Server) Start(errChan chan error) {
"headerHeight": s.chain.HeaderHeight(), "headerHeight": s.chain.HeaderHeight(),
}).Info("node started") }).Info("node started")
for _, addr := range s.Seeds {
if err := s.transport.Dial(addr, s.DialTimeout); err != nil {
log.Warnf("failed to connect to remote node %s", addr)
continue
}
}
go s.transport.Accept() go s.transport.Accept()
s.discovery.BackFill(s.Seeds...)
s.run() s.run()
} }
@ -112,34 +110,17 @@ func (s *Server) Shutdown() {
// UnconnectedPeers returns a list of peers that are in the discovery peer list // UnconnectedPeers returns a list of peers that are in the discovery peer list
// but are not connected to the server. // but are not connected to the server.
func (s *Server) UnconnectedPeers() []string { func (s *Server) UnconnectedPeers() []string {
return s.discovery.UnconnectedPeers() return []string{}
} }
// BadPeers returns a list of peers the are flagged as "bad" peers. // BadPeers returns a list of peers the are flagged as "bad" peers.
func (s *Server) BadPeers() []string { func (s *Server) BadPeers() []string {
return s.discovery.BadPeers() return []string{}
} }
func (s *Server) run() { func (s *Server) run() {
// Ask discovery to connect with remote nodes to fill up
// the server minimum peer slots.
s.discovery.RequestRemote(minPeers - s.PeerCount())
for { for {
select { select {
case proto := <-s.proto:
if err := s.processProto(proto); err != nil {
proto.peer.Disconnect(err)
// verack and version implies that the protocol is
// not started and the only way to disconnect them
// from the server is to manually call unregister.
switch proto.msg.CommandType() {
case CMDVerack, CMDVersion:
go func() {
s.unregister <- peerDrop{proto.peer, err}
}()
}
}
case <-s.quit: case <-s.quit:
s.transport.Close() s.transport.Close()
for p := range s.peers { for p := range s.peers {
@ -154,7 +135,6 @@ func (s *Server) run() {
"endpoint": p.Endpoint(), "endpoint": p.Endpoint(),
}).Info("new peer connected") }).Info("new peer connected")
case drop := <-s.unregister: case drop := <-s.unregister:
s.discovery.RegisterBadAddr(drop.peer.Endpoint().String())
delete(s.peers, drop.peer) delete(s.peers, drop.peer)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"endpoint": drop.peer.Endpoint(), "endpoint": drop.peer.Endpoint(),
@ -189,7 +169,6 @@ func (s *Server) startProtocol(p Peer) {
}).Info("started protocol") }).Info("started protocol")
s.requestHeaders(p) s.requestHeaders(p)
s.requestPeerInfo(p)
timer := time.NewTimer(s.ProtoTickInterval) timer := time.NewTimer(s.ProtoTickInterval)
for { for {
@ -202,18 +181,13 @@ func (s *Server) startProtocol(p Peer) {
if p.Version().StartHeight > s.chain.BlockHeight() { if p.Version().StartHeight > s.chain.BlockHeight() {
s.requestBlocks(p) s.requestBlocks(p)
} }
// If the discovery does not have a healthy address pool
// we will ask for a new batch of addresses.
if s.discovery.PoolCount() < minPoolCount {
s.requestPeerInfo(p)
}
timer.Reset(s.ProtoTickInterval) timer.Reset(s.ProtoTickInterval)
} }
} }
} }
// When a peer connects to the server, we will send our version immediately. // When a peer connects to the server, we will send our version immediately.
func (s *Server) sendVersion(p Peer) { func (s *Server) sendVersion(p Peer) error {
payload := payload.NewVersion( payload := payload.NewVersion(
s.id, s.id,
s.ListenTCP, s.ListenTCP,
@ -221,7 +195,7 @@ func (s *Server) sendVersion(p Peer) {
s.chain.BlockHeight(), s.chain.BlockHeight(),
s.Relay, s.Relay,
) )
p.Send(NewMessage(s.Net, CMDVersion, payload)) return p.WriteMsg(NewMessage(s.Net, CMDVersion, payload))
} }
// When a peer sends out his version we reply with verack after validating // When a peer sends out his version we reply with verack after validating
@ -233,8 +207,8 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
if s.id == version.Nonce { if s.id == version.Nonce {
return errIdenticalID return errIdenticalID
} }
p.Send(NewMessage(s.Net, CMDVerack, nil)) p.SetVersion(version)
return nil return p.WriteMsg(NewMessage(s.Net, CMDVerack, nil))
} }
// handleHeadersCmd will process the headers it received from its peer. // handleHeadersCmd will process the headers it received from its peer.
@ -268,12 +242,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
return errInvalidInvType return errInvalidInvType
} }
payload := payload.NewInventory(inv.Type, inv.Hashes) payload := payload.NewInventory(inv.Type, inv.Hashes)
p.Send(NewMessage(s.Net, CMDGetData, payload)) p.WriteMsg(NewMessage(s.Net, CMDGetData, payload))
return nil
}
func (s *Server) handleGetHeadersCmd(p Peer, getHeaders *payload.GetBlocks) error {
log.Info(getHeaders)
return nil return nil
} }
@ -282,13 +251,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, getHeaders *payload.GetBlocks) erro
func (s *Server) requestHeaders(p Peer) { func (s *Server) requestHeaders(p Peer) {
start := []util.Uint256{s.chain.CurrentHeaderHash()} start := []util.Uint256{s.chain.CurrentHeaderHash()}
payload := payload.NewGetBlocks(start, util.Uint256{}) payload := payload.NewGetBlocks(start, util.Uint256{})
p.Send(NewMessage(s.Net, CMDGetHeaders, payload)) p.WriteMsg(NewMessage(s.Net, CMDGetHeaders, payload))
}
// requestPeerInfo will send a getaddr message to the peer
// which will respond with his known addresses in the network.
func (s *Server) requestPeerInfo(p Peer) {
p.Send(NewMessage(s.Net, CMDGetAddr, nil))
} }
// requestBlocks will send a getdata message to the peer // requestBlocks will send a getdata message to the peer
@ -307,19 +270,14 @@ func (s *Server) requestBlocks(p Peer) {
} }
if len(hashes) > 0 { if len(hashes) > 0 {
payload := payload.NewInventory(payload.BlockType, hashes) payload := payload.NewInventory(payload.BlockType, hashes)
p.Send(NewMessage(s.Net, CMDGetData, payload)) p.WriteMsg(NewMessage(s.Net, CMDGetData, payload))
} else if s.chain.HeaderHeight() < p.Version().StartHeight { } else if s.chain.HeaderHeight() < p.Version().StartHeight {
s.requestHeaders(p) s.requestHeaders(p)
} }
} }
// process the received protocol message. // handleMessage will process the given message.
func (s *Server) processProto(proto protoTuple) error { func (s *Server) handleMessage(peer Peer, msg *Message) error {
var (
peer = proto.peer
msg = proto.msg
)
// Make sure both server and peer are operating on // Make sure both server and peer are operating on
// the same network. // the same network.
if msg.Magic != s.Net { if msg.Magic != s.Net {
@ -339,9 +297,6 @@ func (s *Server) processProto(proto protoTuple) error {
case CMDBlock: case CMDBlock:
block := msg.Payload.(*core.Block) block := msg.Payload.(*core.Block)
return s.handleBlockCmd(peer, block) return s.handleBlockCmd(peer, block)
case CMDGetHeaders:
getHeaders := msg.Payload.(*payload.GetBlocks)
s.handleGetHeadersCmd(peer, getHeaders)
case CMDVerack: case CMDVerack:
// Make sure this peer has send his version before we start the // Make sure this peer has send his version before we start the
// protocol with that peer. // protocol with that peer.
@ -349,11 +304,6 @@ func (s *Server) processProto(proto protoTuple) error {
return errInvalidHandshake return errInvalidHandshake
} }
go s.startProtocol(peer) go s.startProtocol(peer)
case CMDAddr:
addressList := msg.Payload.(*payload.AddressList)
for _, addr := range addressList.Addrs {
s.discovery.BackFill(addr.Endpoint.String())
}
} }
return nil return nil
} }

View file

@ -31,19 +31,6 @@ func TestSendVersion(t *testing.T) {
s.sendVersion(p) s.sendVersion(p)
} }
func TestRequestPeerInfo(t *testing.T) {
var (
s = newTestServer()
p = newLocalPeer(t)
)
p.messageHandler = func(t *testing.T, msg *Message) {
assert.Equal(t, CMDGetAddr, msg.CommandType())
assert.Nil(t, msg.Payload)
}
s.requestPeerInfo(p)
}
// Server should reply with a verack after receiving a valid version. // Server should reply with a verack after receiving a valid version.
func TestVerackAfterHandleVersionCmd(t *testing.T) { func TestVerackAfterHandleVersionCmd(t *testing.T) {
var ( var (
@ -89,18 +76,6 @@ func TestServerNotSendsVerack(t *testing.T) {
assert.Equal(t, errIdenticalID, err) assert.Equal(t, errIdenticalID, err)
} }
func TestRequestPeers(t *testing.T) {
var (
s = newTestServer()
p = newLocalPeer(t)
)
p.messageHandler = func(t *testing.T, msg *Message) {
assert.Nil(t, msg.Payload)
assert.Equal(t, CMDGetAddr, msg.CommandType())
}
s.requestPeerInfo(p)
}
func TestRequestHeaders(t *testing.T) { func TestRequestHeaders(t *testing.T) {
var ( var (
s = newTestServer() s = newTestServer()

View file

@ -18,31 +18,27 @@ type TCPPeer struct {
// The version of the peer. // The version of the peer.
version *payload.Version version *payload.Version
done chan error done chan error
closed chan struct{}
disc chan error
wg sync.WaitGroup wg sync.WaitGroup
} }
func NewTCPPeer(conn net.Conn, proto chan protoTuple) *TCPPeer { func NewTCPPeer(conn net.Conn) *TCPPeer {
return &TCPPeer{ return &TCPPeer{
conn: conn, conn: conn,
done: make(chan error), done: make(chan error, 1),
closed: make(chan struct{}),
disc: make(chan error),
endpoint: util.NewEndpoint(conn.RemoteAddr().String()), endpoint: util.NewEndpoint(conn.RemoteAddr().String()),
} }
} }
// Send implements the Peer interface. This will encode the message // WriteMsg implements the Peer interface. This will write/encode the message
// to the underlying connection. // to the underlying connection.
func (p *TCPPeer) Send(msg *Message) { func (p *TCPPeer) WriteMsg(msg *Message) error {
if err := msg.Encode(p.conn); err != nil { select {
select { case err := <-p.done:
case p.disc <- err: return err
case <-p.closed: default:
} return msg.Encode(p.conn)
} }
} }
@ -58,71 +54,17 @@ func (p *TCPPeer) Done() chan error {
return p.done return p.done
} }
// Disconnect will fill the peer's done channel with the given error.
func (p *TCPPeer) Disconnect(err error) {
p.done <- err
}
// Version implements the Peer interface. // Version implements the Peer interface.
func (p *TCPPeer) Version() *payload.Version { func (p *TCPPeer) Version() *payload.Version {
return p.version return p.version
} }
func (p *TCPPeer) readLoop(proto chan protoTuple, readErr chan error) { // SetVersion implements the Peer interface.
defer p.wg.Done() func (p *TCPPeer) SetVersion(v *payload.Version) {
for { p.version = v
select {
case <-p.closed:
return
default:
msg := &Message{}
if err := msg.Decode(p.conn); err != nil {
readErr <- err
return
}
p.handleMessage(msg, proto)
}
}
}
func (p *TCPPeer) handleMessage(msg *Message, proto chan protoTuple) {
switch payload := msg.Payload.(type) {
case *payload.Version:
p.version = payload
}
proto <- protoTuple{
msg: msg,
peer: p,
}
}
func (p *TCPPeer) run(proto chan protoTuple) {
var (
readErr = make(chan error, 1)
err error
)
p.wg.Add(1)
go p.readLoop(proto, readErr)
run:
for {
select {
case err = <-p.disc:
break run
case err = <-readErr:
break run
}
}
// If the peer has not started the protocol with the server
// there will be noone reading from this channel.
select {
case p.done <- err:
default:
}
close(p.closed)
p.conn.Close()
p.wg.Wait()
return
}
// Disconnect implements the Peer interface.
func (p *TCPPeer) Disconnect(reason error) {
p.disc <- reason
} }

View file

@ -13,7 +13,6 @@ type TCPTransport struct {
server *Server server *Server
listener net.Listener listener net.Listener
bindAddr string bindAddr string
proto chan protoTuple
} }
// NewTCPTransport return a new TCPTransport that will listen for // NewTCPTransport return a new TCPTransport that will listen for
@ -22,15 +21,9 @@ func NewTCPTransport(s *Server, bindAddr string) *TCPTransport {
return &TCPTransport{ return &TCPTransport{
server: s, server: s,
bindAddr: bindAddr, bindAddr: bindAddr,
proto: make(chan protoTuple),
} }
} }
// Consumer implements the Transporter interface.
func (t *TCPTransport) Consumer() <-chan protoTuple {
return t.proto
}
// Dial implements the Transporter interface. // Dial implements the Transporter interface.
func (t *TCPTransport) Dial(addr string, timeout time.Duration) error { func (t *TCPTransport) Dial(addr string, timeout time.Duration) error {
conn, err := net.DialTimeout("tcp", addr, timeout) conn, err := net.DialTimeout("tcp", addr, timeout)
@ -58,7 +51,6 @@ func (t *TCPTransport) Accept() {
if t.isCloseError(err) { if t.isCloseError(err) {
break break
} }
continue continue
} }
go t.handleConn(conn) go t.handleConn(conn)
@ -81,11 +73,26 @@ func (t *TCPTransport) isCloseError(err error) bool {
} }
func (t *TCPTransport) handleConn(conn net.Conn) { func (t *TCPTransport) handleConn(conn net.Conn) {
p := NewTCPPeer(conn, t.proto) var (
p = NewTCPPeer(conn)
err error
)
defer func() {
p.Disconnect(err)
}()
t.server.register <- p t.server.register <- p
// This will block until the peer is stopped running.
p.run(t.proto) for {
log.Warnf("TCP released peer: %s", p.Endpoint()) msg := &Message{}
if err = msg.Decode(p.conn); err != nil {
return
}
if err = t.server.handleMessage(p, msg); err != nil {
return
}
}
} }
// Close implements the Transporter interface. // Close implements the Transporter interface.

View file

@ -5,7 +5,6 @@ import "time"
// Transporter is an interface that allows us to abstract // Transporter is an interface that allows us to abstract
// any form of communication between the server and its peers. // any form of communication between the server and its peers.
type Transporter interface { type Transporter interface {
Consumer() <-chan protoTuple
Dial(addr string, timeout time.Duration) error Dial(addr string, timeout time.Duration) error
Accept() Accept()
Proto() string Proto() string