diff --git a/Makefile b/Makefile index 1f17bd7ce..cb5a3d3d0 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ push-tag: git push origin ${VERSION} run: build - ./bin/neo-go node -config-path ./config -${NETMODE} --debug + ./bin/neo-go node -config-path ./config -${NETMODE} test: @go test ./... -cover diff --git a/VERSION b/VERSION index 9b0025a78..d0a191543 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.40.0 +0.40.1 diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 1b61cf5d1..f114f482a 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -24,7 +24,7 @@ const ( var ( genAmount = []int{8, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} decrementInterval = 2000000 - persistInterval = 5 * time.Second + persistInterval = 1 * time.Second ) // Blockchain represents the blockchain. @@ -164,13 +164,6 @@ func (bc *Blockchain) run() { } } -// For now this will return a hardcoded hash of the NEO governing token. -func (bc *Blockchain) governingToken() util.Uint256 { - neoNativeAsset := "c56f33fc6ecfcd0c225c4ab356fee59390af8560be0e930faebe74a6daff7c9b" - val, _ := util.Uint256DecodeString(neoNativeAsset) - return val -} - // AddBlock processes the given block and will add it to the cache so it // can be persisted. func (bc *Blockchain) AddBlock(block *Block) error { @@ -299,10 +292,6 @@ func (bc *Blockchain) persistBlock(block *Block) error { } else { account.Balances[output.AssetID] = output.Amount } - - if output.AssetID.Equals(bc.governingToken()) && len(account.Votes) > 0 { - // TODO - } } // Process TX inputs that are grouped by previous hash. @@ -324,10 +313,6 @@ func (bc *Blockchain) persistBlock(block *Block) error { return err } - if prevTXOutput.AssetID.Equals(bc.governingToken()) { - // TODO - } - account.Balances[prevTXOutput.AssetID] -= prevTXOutput.Amount } } @@ -367,6 +352,10 @@ func (bc *Blockchain) persist() (err error) { lenCache = bc.blockCache.Len() ) + if lenCache == 0 { + return nil + } + bc.headersOp <- func(headerList *HeaderHashList) { for i := 0; i < lenCache; i++ { if uint32(headerList.Len()) <= bc.BlockHeight() { diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index c007cddf3..8d3c6c7a1 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -53,10 +53,6 @@ func (d testDiscovery) BadPeers() []string { return []string{} } type localTransport struct{} -func (t localTransport) Consumer() <-chan protoTuple { - ch := make(chan protoTuple) - return ch -} func (t localTransport) Dial(addr string, timeout time.Duration) error { return nil } @@ -85,8 +81,9 @@ func (p *localPeer) Endpoint() util.Endpoint { return p.endpoint } func (p *localPeer) Disconnect(err error) {} -func (p *localPeer) Send(msg *Message) { +func (p *localPeer) WriteMsg(msg *Message) error { p.messageHandler(p.t, msg) + return nil } func (p *localPeer) Done() chan error { done := make(chan error) @@ -95,6 +92,9 @@ func (p *localPeer) Done() chan error { func (p *localPeer) Version() *payload.Version { return p.version } +func (p *localPeer) SetVersion(v *payload.Version) { + p.version = v +} func newTestServer() *Server { return &Server{ diff --git a/pkg/network/peer.go b/pkg/network/peer.go index bcaddd8e2..e3b6f3854 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -8,7 +8,8 @@ import ( type Peer interface { Endpoint() util.Endpoint Disconnect(error) - Send(msg *Message) + WriteMsg(msg *Message) error Done() chan error Version() *payload.Version + SetVersion(*payload.Version) } diff --git a/pkg/network/server.go b/pkg/network/server.go index c92f50f84..525ef2f91 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -47,13 +47,6 @@ type ( register chan Peer unregister chan peerDrop quit chan struct{} - - proto <-chan protoTuple - } - - protoTuple struct { - msg *Message - peer Peer } peerDrop struct { @@ -75,7 +68,6 @@ func NewServer(config ServerConfig, chain *core.Blockchain) *Server { } s.transport = NewTCPTransport(s, fmt.Sprintf(":%d", config.ListenTCP)) - s.proto = s.transport.Consumer() s.discovery = NewDefaultDiscovery( s.DialTimeout, s.transport, @@ -96,8 +88,14 @@ func (s *Server) Start(errChan chan error) { "headerHeight": s.chain.HeaderHeight(), }).Info("node started") + for _, addr := range s.Seeds { + if err := s.transport.Dial(addr, s.DialTimeout); err != nil { + log.Warnf("failed to connect to remote node %s", addr) + continue + } + } + go s.transport.Accept() - s.discovery.BackFill(s.Seeds...) s.run() } @@ -112,34 +110,17 @@ func (s *Server) Shutdown() { // UnconnectedPeers returns a list of peers that are in the discovery peer list // but are not connected to the server. func (s *Server) UnconnectedPeers() []string { - return s.discovery.UnconnectedPeers() + return []string{} } // BadPeers returns a list of peers the are flagged as "bad" peers. func (s *Server) BadPeers() []string { - return s.discovery.BadPeers() + return []string{} } func (s *Server) run() { - // Ask discovery to connect with remote nodes to fill up - // the server minimum peer slots. - s.discovery.RequestRemote(minPeers - s.PeerCount()) - for { select { - case proto := <-s.proto: - if err := s.processProto(proto); err != nil { - proto.peer.Disconnect(err) - // verack and version implies that the protocol is - // not started and the only way to disconnect them - // from the server is to manually call unregister. - switch proto.msg.CommandType() { - case CMDVerack, CMDVersion: - go func() { - s.unregister <- peerDrop{proto.peer, err} - }() - } - } case <-s.quit: s.transport.Close() for p := range s.peers { @@ -154,7 +135,6 @@ func (s *Server) run() { "endpoint": p.Endpoint(), }).Info("new peer connected") case drop := <-s.unregister: - s.discovery.RegisterBadAddr(drop.peer.Endpoint().String()) delete(s.peers, drop.peer) log.WithFields(log.Fields{ "endpoint": drop.peer.Endpoint(), @@ -189,7 +169,6 @@ func (s *Server) startProtocol(p Peer) { }).Info("started protocol") s.requestHeaders(p) - s.requestPeerInfo(p) timer := time.NewTimer(s.ProtoTickInterval) for { @@ -202,18 +181,13 @@ func (s *Server) startProtocol(p Peer) { if p.Version().StartHeight > s.chain.BlockHeight() { s.requestBlocks(p) } - // If the discovery does not have a healthy address pool - // we will ask for a new batch of addresses. - if s.discovery.PoolCount() < minPoolCount { - s.requestPeerInfo(p) - } timer.Reset(s.ProtoTickInterval) } } } // When a peer connects to the server, we will send our version immediately. -func (s *Server) sendVersion(p Peer) { +func (s *Server) sendVersion(p Peer) error { payload := payload.NewVersion( s.id, s.ListenTCP, @@ -221,7 +195,7 @@ func (s *Server) sendVersion(p Peer) { s.chain.BlockHeight(), s.Relay, ) - p.Send(NewMessage(s.Net, CMDVersion, payload)) + return p.WriteMsg(NewMessage(s.Net, CMDVersion, payload)) } // When a peer sends out his version we reply with verack after validating @@ -233,8 +207,8 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { if s.id == version.Nonce { return errIdenticalID } - p.Send(NewMessage(s.Net, CMDVerack, nil)) - return nil + p.SetVersion(version) + return p.WriteMsg(NewMessage(s.Net, CMDVerack, nil)) } // handleHeadersCmd will process the headers it received from its peer. @@ -268,12 +242,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { return errInvalidInvType } payload := payload.NewInventory(inv.Type, inv.Hashes) - p.Send(NewMessage(s.Net, CMDGetData, payload)) - return nil -} - -func (s *Server) handleGetHeadersCmd(p Peer, getHeaders *payload.GetBlocks) error { - log.Info(getHeaders) + p.WriteMsg(NewMessage(s.Net, CMDGetData, payload)) return nil } @@ -282,13 +251,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, getHeaders *payload.GetBlocks) erro func (s *Server) requestHeaders(p Peer) { start := []util.Uint256{s.chain.CurrentHeaderHash()} payload := payload.NewGetBlocks(start, util.Uint256{}) - p.Send(NewMessage(s.Net, CMDGetHeaders, payload)) -} - -// requestPeerInfo will send a getaddr message to the peer -// which will respond with his known addresses in the network. -func (s *Server) requestPeerInfo(p Peer) { - p.Send(NewMessage(s.Net, CMDGetAddr, nil)) + p.WriteMsg(NewMessage(s.Net, CMDGetHeaders, payload)) } // requestBlocks will send a getdata message to the peer @@ -307,19 +270,14 @@ func (s *Server) requestBlocks(p Peer) { } if len(hashes) > 0 { payload := payload.NewInventory(payload.BlockType, hashes) - p.Send(NewMessage(s.Net, CMDGetData, payload)) + p.WriteMsg(NewMessage(s.Net, CMDGetData, payload)) } else if s.chain.HeaderHeight() < p.Version().StartHeight { s.requestHeaders(p) } } -// process the received protocol message. -func (s *Server) processProto(proto protoTuple) error { - var ( - peer = proto.peer - msg = proto.msg - ) - +// handleMessage will process the given message. +func (s *Server) handleMessage(peer Peer, msg *Message) error { // Make sure both server and peer are operating on // the same network. if msg.Magic != s.Net { @@ -339,9 +297,6 @@ func (s *Server) processProto(proto protoTuple) error { case CMDBlock: block := msg.Payload.(*core.Block) return s.handleBlockCmd(peer, block) - case CMDGetHeaders: - getHeaders := msg.Payload.(*payload.GetBlocks) - s.handleGetHeadersCmd(peer, getHeaders) case CMDVerack: // Make sure this peer has send his version before we start the // protocol with that peer. @@ -349,11 +304,6 @@ func (s *Server) processProto(proto protoTuple) error { return errInvalidHandshake } go s.startProtocol(peer) - case CMDAddr: - addressList := msg.Payload.(*payload.AddressList) - for _, addr := range addressList.Addrs { - s.discovery.BackFill(addr.Endpoint.String()) - } } return nil } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index dba204c55..0db4d31ab 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -31,19 +31,6 @@ func TestSendVersion(t *testing.T) { s.sendVersion(p) } -func TestRequestPeerInfo(t *testing.T) { - var ( - s = newTestServer() - p = newLocalPeer(t) - ) - - p.messageHandler = func(t *testing.T, msg *Message) { - assert.Equal(t, CMDGetAddr, msg.CommandType()) - assert.Nil(t, msg.Payload) - } - s.requestPeerInfo(p) -} - // Server should reply with a verack after receiving a valid version. func TestVerackAfterHandleVersionCmd(t *testing.T) { var ( @@ -89,18 +76,6 @@ func TestServerNotSendsVerack(t *testing.T) { assert.Equal(t, errIdenticalID, err) } -func TestRequestPeers(t *testing.T) { - var ( - s = newTestServer() - p = newLocalPeer(t) - ) - p.messageHandler = func(t *testing.T, msg *Message) { - assert.Nil(t, msg.Payload) - assert.Equal(t, CMDGetAddr, msg.CommandType()) - } - s.requestPeerInfo(p) -} - func TestRequestHeaders(t *testing.T) { var ( s = newTestServer() diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index de80778dc..6d3cca187 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -18,31 +18,27 @@ type TCPPeer struct { // The version of the peer. version *payload.Version - done chan error - closed chan struct{} - disc chan error + done chan error wg sync.WaitGroup } -func NewTCPPeer(conn net.Conn, proto chan protoTuple) *TCPPeer { +func NewTCPPeer(conn net.Conn) *TCPPeer { return &TCPPeer{ conn: conn, - done: make(chan error), - closed: make(chan struct{}), - disc: make(chan error), + done: make(chan error, 1), endpoint: util.NewEndpoint(conn.RemoteAddr().String()), } } -// Send implements the Peer interface. This will encode the message +// WriteMsg implements the Peer interface. This will write/encode the message // to the underlying connection. -func (p *TCPPeer) Send(msg *Message) { - if err := msg.Encode(p.conn); err != nil { - select { - case p.disc <- err: - case <-p.closed: - } +func (p *TCPPeer) WriteMsg(msg *Message) error { + select { + case err := <-p.done: + return err + default: + return msg.Encode(p.conn) } } @@ -58,71 +54,17 @@ func (p *TCPPeer) Done() chan error { return p.done } +// Disconnect will fill the peer's done channel with the given error. +func (p *TCPPeer) Disconnect(err error) { + p.done <- err +} + // Version implements the Peer interface. func (p *TCPPeer) Version() *payload.Version { return p.version } -func (p *TCPPeer) readLoop(proto chan protoTuple, readErr chan error) { - defer p.wg.Done() - for { - select { - case <-p.closed: - return - default: - msg := &Message{} - if err := msg.Decode(p.conn); err != nil { - readErr <- err - return - } - p.handleMessage(msg, proto) - } - } -} - -func (p *TCPPeer) handleMessage(msg *Message, proto chan protoTuple) { - switch payload := msg.Payload.(type) { - case *payload.Version: - p.version = payload - } - proto <- protoTuple{ - msg: msg, - peer: p, - } -} - -func (p *TCPPeer) run(proto chan protoTuple) { - var ( - readErr = make(chan error, 1) - err error - ) - p.wg.Add(1) - go p.readLoop(proto, readErr) - -run: - for { - select { - case err = <-p.disc: - break run - case err = <-readErr: - break run - } - } - - // If the peer has not started the protocol with the server - // there will be noone reading from this channel. - select { - case p.done <- err: - default: - } - - close(p.closed) - p.conn.Close() - p.wg.Wait() - return -} - -// Disconnect implements the Peer interface. -func (p *TCPPeer) Disconnect(reason error) { - p.disc <- reason +// SetVersion implements the Peer interface. +func (p *TCPPeer) SetVersion(v *payload.Version) { + p.version = v } diff --git a/pkg/network/tcp_transport.go b/pkg/network/tcp_transport.go index 915440b2c..a9c1b4cd3 100644 --- a/pkg/network/tcp_transport.go +++ b/pkg/network/tcp_transport.go @@ -13,7 +13,6 @@ type TCPTransport struct { server *Server listener net.Listener bindAddr string - proto chan protoTuple } // NewTCPTransport return a new TCPTransport that will listen for @@ -22,15 +21,9 @@ func NewTCPTransport(s *Server, bindAddr string) *TCPTransport { return &TCPTransport{ server: s, bindAddr: bindAddr, - proto: make(chan protoTuple), } } -// Consumer implements the Transporter interface. -func (t *TCPTransport) Consumer() <-chan protoTuple { - return t.proto -} - // Dial implements the Transporter interface. func (t *TCPTransport) Dial(addr string, timeout time.Duration) error { conn, err := net.DialTimeout("tcp", addr, timeout) @@ -58,7 +51,6 @@ func (t *TCPTransport) Accept() { if t.isCloseError(err) { break } - continue } go t.handleConn(conn) @@ -81,11 +73,26 @@ func (t *TCPTransport) isCloseError(err error) bool { } func (t *TCPTransport) handleConn(conn net.Conn) { - p := NewTCPPeer(conn, t.proto) + var ( + p = NewTCPPeer(conn) + err error + ) + + defer func() { + p.Disconnect(err) + }() + t.server.register <- p - // This will block until the peer is stopped running. - p.run(t.proto) - log.Warnf("TCP released peer: %s", p.Endpoint()) + + for { + msg := &Message{} + if err = msg.Decode(p.conn); err != nil { + return + } + if err = t.server.handleMessage(p, msg); err != nil { + return + } + } } // Close implements the Transporter interface. diff --git a/pkg/network/transport.go b/pkg/network/transport.go index dd0113fd5..684f86717 100644 --- a/pkg/network/transport.go +++ b/pkg/network/transport.go @@ -5,7 +5,6 @@ import "time" // Transporter is an interface that allows us to abstract // any form of communication between the server and its peers. type Transporter interface { - Consumer() <-chan protoTuple Dial(addr string, timeout time.Duration) error Accept() Proto() string