stateroot: broadcast state on new blocks

This commit is contained in:
Evgeniy Stratonikov 2021-03-03 12:37:06 +03:00
parent 3c65ed1507
commit 2f3abf95a2
9 changed files with 75 additions and 36 deletions

View file

@ -14,7 +14,6 @@ type StateRoot interface {
GetStateProof(root util.Uint256, key []byte) ([][]byte, error) GetStateProof(root util.Uint256, key []byte) ([][]byte, error)
GetStateRoot(height uint32) (*state.MPTRoot, error) GetStateRoot(height uint32) (*state.MPTRoot, error)
GetStateValidators(height uint32) keys.PublicKeys GetStateValidators(height uint32) keys.PublicKeys
SetSignAndSendCallback(func(*state.MPTRoot) error) SetUpdateValidatorsCallback(func(uint32, keys.PublicKeys))
SetUpdateValidatorsCallback(func(keys.PublicKeys))
UpdateStateValidators(height uint32, pubs keys.PublicKeys) UpdateStateValidators(height uint32, pubs keys.PublicKeys)
} }

View file

@ -1,19 +1,11 @@
package stateroot package stateroot
import ( import (
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
) )
// SetSignAndSendCb sets callback for sending signed root.
func (s *Module) SetSignAndSendCallback(f func(*state.MPTRoot) error) {
s.mtx.Lock()
defer s.mtx.Unlock()
s.signAndSendCb = f
}
// SetUpdateValidatorsCallback sets callback for sending signed root. // SetUpdateValidatorsCallback sets callback for sending signed root.
func (s *Module) SetUpdateValidatorsCallback(f func(keys.PublicKeys)) { func (s *Module) SetUpdateValidatorsCallback(f func(uint32, keys.PublicKeys)) {
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
s.updateValidatorsCb = f s.updateValidatorsCb = f

View file

@ -31,8 +31,7 @@ type (
mtx sync.RWMutex mtx sync.RWMutex
keys []keyCache keys []keyCache
updateValidatorsCb func(publicKeys keys.PublicKeys) updateValidatorsCb func(height uint32, publicKeys keys.PublicKeys)
signAndSendCb func(*state.MPTRoot) error
} }
keyCache struct { keyCache struct {

View file

@ -31,9 +31,6 @@ func (s *Module) addLocalStateRoot(sr *state.MPTRoot) error {
s.validatedHeight.Store(sr.Index) s.validatedHeight.Store(sr.Index)
updateStateHeightMetric(sr.Index) updateStateHeightMetric(sr.Index)
} }
if s.signAndSendCb != nil {
return s.signAndSendCb(sr)
}
return nil return nil
} }

View file

@ -13,7 +13,7 @@ func (s *Module) UpdateStateValidators(height uint32, pubs keys.PublicKeys) {
s.mtx.Lock() s.mtx.Lock()
if s.updateValidatorsCb != nil { if s.updateValidatorsCb != nil {
s.updateValidatorsCb(pubs) s.updateValidatorsCb(height, pubs)
} }
kc := s.getKeyCacheForHeight(height) kc := s.getKeyCacheForHeight(height)
if kc.validatorsHash != h { if kc.validatorsHash != h {

View file

@ -6,6 +6,7 @@ import (
"path" "path"
"sort" "sort"
"testing" "testing"
"time"
"github.com/nspcc-dev/neo-go/internal/testserdes" "github.com/nspcc-dev/neo-go/internal/testserdes"
"github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config"
@ -23,6 +24,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/vm/emit"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
) )
@ -80,7 +82,7 @@ func TestStateRoot(t *testing.T) {
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass") w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass")
cfg := createStateRootConfig(w.Path(), "pass") cfg := createStateRootConfig(w.Path(), "pass")
srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 0, srv.CurrentValidatedHeight()) require.EqualValues(t, 0, srv.CurrentValidatedHeight())
r, err := srv.GetStateRoot(bc.BlockHeight()) r, err := srv.GetStateRoot(bc.BlockHeight())
@ -150,7 +152,7 @@ func TestStateRootInitNonZeroHeight(t *testing.T) {
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass") w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass")
cfg := createStateRootConfig(w.Path(), "pass") cfg := createStateRootConfig(w.Path(), "pass")
srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc)
require.NoError(t, err) require.NoError(t, err)
r, err := srv.GetStateRoot(2) r, err := srv.GetStateRoot(2)
require.NoError(t, err) require.NoError(t, err)
@ -196,27 +198,33 @@ func TestStateRootFull(t *testing.T) {
h, pubs, accs := newMajorityMultisigWithGAS(t, 2) h, pubs, accs := newMajorityMultisigWithGAS(t, 2)
w := createAndWriteWallet(t, accs[1], path.Join(tmpDir, "wallet2"), "two") w := createAndWriteWallet(t, accs[1], path.Join(tmpDir, "wallet2"), "two")
cfg := createStateRootConfig(w.Path(), "two") cfg := createStateRootConfig(w.Path(), "two")
srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc)
require.NoError(t, err) require.NoError(t, err)
srv.Run()
t.Cleanup(srv.Shutdown)
var lastValidated *payload.Extensible var lastValidated atomic.Value
var lastHeight atomic.Uint32
srv.SetRelayCallback(func(ep *payload.Extensible) { srv.SetRelayCallback(func(ep *payload.Extensible) {
lastValidated = ep lastHeight.Store(ep.ValidBlockStart)
lastValidated.Store(ep)
}) })
bc.setNodesByRole(t, true, native.RoleStateValidator, pubs) bc.setNodesByRole(t, true, native.RoleStateValidator, pubs)
transferTokenFromMultisigAccount(t, bc, h, bc.contracts.GAS.Hash, 1_0000_0000) transferTokenFromMultisigAccount(t, bc, h, bc.contracts.GAS.Hash, 1_0000_0000)
checkVoteBroadcasted(t, bc, lastValidated, 2, 1) require.Eventually(t, func() bool { return lastHeight.Load() == 2 }, time.Second, time.Millisecond)
checkVoteBroadcasted(t, bc, lastValidated.Load().(*payload.Extensible), 2, 1)
_, err = persistBlock(bc) _, err = persistBlock(bc)
checkVoteBroadcasted(t, bc, lastValidated, 3, 1) require.Eventually(t, func() bool { return lastHeight.Load() == 3 }, time.Second, time.Millisecond)
checkVoteBroadcasted(t, bc, lastValidated.Load().(*payload.Extensible), 3, 1)
r, err := srv.GetStateRoot(2) r, err := srv.GetStateRoot(2)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, srv.AddSignature(2, 0, accs[0].PrivateKey().SignHash(r.GetSignedHash()))) require.NoError(t, srv.AddSignature(2, 0, accs[0].PrivateKey().SignHash(r.GetSignedHash())))
require.NotNil(t, lastValidated) require.NotNil(t, lastValidated.Load().(*payload.Extensible))
msg := new(stateroot.Message) msg := new(stateroot.Message)
require.NoError(t, testserdes.DecodeBinary(lastValidated.Data, msg)) require.NoError(t, testserdes.DecodeBinary(lastValidated.Load().(*payload.Extensible).Data, msg))
require.Equal(t, stateroot.RootT, msg.Type) require.Equal(t, stateroot.RootT, msg.Type)
actual := msg.Payload.(*state.MPTRoot) actual := msg.Payload.(*state.MPTRoot)

View file

@ -177,7 +177,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled") return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled")
} }
sr, err := stateroot.New(config.StateRootCfg, s.log, chain.GetStateModule()) sr, err := stateroot.New(config.StateRootCfg, s.log, chain)
if err != nil { if err != nil {
return nil, fmt.Errorf("can't initialize StateRoot service: %w", err) return nil, fmt.Errorf("can't initialize StateRoot service: %w", err)
} }
@ -277,6 +277,9 @@ func (s *Server) Start(errChan chan error) {
s.notaryRequestPool.RunSubscriptions() s.notaryRequestPool.RunSubscriptions()
go s.notaryModule.Run() go s.notaryModule.Run()
} }
if s.StateRootCfg.Enabled {
s.stateRoot.Run()
}
go s.relayBlocksLoop() go s.relayBlocksLoop()
go s.bQueue.run() go s.bQueue.run()
go s.transport.Accept() go s.transport.Accept()
@ -296,6 +299,9 @@ func (s *Server) Shutdown() {
p.Disconnect(errServerShutdown) p.Disconnect(errServerShutdown)
} }
s.bQueue.discard() s.bQueue.discard()
if s.StateRootCfg.Enabled {
s.stateRoot.Shutdown()
}
if s.oracle != nil { if s.oracle != nil {
s.oracle.Shutdown() s.oracle.Shutdown()
} }

View file

@ -6,6 +6,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -23,25 +24,31 @@ type (
AddSignature(height uint32, validatorIndex int32, sig []byte) error AddSignature(height uint32, validatorIndex int32, sig []byte) error
GetConfig() config.StateRoot GetConfig() config.StateRoot
SetRelayCallback(RelayCallback) SetRelayCallback(RelayCallback)
Run()
Shutdown()
} }
service struct { service struct {
blockchainer.StateRoot blockchainer.StateRoot
chain blockchainer.Blockchainer
MainCfg config.StateRoot MainCfg config.StateRoot
Network netmode.Magic Network netmode.Magic
log *zap.Logger log *zap.Logger
accMtx sync.RWMutex accMtx sync.RWMutex
myIndex byte accHeight uint32
wallet *wallet.Wallet myIndex byte
acc *wallet.Account wallet *wallet.Wallet
acc *wallet.Account
srMtx sync.Mutex srMtx sync.Mutex
incompleteRoots map[uint32]*incompleteRoot incompleteRoots map[uint32]*incompleteRoot
cbMtx sync.RWMutex cbMtx sync.RWMutex
onValidatedRoot RelayCallback onValidatedRoot RelayCallback
blockCh chan *block.Block
done chan struct{}
} }
) )
@ -51,11 +58,14 @@ const (
) )
// New returns new state root service instance using underlying module. // New returns new state root service instance using underlying module.
func New(cfg config.StateRoot, log *zap.Logger, mod blockchainer.StateRoot) (Service, error) { func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer) (Service, error) {
s := &service{ s := &service{
StateRoot: mod, StateRoot: bc.GetStateModule(),
chain: bc,
log: log, log: log,
incompleteRoots: make(map[uint32]*incompleteRoot), incompleteRoots: make(map[uint32]*incompleteRoot),
blockCh: make(chan *block.Block),
done: make(chan struct{}),
} }
s.MainCfg = cfg s.MainCfg = cfg
@ -78,7 +88,6 @@ func New(cfg config.StateRoot, log *zap.Logger, mod blockchainer.StateRoot) (Ser
} }
s.SetUpdateValidatorsCallback(s.updateValidators) s.SetUpdateValidatorsCallback(s.updateValidators)
s.SetSignAndSendCallback(s.signAndSend)
} }
return s, nil return s, nil
} }
@ -105,7 +114,7 @@ func (s *service) OnPayload(ep *payload.Extensible) error {
return nil return nil
} }
func (s *service) updateValidators(pubs keys.PublicKeys) { func (s *service) updateValidators(height uint32, pubs keys.PublicKeys) {
s.accMtx.Lock() s.accMtx.Lock()
defer s.accMtx.Unlock() defer s.accMtx.Unlock()
@ -115,6 +124,7 @@ func (s *service) updateValidators(pubs keys.PublicKeys) {
err := acc.Decrypt(s.MainCfg.UnlockWallet.Password) err := acc.Decrypt(s.MainCfg.UnlockWallet.Password)
if err == nil { if err == nil {
s.acc = acc s.acc = acc
s.accHeight = height
s.myIndex = byte(i) s.myIndex = byte(i)
break break
} }

View file

@ -6,8 +6,36 @@ import (
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/zap"
) )
// Run runs service instance in a separate goroutine.
func (s *service) Run() {
s.chain.SubscribeForBlocks(s.blockCh)
go s.run()
}
func (s *service) run() {
for {
select {
case b := <-s.blockCh:
r, err := s.GetStateRoot(b.Index)
if err != nil {
s.log.Error("can't get state root for new block", zap.Error(err))
} else if err := s.signAndSend(r); err != nil {
s.log.Error("can't sign or send state root", zap.Error(err))
}
case <-s.done:
return
}
}
}
// Shutdown stops the service.
func (s *service) Shutdown() {
close(s.done)
}
func (s *service) signAndSend(r *state.MPTRoot) error { func (s *service) signAndSend(r *state.MPTRoot) error {
if !s.MainCfg.Enabled { if !s.MainCfg.Enabled {
return nil return nil