network: process state roots properly

This commit is contained in:
Evgenii Stratonikov 2020-06-05 12:11:22 +03:00
parent 519a98039c
commit 1fd7938fd8
5 changed files with 72 additions and 44 deletions

View file

@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/dbft/payload"
"github.com/nspcc-dev/neo-go/pkg/core"
coreb "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/cache"
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
@ -50,9 +51,9 @@ type service struct {
log *zap.Logger
// cache is a fifo cache which stores recent payloads.
cache *relayCache
cache *cache.HashCache
// txx is a fifo cache which stores miner transactions.
txx *relayCache
txx *cache.HashCache
dbft *dbft.DBFT
// messages and transactions are channels needed to process
// everything in single thread.
@ -71,7 +72,7 @@ type Config struct {
Logger *zap.Logger
// Broadcast is a callback which is called to notify server
// about new consensus payload to sent.
Broadcast func(p *Payload)
Broadcast func(cache.Hashable)
// Chain is a core.Blockchainer instance.
Chain core.Blockchainer
// RequestTx is a callback to which will be called
@ -97,8 +98,8 @@ func NewService(cfg Config) (Service, error) {
Config: cfg,
log: cfg.Logger,
cache: newFIFOCache(cacheMaxCapacity),
txx: newFIFOCache(cacheMaxCapacity),
cache: cache.NewFIFOCache(cacheMaxCapacity),
txx: cache.NewFIFOCache(cacheMaxCapacity),
messages: make(chan Payload, 100),
transactions: make(chan *transaction.Transaction, 100),
@ -394,6 +395,7 @@ func (s *service) processBlock(b block.Block) {
if err := s.Chain.AddStateRoot(r); err != nil {
s.log.Warn("errors while adding state root", zap.Error(err))
}
s.Broadcast(r)
}
func (s *service) getBlockWitness(_ *coreb.Block) *transaction.Witness {

View file

@ -7,6 +7,7 @@ import (
"github.com/nspcc-dev/dbft/payload"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/cache"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -182,7 +183,7 @@ func shouldNotReceive(t *testing.T, ch chan Payload) {
func newTestService(t *testing.T) *service {
srv, err := NewService(Config{
Logger: zaptest.NewLogger(t),
Broadcast: func(*Payload) {},
Broadcast: func(cache.Hashable) {},
Chain: newTestChain(t),
RequestTx: func(...util.Uint256) {},
Wallet: &wallet.Config{

View file

@ -1,4 +1,4 @@
package consensus
package cache
import (
"container/list"
@ -7,9 +7,9 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util"
)
// relayCache is a payload cache which is used to store
// HashCache is a payload cache which is used to store
// last consensus payloads.
type relayCache struct {
type HashCache struct {
*sync.RWMutex
maxCap int
@ -17,13 +17,14 @@ type relayCache struct {
queue *list.List
}
// hashable is a type of items which can be stored in the relayCache.
type hashable interface {
// Hashable is a type of items which can be stored in the HashCache.
type Hashable interface {
Hash() util.Uint256
}
func newFIFOCache(capacity int) *relayCache {
return &relayCache{
// NewFIFOCache returns new FIFO cache with the specified capacity.
func NewFIFOCache(capacity int) *HashCache {
return &HashCache{
RWMutex: new(sync.RWMutex),
maxCap: capacity,
@ -33,7 +34,7 @@ func newFIFOCache(capacity int) *relayCache {
}
// Add adds payload into a cache if it doesn't already exist.
func (c *relayCache) Add(p hashable) {
func (c *HashCache) Add(p Hashable) {
c.Lock()
defer c.Unlock()
@ -45,7 +46,7 @@ func (c *relayCache) Add(p hashable) {
if c.queue.Len() >= c.maxCap {
first := c.queue.Front()
c.queue.Remove(first)
delete(c.elems, first.Value.(hashable).Hash())
delete(c.elems, first.Value.(Hashable).Hash())
}
e := c.queue.PushBack(p)
@ -53,7 +54,7 @@ func (c *relayCache) Add(p hashable) {
}
// Has checks if an item is already in cache.
func (c *relayCache) Has(h util.Uint256) bool {
func (c *HashCache) Has(h util.Uint256) bool {
c.RLock()
defer c.RUnlock()
@ -61,13 +62,13 @@ func (c *relayCache) Has(h util.Uint256) bool {
}
// Get returns payload with the specified hash from cache.
func (c *relayCache) Get(h util.Uint256) hashable {
func (c *HashCache) Get(h util.Uint256) Hashable {
c.RLock()
defer c.RUnlock()
e, ok := c.elems[h]
if !ok {
return hashable(nil)
return Hashable(nil)
}
return e.Value.(hashable)
return e.Value.(Hashable)
}

View file

@ -1,17 +1,19 @@
package consensus
package cache
import (
"math/rand"
"testing"
"github.com/nspcc-dev/dbft/payload"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/internal/random"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
)
func TestRelayCache_Add(t *testing.T) {
const capacity = 3
payloads := getDifferentPayloads(t, capacity+1)
c := newFIFOCache(capacity)
payloads := getDifferentItems(t, capacity+1)
c := NewFIFOCache(capacity)
require.Equal(t, 0, c.queue.Len())
require.Equal(t, 0, len(c.elems))
@ -46,19 +48,15 @@ func TestRelayCache_Add(t *testing.T) {
require.Equal(t, nil, c.Get(payloads[1].Hash()))
}
func getDifferentPayloads(t *testing.T, n int) (payloads []Payload) {
payloads = make([]Payload, n)
for i := range payloads {
var sign [signatureSize]byte
random.Fill(sign[:])
type testHashable []byte
payloads[i].message = &message{}
payloads[i].SetValidatorIndex(uint16(i))
payloads[i].SetType(payload.MessageType(commitType))
payloads[i].payload = &commit{
signature: sign,
}
}
// Hash implements Hashable.
func (h testHashable) Hash() util.Uint256 { return hash.Sha256(h) }
return
func getDifferentItems(t *testing.T, n int) []testHashable {
items := make([]testHashable, n)
for i := range items {
items[i] = random.Bytes(rand.Int() % 10)
}
return items
}

View file

@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/consensus"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/cache"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
@ -29,6 +30,7 @@ const (
maxBlockBatch = 200
maxAddrsToSend = 200
minPoolCount = 30
stateRootCacheSize = 100
)
var (
@ -67,6 +69,7 @@ type (
transactions chan *transaction.Transaction
stateCache cache.HashCache
consensusStarted *atomic.Bool
log *zap.Logger
@ -99,6 +102,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
unregister: make(chan peerDrop),
peers: make(map[Peer]bool),
consensusStarted: atomic.NewBool(false),
stateCache: *cache.NewFIFOCache(stateRootCacheSize),
log: log,
transactions: make(chan *transaction.Transaction, 64),
}
@ -470,6 +474,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
cp := s.consensus.GetPayload(h)
return cp != nil
},
payload.StateRootType: s.stateCache.Has,
}
if exists := typExists[inv.Type]; exists != nil {
for _, hash := range inv.Hashes {
@ -509,7 +514,10 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
msg = s.MkMsg(CMDBlock, b)
}
case payload.StateRootType:
return nil // do nothing
r := s.stateCache.Get(hash)
if r != nil {
msg = s.MkMsg(CMDStateRoot, r.(*state.MPTRoot))
}
case payload.ConsensusType:
if cp := s.consensus.GetPayload(hash); cp != nil {
msg = s.MkMsg(CMDConsensus, cp)
@ -613,12 +621,21 @@ func (s *Server) handleGetRootsCmd(p Peer, gr *payload.GetStateRoots) error {
// handleStateRootsCmd processees `roots` request.
func (s *Server) handleRootsCmd(rs *payload.StateRoots) error {
return nil // TODO
for i := range rs.Roots {
_ = s.chain.AddStateRoot(&rs.Roots[i])
}
return nil
}
// handleStateRootCmd processees `stateroot` request.
func (s *Server) handleStateRootCmd(r *state.MPTRoot) error {
return nil // TODO
// we ignore error, because there is nothing wrong if we already have this state root
err := s.chain.AddStateRoot(r)
if err == nil && !s.stateCache.Has(r.Hash()) {
s.stateCache.Add(r)
s.broadcastMessage(s.MkMsg(CMDStateRoot, r))
}
return nil
}
// handleConsensusCmd processes received consensus payload.
@ -782,11 +799,20 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
return nil
}
func (s *Server) handleNewPayload(p *consensus.Payload) {
func (s *Server) handleNewPayload(item cache.Hashable) {
switch p := item.(type) {
case *consensus.Payload:
msg := s.MkMsg(CMDInv, payload.NewInventory(payload.ConsensusType, []util.Uint256{p.Hash()}))
// It's high priority because it directly affects consensus process,
// even though it's just an inv.
s.broadcastHPMessage(msg)
case *state.MPTRoot:
s.stateCache.Add(p)
msg := s.MkMsg(CMDStateRoot, p)
s.broadcastMessage(msg)
default:
s.log.Warn("unknown item type", zap.String("type", fmt.Sprintf("%T", p)))
}
}
func (s *Server) requestTx(hashes ...util.Uint256) {