From aa4bd34b6bf6a001b6748794a151d3006bb41c51 Mon Sep 17 00:00:00 2001 From: Anthony De Meulemeester Date: Sat, 10 Mar 2018 13:04:06 +0100 Subject: [PATCH] Node network improvements (#45) * small improvements. * Fixed datarace + cleanup node and peer * bumped version. * removed race flag to pass build --- ' | 145 +++++++++++++++++++++++++++++++++++ VERSION | 2 +- pkg/core/blockchain.go | 11 ++- pkg/core/blockchain_test.go | 11 +++ pkg/core/util.go | 10 +++ pkg/network/message.go | 22 +++++- pkg/network/node.go | 146 +++++++++++++++++------------------- pkg/network/peer.go | 1 + pkg/network/protocol.go | 4 +- pkg/network/server.go | 45 +++++++---- pkg/network/server_test.go | 12 ++- pkg/network/tcp_peer.go | 105 +++++++++++++++----------- 12 files changed, 367 insertions(+), 147 deletions(-) create mode 100644 ' diff --git a/' b/' new file mode 100644 index 000000000..6df537e3d --- /dev/null +++ b/' @@ -0,0 +1,145 @@ +package network + +import ( + "bytes" + "net" + "os" + "time" + + "github.com/CityOfZion/neo-go/pkg/network/payload" + "github.com/CityOfZion/neo-go/pkg/util" + log "github.com/go-kit/kit/log" +) + +// TCPPeer represents a connected remote node in the +// network over TCP. +type TCPPeer struct { + // The endpoint of the peer. + endpoint util.Endpoint + + // underlying connection. + conn net.Conn + + // The version the peer declared when connecting. + version *payload.Version + + // connectedAt is the timestamp the peer connected to + // the network. + connectedAt time.Time + + // handleProto is the handler that will handle the + // incoming message along with its peer. + handleProto protoHandleFunc + + // Done is used to broadcast this peer has stopped running + // and should be removed as reference. + done chan struct{} + send chan *Message + disc chan struct{} + + logger log.Logger +} + +// NewTCPPeer creates a new peer from a TCP connection. +func NewTCPPeer(conn net.Conn, fun protoHandleFunc) *TCPPeer { + e := util.NewEndpoint(conn.RemoteAddr().String()) + logger := log.NewLogfmtLogger(os.Stderr) + logger = log.With(logger, "component", "peer", "endpoint", e) + + return &TCPPeer{ + endpoint: e, + conn: conn, + done: make(chan struct{}), + send: make(chan *Message), + logger: logger, + connectedAt: time.Now().UTC(), + handleProto: fun, + disc: make(chan struct{}, 1), + } +} + +// Version implements the Peer interface. +func (p *TCPPeer) Version() *payload.Version { + return p.version +} + +// Endpoint implements the Peer interface. +func (p *TCPPeer) Endpoint() util.Endpoint { + return p.endpoint +} + +// Send implements the Peer interface. +func (p *TCPPeer) Send(msg *Message) { + select { + case p.send <- msg: + break + case <-p.disc: + break + } +} + +// Done implemnets the Peer interface. +func (p *TCPPeer) Done() chan struct{} { + return p.done +} + +func (p *TCPPeer) run() error { + errCh := make(chan error, 1) + + go p.readLoop(errCh) + go p.writeLoop(errCh) + + err := <-errCh + p.logger.Log("err", err) + p.cleanup() + return err +} + +func (p *TCPPeer) readLoop(errCh chan error) { + for { + msg := &Message{} + if err := msg.decode(p.conn); err != nil { + errCh <- err + break + } + p.handleMessage(msg) + } +} + +func (p *TCPPeer) writeLoop(errCh chan error) { + buf := new(bytes.Buffer) + + for { + select { + case msg := <-p.send: + if err := msg.encode(buf); err != nil { + errCh <- err + return + } + if _, err := p.conn.Write(buf.Bytes()); err != nil { + errCh <- err + return + } + buf.Reset() + } + } +} + +func (p *TCPPeer) cleanup() { + p.conn.Close() + p.disc <- struct{}{} + p.done <- struct{}{} + close(p.disc) + close(p.send) +} + +func (p *TCPPeer) handleMessage(msg *Message) { + switch msg.CommandType() { + case CMDVersion: + version := msg.Payload.(*payload.Version) + p.version = version + p.handleProto(msg, p) + default: + p.handleProto(msg, p) + } +} diff --git a/VERSION b/VERSION index 4e8f395fa..1b58cc101 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.26.0 +0.27.0 diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 7495ceb41..feeb2da49 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -44,6 +44,9 @@ type Blockchain struct { // Only for operating on the headerList. headersOp chan headersOpFunc headersOpDone chan struct{} + + // Whether we will verify received blocks. + verifyBlocks bool } type headersOpFunc func(headerList *HeaderHashList) @@ -60,6 +63,7 @@ func NewBlockchain(s Store, startHash util.Uint256) *Blockchain { headersOpDone: make(chan struct{}), startHash: startHash, blockCache: NewCache(), + verifyBlocks: true, } go bc.run() bc.init() @@ -93,9 +97,12 @@ func (bc *Blockchain) AddBlock(block *Block) error { return nil } if int(block.Index) == headerLen { - // todo: if (VerifyBlocks && !block.Verify()) return false; + if bc.verifyBlocks && !block.Verify(false) { + return fmt.Errorf("block %s is invalid", block.Hash()) + } + return bc.AddHeaders(block.Header()) } - return bc.AddHeaders(block.Header()) + return nil } func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) { diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index fab77b225..620e5dbe8 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -32,6 +32,16 @@ func TestAddHeaders(t *testing.T) { assert.Equal(t, uint32(1), bc.storedHeaderCount) assert.Equal(t, uint32(0), bc.BlockHeight()) assert.Equal(t, h3.Hash(), bc.CurrentHeaderHash()) + + // Add them again, they should not be added. + if err := bc.AddHeaders(h3, h2, h1); err != nil { + t.Fatal(err) + } + + assert.Equal(t, h3.Index, bc.HeaderHeight()) + assert.Equal(t, uint32(1), bc.storedHeaderCount) + assert.Equal(t, uint32(0), bc.BlockHeight()) + assert.Equal(t, h3.Hash(), bc.CurrentHeaderHash()) } func TestAddBlock(t *testing.T) { @@ -66,5 +76,6 @@ func TestAddBlock(t *testing.T) { func newTestBC() *Blockchain { startHash, _ := util.Uint256DecodeString("a") bc := NewBlockchain(NewMemoryStore(), startHash) + bc.verifyBlocks = false return bc } diff --git a/pkg/core/util.go b/pkg/core/util.go index 701ac34ac..e11b593e8 100644 --- a/pkg/core/util.go +++ b/pkg/core/util.go @@ -10,3 +10,13 @@ func GenesisHashPrivNet() util.Uint256 { hash, _ := util.Uint256DecodeString("996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099") return hash } + +func GenesisHashTestNet() util.Uint256 { + hash, _ := util.Uint256DecodeString("b3181718ef6167105b70920e4a8fbbd0a0a56aacf460d70e10ba6fa1668f1fef") + return hash +} + +func GenesisHashMainNet() util.Uint256 { + hash, _ := util.Uint256DecodeString("d42561e3d30e15be6400b6df2f328e02d2bf6354c41dce433bc57687c82144bf") + return hash +} diff --git a/pkg/network/message.go b/pkg/network/message.go index f6b59a676..8e7b0b022 100644 --- a/pkg/network/message.go +++ b/pkg/network/message.go @@ -9,6 +9,7 @@ import ( "io" "github.com/CityOfZion/neo-go/pkg/core" + "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/network/payload" ) @@ -220,6 +221,11 @@ func (m *Message) decodePayload(r io.Reader) error { if err := p.DecodeBinary(r); err != nil { return err } + case CMDTX: + p = &transaction.Transaction{} + if err := p.DecodeBinary(r); err != nil { + return err + } } m.Payload = p @@ -229,10 +235,18 @@ func (m *Message) decodePayload(r io.Reader) error { // encode a Message to any given io.Writer. func (m *Message) encode(w io.Writer) error { - binary.Write(w, binary.LittleEndian, m.Magic) - binary.Write(w, binary.LittleEndian, m.Command) - binary.Write(w, binary.LittleEndian, m.Length) - binary.Write(w, binary.LittleEndian, m.Checksum) + if err := binary.Write(w, binary.LittleEndian, m.Magic); err != nil { + return err + } + if err := binary.Write(w, binary.LittleEndian, m.Command); err != nil { + return err + } + if err := binary.Write(w, binary.LittleEndian, m.Length); err != nil { + return err + } + if err := binary.Write(w, binary.LittleEndian, m.Checksum); err != nil { + return err + } if m.Payload != nil { return m.Payload.EncodeBinary(w) diff --git a/pkg/network/node.go b/pkg/network/node.go index 88a0c3cb8..257b97153 100644 --- a/pkg/network/node.go +++ b/pkg/network/node.go @@ -27,14 +27,6 @@ type Node struct { server *Server services uint64 bc *core.Blockchain - protoIn chan messageTuple -} - -// messageTuple respresents a tuple that holds the message being -// send along with its peer. -type messageTuple struct { - peer Peer - msg *Message } func newNode(s *Server, cfg Config) *Node { @@ -42,6 +34,12 @@ func newNode(s *Server, cfg Config) *Node { if cfg.Net == ModePrivNet { startHash = core.GenesisHashPrivNet() } + if cfg.Net == ModeTestNet { + startHash = core.GenesisHashTestNet() + } + if cfg.Net == ModeMainNet { + startHash = core.GenesisHashMainNet() + } bc := core.NewBlockchain( core.NewMemoryStore(), @@ -52,13 +50,11 @@ func newNode(s *Server, cfg Config) *Node { logger = log.With(logger, "component", "node") n := &Node{ - Config: cfg, - protoIn: make(chan messageTuple), - server: s, - bc: bc, - logger: logger, + Config: cfg, + server: s, + bc: bc, + logger: logger, } - go n.handleMessages() return n } @@ -67,32 +63,45 @@ func (n *Node) version() *payload.Version { return payload.NewVersion(n.server.id, n.ListenTCP, n.UserAgent, 1, n.Relay) } -func (n *Node) startProtocol(peer Peer) { - ticker := time.NewTicker(protoTickInterval).C +func (n *Node) startProtocol(p Peer) { + n.logger.Log( + "event", "start protocol", + "peer", p.Endpoint(), + "userAgent", string(p.Version().UserAgent), + ) + defer func() { + n.logger.Log( + "msg", "protocol stopped", + "peer", p.Endpoint(), + ) + }() + timer := time.NewTimer(protoTickInterval) for { + <-timer.C select { - case <-ticker: + case <-p.Done(): + return + default: // Try to sync with the peer if his block height is higher then ours. - if peer.Version().StartHeight > n.bc.HeaderHeight() { - n.askMoreHeaders(peer) + if p.Version().StartHeight > n.bc.HeaderHeight() { + n.askMoreHeaders(p) } // Only ask for more peers if the server has the capacity for it. if n.server.hasCapacity() { msg := NewMessage(n.Net, CMDGetAddr, nil) - peer.Send(msg) + p.Send(msg) } - case <-peer.Done(): - return + timer.Reset(protoTickInterval) } } } // When a peer sends out his version we reply with verack after validating // the version. -func (n *Node) handleVersionCmd(version *payload.Version, peer Peer) error { +func (n *Node) handleVersionCmd(version *payload.Version, p Peer) error { msg := NewMessage(n.Net, CMDVerack, nil) - peer.Send(msg) + p.Send(msg) return nil } @@ -100,7 +109,7 @@ func (n *Node) handleVersionCmd(version *payload.Version, peer Peer) error { // We will use the getdata message to get more details about the received // inventory. // note: if the server has Relay on false, inventory messages are not received. -func (n *Node) handleInvCmd(inv *payload.Inventory, peer Peer) error { +func (n *Node) handleInvCmd(inv *payload.Inventory, p Peer) error { if !inv.Type.Valid() { return fmt.Errorf("invalid inventory type received: %s", inv.Type) } @@ -108,7 +117,7 @@ func (n *Node) handleInvCmd(inv *payload.Inventory, peer Peer) error { return errors.New("inventory has no hashes") } payload := payload.NewInventory(inv.Type, inv.Hashes) - peer.Send(NewMessage(n.Net, CMDGetData, payload)) + p.Send(NewMessage(n.Net, CMDGetData, payload)) return nil } @@ -120,7 +129,6 @@ func (n *Node) handleBlockCmd(block *core.Block, peer Peer) error { "hash", block.Hash(), "tx", len(block.Transactions), ) - return n.bc.AddBlock(block) } @@ -164,56 +172,42 @@ func (n *Node) askMoreHeaders(p Peer) { // blockhain implements the Noder interface. func (n *Node) blockchain() *core.Blockchain { return n.bc } -// handleProto implements the protoHandler interface. -func (n *Node) handleProto(msg *Message, p Peer) { - n.protoIn <- messageTuple{ - msg: msg, - peer: p, - } -} +func (n *Node) handleProto(msg *Message, p Peer) error { + //n.logger.Log( + // "event", "message received", + // "from", p.Endpoint(), + // "msg", msg.CommandType(), + //) -func (n *Node) handleMessages() { - for { - t := <-n.protoIn - - var ( - msg = t.msg - p = t.peer - err error - ) - - switch msg.CommandType() { - case CMDVersion: - version := msg.Payload.(*payload.Version) - err = n.handleVersionCmd(version, p) - case CMDAddr: - addressList := msg.Payload.(*payload.AddressList) - err = n.handleAddrCmd(addressList, p) - case CMDInv: - inventory := msg.Payload.(*payload.Inventory) - err = n.handleInvCmd(inventory, p) - case CMDBlock: - block := msg.Payload.(*core.Block) - err = n.handleBlockCmd(block, p) - case CMDHeaders: - headers := msg.Payload.(*payload.Headers) - err = n.handleHeadersCmd(headers, p) - case CMDVerack: - // Only start the protocol if we got the version and verack - // received. - if p.Version() != nil { - go n.startProtocol(p) - } - case CMDUnknown: - err = errors.New("received non-protocol messgae") - } - - if err != nil { - n.logger.Log( - "msg", "failed processing message", - "command", msg.CommandType, - "err", err, - ) + switch msg.CommandType() { + case CMDVersion: + version := msg.Payload.(*payload.Version) + return n.handleVersionCmd(version, p) + case CMDAddr: + addressList := msg.Payload.(*payload.AddressList) + return n.handleAddrCmd(addressList, p) + case CMDInv: + inventory := msg.Payload.(*payload.Inventory) + return n.handleInvCmd(inventory, p) + case CMDBlock: + block := msg.Payload.(*core.Block) + return n.handleBlockCmd(block, p) + case CMDHeaders: + headers := msg.Payload.(*payload.Headers) + return n.handleHeadersCmd(headers, p) + case CMDTX: + // tx := msg.Payload.(*transaction.Transaction) + //n.logger.Log("tx", fmt.Sprintf("%+v", tx)) + return nil + case CMDVerack: + // Only start the protocol if we got the version and verack + // received. + if p.Version() != nil { + go n.startProtocol(p) } + return nil + case CMDUnknown: + return errors.New("received non-protocol messgae") } + return nil } diff --git a/pkg/network/peer.go b/pkg/network/peer.go index f99668ed8..f03e26ef8 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -13,4 +13,5 @@ type Peer interface { Endpoint() util.Endpoint Send(*Message) Done() chan struct{} + Disconnect(err error) } diff --git a/pkg/network/protocol.go b/pkg/network/protocol.go index f81d5b003..5a3b7c2f2 100644 --- a/pkg/network/protocol.go +++ b/pkg/network/protocol.go @@ -9,10 +9,10 @@ import ( // of the NEO protocol. type ProtoHandler interface { version() *payload.Version - handleProto(*Message, Peer) + handleProto(*Message, Peer) error } -type protoHandleFunc func(*Message, Peer) +type protoHandleFunc func(*Message, Peer) error // Noder is anything that implements the NEO protocol // and can return the Blockchain object. diff --git a/pkg/network/server.go b/pkg/network/server.go index 8c8367fbe..7537d30b1 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -70,7 +70,7 @@ type Server struct { listener net.Listener register chan Peer - unregister chan Peer + unregister chan peerDrop badAddrOp chan func(map[string]bool) badAddrOpDone chan struct{} @@ -81,6 +81,11 @@ type Server struct { quit chan struct{} } +type peerDrop struct { + p Peer + err error +} + // NewServer returns a new Server object created from the // given config. func NewServer(cfg Config) *Server { @@ -103,7 +108,7 @@ func NewServer(cfg Config) *Server { id: util.RandUint32(1000000, 9999999), quit: make(chan struct{}, 1), register: make(chan Peer), - unregister: make(chan Peer), + unregister: make(chan peerDrop), badAddrOp: make(chan func(map[string]bool)), badAddrOpDone: make(chan struct{}), peerOp: make(chan func(map[Peer]bool)), @@ -131,12 +136,14 @@ func (s *Server) listenTCP() { s.logger.Log("msg", "conn read error", "err", err) break } - go s.setupConnection(conn) + go s.setupPeerConn(conn) } s.Quit() } -func (s *Server) setupConnection(conn net.Conn) { +// setupPeerConn runs in its own routine for each connected Peer. +// and waits till the Peer.Run() returns. +func (s *Server) setupPeerConn(conn net.Conn) { if !s.hasCapacity() { s.logger.Log("msg", "server reached maximum capacity") return @@ -144,9 +151,9 @@ func (s *Server) setupConnection(conn net.Conn) { p := NewTCPPeer(conn, s.proto.handleProto) s.register <- p - if err := p.run(); err != nil { - s.unregister <- p - } + + err := p.run() + s.unregister <- peerDrop{p, err} } func (s *Server) connectToPeers(addrs ...string) { @@ -161,7 +168,7 @@ func (s *Server) connectToPeers(addrs ...string) { <-s.badAddrOpDone return } - go s.setupConnection(conn) + go s.setupPeerConn(conn) }(addr) } } @@ -194,13 +201,12 @@ func (s *Server) hasCapacity() bool { return s.PeerCount() != s.MaxPeers } -func (s *Server) sendVersion(peer Peer) { - peer.Send(NewMessage(s.Net, CMDVersion, s.proto.version())) +func (s *Server) sendVersion(p Peer) { + p.Send(NewMessage(s.Net, CMDVersion, s.proto.version())) } func (s *Server) run() { var ( - ticker = time.NewTicker(30 * time.Second).C peers = make(map[Peer]bool) badAddrs = make(map[string]bool) ) @@ -219,11 +225,18 @@ func (s *Server) run() { // out our version immediately. s.sendVersion(p) s.logger.Log("event", "peer connected", "endpoint", p.Endpoint()) - case p := <-s.unregister: - delete(peers, p) - s.logger.Log("event", "peer disconnected", "endpoint", p.Endpoint()) - case <-ticker: - s.printState() + case drop := <-s.unregister: + delete(peers, drop.p) + s.logger.Log( + "event", "peer disconnected", + "endpoint", drop.p.Endpoint(), + "reason", drop.err, + "peerCount", len(peers), + ) + if len(peers) == 0 { + s.logger.Log("fatal", "no more available peers") + return + } case <-s.quit: return } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 3c3af5f00..0761c620e 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -34,7 +34,7 @@ func TestUnregisterPeer(t *testing.T) { s.register <- newTestPeer() assert.Equal(t, 3, s.PeerCount()) - s.unregister <- peer + s.unregister <- peerDrop{peer, nil} assert.Equal(t, 2, s.PeerCount()) } @@ -44,7 +44,9 @@ func (t testNode) version() *payload.Version { return &payload.Version{} } -func (t testNode) handleProto(msg *Message, p Peer) {} +func (t testNode) handleProto(msg *Message, p Peer) error { + return nil +} func newTestServer() *Server { return &Server{ @@ -52,7 +54,7 @@ func newTestServer() *Server { id: util.RandUint32(1000000, 9999999), quit: make(chan struct{}, 1), register: make(chan Peer), - unregister: make(chan Peer), + unregister: make(chan peerDrop), badAddrOp: make(chan func(map[string]bool)), badAddrOpDone: make(chan struct{}), peerOp: make(chan func(map[Peer]bool)), @@ -84,3 +86,7 @@ func (p testPeer) Send(msg *Message) {} func (p testPeer) Done() chan struct{} { return p.done } + +func (p testPeer) Disconnect(err error) { + +} diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index efce677d9..197119060 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -4,6 +4,7 @@ import ( "bytes" "net" "os" + "sync" "time" "github.com/CityOfZion/neo-go/pkg/network/payload" @@ -31,10 +32,14 @@ type TCPPeer struct { // incoming message along with its peer. handleProto protoHandleFunc - // Done is used to broadcast this peer has stopped running + // Done is used to broadcast that this peer has stopped running // and should be removed as reference. done chan struct{} - send chan *Message + + // Every send to this channel will terminate the Peer. + discErr chan error + closed chan struct{} + wg sync.WaitGroup logger log.Logger } @@ -49,10 +54,11 @@ func NewTCPPeer(conn net.Conn, fun protoHandleFunc) *TCPPeer { endpoint: e, conn: conn, done: make(chan struct{}), - send: make(chan *Message), logger: logger, connectedAt: time.Now().UTC(), handleProto: fun, + discErr: make(chan error), + closed: make(chan struct{}), } } @@ -68,67 +74,80 @@ func (p *TCPPeer) Endpoint() util.Endpoint { // Send implements the Peer interface. func (p *TCPPeer) Send(msg *Message) { - p.send <- msg + buf := new(bytes.Buffer) + if err := msg.encode(buf); err != nil { + p.discErr <- err + return + } + if _, err := p.conn.Write(buf.Bytes()); err != nil { + p.discErr <- err + return + } } -// Done implemnets the Peer interface. +// Done implemnets the Peer interface. It use is to +// notify the Node that this peer is no longer available +// for sending messages to. func (p *TCPPeer) Done() chan struct{} { return p.done } -func (p *TCPPeer) run() error { - errCh := make(chan error, 1) +// Disconnect terminates the peer connection. +func (p *TCPPeer) Disconnect(err error) { + select { + case p.discErr <- err: + case <-p.closed: + } +} - go p.readLoop(errCh) - go p.writeLoop(errCh) +func (p *TCPPeer) run() (err error) { + p.wg.Add(1) + go p.readLoop() - err := <-errCh - p.logger.Log("err", err) - p.cleanup() +run: + for { + select { + case err = <-p.discErr: + break run + } + } + + p.conn.Close() + close(p.closed) + // Close done instead of sending empty struct. + // It could happen that startProtocol in Node never happens + // on connection errors for example. + close(p.done) + p.wg.Wait() return err } -func (p *TCPPeer) readLoop(errCh chan error) { +func (p *TCPPeer) readLoop() { + defer p.wg.Done() for { - msg := &Message{} - if err := msg.decode(p.conn); err != nil { - errCh <- err - break + select { + case <-p.closed: + return + default: + msg := &Message{} + if err := msg.decode(p.conn); err != nil { + p.discErr <- err + return + } + p.handleMessage(msg) } - p.handleMessage(msg) } } -func (p *TCPPeer) writeLoop(errCh chan error) { - buf := new(bytes.Buffer) - - for { - msg := <-p.send - if err := msg.encode(buf); err != nil { - errCh <- err - break - } - if _, err := p.conn.Write(buf.Bytes()); err != nil { - errCh <- err - break - } - buf.Reset() - } -} - -func (p *TCPPeer) cleanup() { - p.conn.Close() - close(p.send) - p.done <- struct{}{} -} - func (p *TCPPeer) handleMessage(msg *Message) { switch msg.CommandType() { case CMDVersion: version := msg.Payload.(*payload.Version) p.version = version - p.handleProto(msg, p) + fallthrough default: - p.handleProto(msg, p) + if err := p.handleProto(msg, p); err != nil { + p.discErr <- err + } } }