diff --git a/pkg/core/blockchainer/state_root.go b/pkg/core/blockchainer/state_root.go index 556427f84..979e15963 100644 --- a/pkg/core/blockchainer/state_root.go +++ b/pkg/core/blockchainer/state_root.go @@ -14,7 +14,6 @@ type StateRoot interface { GetStateProof(root util.Uint256, key []byte) ([][]byte, error) GetStateRoot(height uint32) (*state.MPTRoot, error) GetStateValidators(height uint32) keys.PublicKeys - SetSignAndSendCallback(func(*state.MPTRoot) error) - SetUpdateValidatorsCallback(func(keys.PublicKeys)) + SetUpdateValidatorsCallback(func(uint32, keys.PublicKeys)) UpdateStateValidators(height uint32, pubs keys.PublicKeys) } diff --git a/pkg/core/stateroot/callbacks.go b/pkg/core/stateroot/callbacks.go index 19ea0a17e..95fce212a 100644 --- a/pkg/core/stateroot/callbacks.go +++ b/pkg/core/stateroot/callbacks.go @@ -1,19 +1,11 @@ package stateroot import ( - "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" ) -// SetSignAndSendCb sets callback for sending signed root. -func (s *Module) SetSignAndSendCallback(f func(*state.MPTRoot) error) { - s.mtx.Lock() - defer s.mtx.Unlock() - s.signAndSendCb = f -} - // SetUpdateValidatorsCallback sets callback for sending signed root. -func (s *Module) SetUpdateValidatorsCallback(f func(keys.PublicKeys)) { +func (s *Module) SetUpdateValidatorsCallback(f func(uint32, keys.PublicKeys)) { s.mtx.Lock() defer s.mtx.Unlock() s.updateValidatorsCb = f diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index 9b7c3bf57..2d63b89b5 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -31,8 +31,7 @@ type ( mtx sync.RWMutex keys []keyCache - updateValidatorsCb func(publicKeys keys.PublicKeys) - signAndSendCb func(*state.MPTRoot) error + updateValidatorsCb func(height uint32, publicKeys keys.PublicKeys) } keyCache struct { diff --git a/pkg/core/stateroot/store.go b/pkg/core/stateroot/store.go index 9efbd4686..b2ab7d210 100644 --- a/pkg/core/stateroot/store.go +++ b/pkg/core/stateroot/store.go @@ -31,9 +31,6 @@ func (s *Module) addLocalStateRoot(sr *state.MPTRoot) error { s.validatedHeight.Store(sr.Index) updateStateHeightMetric(sr.Index) } - if s.signAndSendCb != nil { - return s.signAndSendCb(sr) - } return nil } diff --git a/pkg/core/stateroot/validators.go b/pkg/core/stateroot/validators.go index e07a447e2..545c52ded 100644 --- a/pkg/core/stateroot/validators.go +++ b/pkg/core/stateroot/validators.go @@ -13,7 +13,7 @@ func (s *Module) UpdateStateValidators(height uint32, pubs keys.PublicKeys) { s.mtx.Lock() if s.updateValidatorsCb != nil { - s.updateValidatorsCb(pubs) + s.updateValidatorsCb(height, pubs) } kc := s.getKeyCacheForHeight(height) if kc.validatorsHash != h { diff --git a/pkg/core/stateroot_test.go b/pkg/core/stateroot_test.go index 814e48498..9b9d2e0b6 100644 --- a/pkg/core/stateroot_test.go +++ b/pkg/core/stateroot_test.go @@ -6,6 +6,7 @@ import ( "path" "sort" "testing" + "time" "github.com/nspcc-dev/neo-go/internal/testserdes" "github.com/nspcc-dev/neo-go/pkg/config" @@ -23,6 +24,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/zap/zaptest" ) @@ -80,7 +82,7 @@ func TestStateRoot(t *testing.T) { defer os.RemoveAll(tmpDir) w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass") cfg := createStateRootConfig(w.Path(), "pass") - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc) require.NoError(t, err) require.EqualValues(t, 0, srv.CurrentValidatedHeight()) r, err := srv.GetStateRoot(bc.BlockHeight()) @@ -150,7 +152,7 @@ func TestStateRootInitNonZeroHeight(t *testing.T) { defer os.RemoveAll(tmpDir) w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass") cfg := createStateRootConfig(w.Path(), "pass") - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc) require.NoError(t, err) r, err := srv.GetStateRoot(2) require.NoError(t, err) @@ -196,27 +198,33 @@ func TestStateRootFull(t *testing.T) { h, pubs, accs := newMajorityMultisigWithGAS(t, 2) w := createAndWriteWallet(t, accs[1], path.Join(tmpDir, "wallet2"), "two") cfg := createStateRootConfig(w.Path(), "two") - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc) require.NoError(t, err) + srv.Run() + t.Cleanup(srv.Shutdown) - var lastValidated *payload.Extensible + var lastValidated atomic.Value + var lastHeight atomic.Uint32 srv.SetRelayCallback(func(ep *payload.Extensible) { - lastValidated = ep + lastHeight.Store(ep.ValidBlockStart) + lastValidated.Store(ep) }) bc.setNodesByRole(t, true, native.RoleStateValidator, pubs) transferTokenFromMultisigAccount(t, bc, h, bc.contracts.GAS.Hash, 1_0000_0000) - checkVoteBroadcasted(t, bc, lastValidated, 2, 1) + require.Eventually(t, func() bool { return lastHeight.Load() == 2 }, time.Second, time.Millisecond) + checkVoteBroadcasted(t, bc, lastValidated.Load().(*payload.Extensible), 2, 1) _, err = persistBlock(bc) - checkVoteBroadcasted(t, bc, lastValidated, 3, 1) + require.Eventually(t, func() bool { return lastHeight.Load() == 3 }, time.Second, time.Millisecond) + checkVoteBroadcasted(t, bc, lastValidated.Load().(*payload.Extensible), 3, 1) r, err := srv.GetStateRoot(2) require.NoError(t, err) require.NoError(t, srv.AddSignature(2, 0, accs[0].PrivateKey().SignHash(r.GetSignedHash()))) - require.NotNil(t, lastValidated) + require.NotNil(t, lastValidated.Load().(*payload.Extensible)) msg := new(stateroot.Message) - require.NoError(t, testserdes.DecodeBinary(lastValidated.Data, msg)) + require.NoError(t, testserdes.DecodeBinary(lastValidated.Load().(*payload.Extensible).Data, msg)) require.Equal(t, stateroot.RootT, msg.Type) actual := msg.Payload.(*state.MPTRoot) diff --git a/pkg/network/server.go b/pkg/network/server.go index e0e2dea65..39bfb2cd8 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -177,7 +177,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled") } - sr, err := stateroot.New(config.StateRootCfg, s.log, chain.GetStateModule()) + sr, err := stateroot.New(config.StateRootCfg, s.log, chain) if err != nil { return nil, fmt.Errorf("can't initialize StateRoot service: %w", err) } @@ -277,6 +277,9 @@ func (s *Server) Start(errChan chan error) { s.notaryRequestPool.RunSubscriptions() go s.notaryModule.Run() } + if s.StateRootCfg.Enabled { + s.stateRoot.Run() + } go s.relayBlocksLoop() go s.bQueue.run() go s.transport.Accept() @@ -296,6 +299,9 @@ func (s *Server) Shutdown() { p.Disconnect(errServerShutdown) } s.bQueue.discard() + if s.StateRootCfg.Enabled { + s.stateRoot.Shutdown() + } if s.oracle != nil { s.oracle.Shutdown() } diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 76726f5ff..0c5b17f40 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -23,25 +24,31 @@ type ( AddSignature(height uint32, validatorIndex int32, sig []byte) error GetConfig() config.StateRoot SetRelayCallback(RelayCallback) + Run() + Shutdown() } service struct { blockchainer.StateRoot + chain blockchainer.Blockchainer MainCfg config.StateRoot Network netmode.Magic - log *zap.Logger - accMtx sync.RWMutex - myIndex byte - wallet *wallet.Wallet - acc *wallet.Account + log *zap.Logger + accMtx sync.RWMutex + accHeight uint32 + myIndex byte + wallet *wallet.Wallet + acc *wallet.Account srMtx sync.Mutex incompleteRoots map[uint32]*incompleteRoot cbMtx sync.RWMutex onValidatedRoot RelayCallback + blockCh chan *block.Block + done chan struct{} } ) @@ -51,11 +58,14 @@ const ( ) // New returns new state root service instance using underlying module. -func New(cfg config.StateRoot, log *zap.Logger, mod blockchainer.StateRoot) (Service, error) { +func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer) (Service, error) { s := &service{ - StateRoot: mod, + StateRoot: bc.GetStateModule(), + chain: bc, log: log, incompleteRoots: make(map[uint32]*incompleteRoot), + blockCh: make(chan *block.Block), + done: make(chan struct{}), } s.MainCfg = cfg @@ -78,7 +88,6 @@ func New(cfg config.StateRoot, log *zap.Logger, mod blockchainer.StateRoot) (Ser } s.SetUpdateValidatorsCallback(s.updateValidators) - s.SetSignAndSendCallback(s.signAndSend) } return s, nil } @@ -105,7 +114,7 @@ func (s *service) OnPayload(ep *payload.Extensible) error { return nil } -func (s *service) updateValidators(pubs keys.PublicKeys) { +func (s *service) updateValidators(height uint32, pubs keys.PublicKeys) { s.accMtx.Lock() defer s.accMtx.Unlock() @@ -115,6 +124,7 @@ func (s *service) updateValidators(pubs keys.PublicKeys) { err := acc.Decrypt(s.MainCfg.UnlockWallet.Password) if err == nil { s.acc = acc + s.accHeight = height s.myIndex = byte(i) break } diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 4413c4676..b3a98378e 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -6,8 +6,36 @@ import ( "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/wallet" + "go.uber.org/zap" ) +// Run runs service instance in a separate goroutine. +func (s *service) Run() { + s.chain.SubscribeForBlocks(s.blockCh) + go s.run() +} + +func (s *service) run() { + for { + select { + case b := <-s.blockCh: + r, err := s.GetStateRoot(b.Index) + if err != nil { + s.log.Error("can't get state root for new block", zap.Error(err)) + } else if err := s.signAndSend(r); err != nil { + s.log.Error("can't sign or send state root", zap.Error(err)) + } + case <-s.done: + return + } + } +} + +// Shutdown stops the service. +func (s *service) Shutdown() { + close(s.done) +} + func (s *service) signAndSend(r *state.MPTRoot) error { if !s.MainCfg.Enabled { return nil