From 1fd7938fd877b7073f29302f4ef7b79e57fb21aa Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 5 Jun 2020 12:11:22 +0300 Subject: [PATCH] network: process state roots properly --- pkg/consensus/consensus.go | 12 +++--- pkg/consensus/consensus_test.go | 3 +- pkg/{consensus => core/cache}/cache.go | 27 ++++++------- pkg/{consensus => core/cache}/cache_test.go | 32 ++++++++-------- pkg/network/server.go | 42 +++++++++++++++++---- 5 files changed, 72 insertions(+), 44 deletions(-) rename pkg/{consensus => core/cache}/cache.go (60%) rename pkg/{consensus => core/cache}/cache_test.go (68%) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 742e8d23f..183280313 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/dbft/payload" "github.com/nspcc-dev/neo-go/pkg/core" coreb "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/cache" "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -50,9 +51,9 @@ type service struct { log *zap.Logger // cache is a fifo cache which stores recent payloads. - cache *relayCache + cache *cache.HashCache // txx is a fifo cache which stores miner transactions. - txx *relayCache + txx *cache.HashCache dbft *dbft.DBFT // messages and transactions are channels needed to process // everything in single thread. @@ -71,7 +72,7 @@ type Config struct { Logger *zap.Logger // Broadcast is a callback which is called to notify server // about new consensus payload to sent. - Broadcast func(p *Payload) + Broadcast func(cache.Hashable) // Chain is a core.Blockchainer instance. Chain core.Blockchainer // RequestTx is a callback to which will be called @@ -97,8 +98,8 @@ func NewService(cfg Config) (Service, error) { Config: cfg, log: cfg.Logger, - cache: newFIFOCache(cacheMaxCapacity), - txx: newFIFOCache(cacheMaxCapacity), + cache: cache.NewFIFOCache(cacheMaxCapacity), + txx: cache.NewFIFOCache(cacheMaxCapacity), messages: make(chan Payload, 100), transactions: make(chan *transaction.Transaction, 100), @@ -394,6 +395,7 @@ func (s *service) processBlock(b block.Block) { if err := s.Chain.AddStateRoot(r); err != nil { s.log.Warn("errors while adding state root", zap.Error(err)) } + s.Broadcast(r) } func (s *service) getBlockWitness(_ *coreb.Block) *transaction.Witness { diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 285971622..a5713f7f3 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/dbft/payload" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core" + "github.com/nspcc-dev/neo-go/pkg/core/cache" "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -182,7 +183,7 @@ func shouldNotReceive(t *testing.T, ch chan Payload) { func newTestService(t *testing.T) *service { srv, err := NewService(Config{ Logger: zaptest.NewLogger(t), - Broadcast: func(*Payload) {}, + Broadcast: func(cache.Hashable) {}, Chain: newTestChain(t), RequestTx: func(...util.Uint256) {}, Wallet: &wallet.Config{ diff --git a/pkg/consensus/cache.go b/pkg/core/cache/cache.go similarity index 60% rename from pkg/consensus/cache.go rename to pkg/core/cache/cache.go index 4a6853803..962b779ed 100644 --- a/pkg/consensus/cache.go +++ b/pkg/core/cache/cache.go @@ -1,4 +1,4 @@ -package consensus +package cache import ( "container/list" @@ -7,9 +7,9 @@ import ( "github.com/nspcc-dev/neo-go/pkg/util" ) -// relayCache is a payload cache which is used to store +// HashCache is a payload cache which is used to store // last consensus payloads. -type relayCache struct { +type HashCache struct { *sync.RWMutex maxCap int @@ -17,13 +17,14 @@ type relayCache struct { queue *list.List } -// hashable is a type of items which can be stored in the relayCache. -type hashable interface { +// Hashable is a type of items which can be stored in the HashCache. +type Hashable interface { Hash() util.Uint256 } -func newFIFOCache(capacity int) *relayCache { - return &relayCache{ +// NewFIFOCache returns new FIFO cache with the specified capacity. +func NewFIFOCache(capacity int) *HashCache { + return &HashCache{ RWMutex: new(sync.RWMutex), maxCap: capacity, @@ -33,7 +34,7 @@ func newFIFOCache(capacity int) *relayCache { } // Add adds payload into a cache if it doesn't already exist. -func (c *relayCache) Add(p hashable) { +func (c *HashCache) Add(p Hashable) { c.Lock() defer c.Unlock() @@ -45,7 +46,7 @@ func (c *relayCache) Add(p hashable) { if c.queue.Len() >= c.maxCap { first := c.queue.Front() c.queue.Remove(first) - delete(c.elems, first.Value.(hashable).Hash()) + delete(c.elems, first.Value.(Hashable).Hash()) } e := c.queue.PushBack(p) @@ -53,7 +54,7 @@ func (c *relayCache) Add(p hashable) { } // Has checks if an item is already in cache. -func (c *relayCache) Has(h util.Uint256) bool { +func (c *HashCache) Has(h util.Uint256) bool { c.RLock() defer c.RUnlock() @@ -61,13 +62,13 @@ func (c *relayCache) Has(h util.Uint256) bool { } // Get returns payload with the specified hash from cache. -func (c *relayCache) Get(h util.Uint256) hashable { +func (c *HashCache) Get(h util.Uint256) Hashable { c.RLock() defer c.RUnlock() e, ok := c.elems[h] if !ok { - return hashable(nil) + return Hashable(nil) } - return e.Value.(hashable) + return e.Value.(Hashable) } diff --git a/pkg/consensus/cache_test.go b/pkg/core/cache/cache_test.go similarity index 68% rename from pkg/consensus/cache_test.go rename to pkg/core/cache/cache_test.go index cd4ebe5a3..e8288e2d7 100644 --- a/pkg/consensus/cache_test.go +++ b/pkg/core/cache/cache_test.go @@ -1,17 +1,19 @@ -package consensus +package cache import ( + "math/rand" "testing" - "github.com/nspcc-dev/dbft/payload" + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/internal/random" + "github.com/nspcc-dev/neo-go/pkg/util" "github.com/stretchr/testify/require" ) func TestRelayCache_Add(t *testing.T) { const capacity = 3 - payloads := getDifferentPayloads(t, capacity+1) - c := newFIFOCache(capacity) + payloads := getDifferentItems(t, capacity+1) + c := NewFIFOCache(capacity) require.Equal(t, 0, c.queue.Len()) require.Equal(t, 0, len(c.elems)) @@ -46,19 +48,15 @@ func TestRelayCache_Add(t *testing.T) { require.Equal(t, nil, c.Get(payloads[1].Hash())) } -func getDifferentPayloads(t *testing.T, n int) (payloads []Payload) { - payloads = make([]Payload, n) - for i := range payloads { - var sign [signatureSize]byte - random.Fill(sign[:]) +type testHashable []byte - payloads[i].message = &message{} - payloads[i].SetValidatorIndex(uint16(i)) - payloads[i].SetType(payload.MessageType(commitType)) - payloads[i].payload = &commit{ - signature: sign, - } +// Hash implements Hashable. +func (h testHashable) Hash() util.Uint256 { return hash.Sha256(h) } + +func getDifferentItems(t *testing.T, n int) []testHashable { + items := make([]testHashable, n) + for i := range items { + items[i] = random.Bytes(rand.Int() % 10) } - - return + return items } diff --git a/pkg/network/server.go b/pkg/network/server.go index a9559eba7..1b0c3076c 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/cache" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/network/payload" @@ -29,6 +30,7 @@ const ( maxBlockBatch = 200 maxAddrsToSend = 200 minPoolCount = 30 + stateRootCacheSize = 100 ) var ( @@ -67,6 +69,7 @@ type ( transactions chan *transaction.Transaction + stateCache cache.HashCache consensusStarted *atomic.Bool log *zap.Logger @@ -99,6 +102,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* unregister: make(chan peerDrop), peers: make(map[Peer]bool), consensusStarted: atomic.NewBool(false), + stateCache: *cache.NewFIFOCache(stateRootCacheSize), log: log, transactions: make(chan *transaction.Transaction, 64), } @@ -470,6 +474,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { cp := s.consensus.GetPayload(h) return cp != nil }, + payload.StateRootType: s.stateCache.Has, } if exists := typExists[inv.Type]; exists != nil { for _, hash := range inv.Hashes { @@ -509,7 +514,10 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { msg = s.MkMsg(CMDBlock, b) } case payload.StateRootType: - return nil // do nothing + r := s.stateCache.Get(hash) + if r != nil { + msg = s.MkMsg(CMDStateRoot, r.(*state.MPTRoot)) + } case payload.ConsensusType: if cp := s.consensus.GetPayload(hash); cp != nil { msg = s.MkMsg(CMDConsensus, cp) @@ -613,12 +621,21 @@ func (s *Server) handleGetRootsCmd(p Peer, gr *payload.GetStateRoots) error { // handleStateRootsCmd processees `roots` request. func (s *Server) handleRootsCmd(rs *payload.StateRoots) error { - return nil // TODO + for i := range rs.Roots { + _ = s.chain.AddStateRoot(&rs.Roots[i]) + } + return nil } // handleStateRootCmd processees `stateroot` request. func (s *Server) handleStateRootCmd(r *state.MPTRoot) error { - return nil // TODO + // we ignore error, because there is nothing wrong if we already have this state root + err := s.chain.AddStateRoot(r) + if err == nil && !s.stateCache.Has(r.Hash()) { + s.stateCache.Add(r) + s.broadcastMessage(s.MkMsg(CMDStateRoot, r)) + } + return nil } // handleConsensusCmd processes received consensus payload. @@ -782,11 +799,20 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { return nil } -func (s *Server) handleNewPayload(p *consensus.Payload) { - msg := s.MkMsg(CMDInv, payload.NewInventory(payload.ConsensusType, []util.Uint256{p.Hash()})) - // It's high priority because it directly affects consensus process, - // even though it's just an inv. - s.broadcastHPMessage(msg) +func (s *Server) handleNewPayload(item cache.Hashable) { + switch p := item.(type) { + case *consensus.Payload: + msg := s.MkMsg(CMDInv, payload.NewInventory(payload.ConsensusType, []util.Uint256{p.Hash()})) + // It's high priority because it directly affects consensus process, + // even though it's just an inv. + s.broadcastHPMessage(msg) + case *state.MPTRoot: + s.stateCache.Add(p) + msg := s.MkMsg(CMDStateRoot, p) + s.broadcastMessage(msg) + default: + s.log.Warn("unknown item type", zap.String("type", fmt.Sprintf("%T", p))) + } } func (s *Server) requestTx(hashes ...util.Uint256) {