diff --git a/pkg/core/stateroot_test.go b/pkg/core/stateroot_test.go index 8d70ed45e..fafdc0c82 100644 --- a/pkg/core/stateroot_test.go +++ b/pkg/core/stateroot_test.go @@ -224,6 +224,15 @@ func TestStateRootFull(t *testing.T) { require.NotNil(t, lastValidated.Load().(*payload.Extensible)) msg := new(stateroot.Message) + require.NoError(t, testserdes.DecodeBinary(lastValidated.Load().(*payload.Extensible).Data, msg)) + require.NotEqual(t, stateroot.RootT, msg.Type) // not a sender for this root + + r, err = srv.GetStateRoot(3) + require.NoError(t, err) + require.Error(t, srv.AddSignature(2, 0, accs[0].PrivateKey().SignHashable(uint32(netmode.UnitTestNet), r))) + require.NoError(t, srv.AddSignature(3, 0, accs[0].PrivateKey().SignHashable(uint32(netmode.UnitTestNet), r))) + require.NotNil(t, lastValidated.Load().(*payload.Extensible)) + require.NoError(t, testserdes.DecodeBinary(lastValidated.Load().(*payload.Extensible).Data, msg)) require.Equal(t, stateroot.RootT, msg.Type) diff --git a/pkg/services/stateroot/network.go b/pkg/services/stateroot/network.go index 9539eab04..a6c914c16 100644 --- a/pkg/services/stateroot/network.go +++ b/pkg/services/stateroot/network.go @@ -7,13 +7,15 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" - "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/vm/emit" + "github.com/nspcc-dev/neo-go/pkg/wallet" "go.uber.org/zap" ) +const rootValidEndInc = 100 + // RelayCallback represents callback for sending validated state roots. type RelayCallback = func(*payload.Extensible) @@ -22,41 +24,32 @@ func (s *service) AddSignature(height uint32, validatorIndex int32, sig []byte) if !s.MainCfg.Enabled { return nil } - acc := s.getAccount() + myIndex, acc := s.getAccount() if acc == nil { return nil } - pubs := s.GetStateValidators(height) - if validatorIndex < 0 || int(validatorIndex) >= len(pubs) { - return errors.New("invalid validator index") - } - pub := pubs[validatorIndex] - - incRoot := s.getIncompleteRoot(height) + incRoot := s.getIncompleteRoot(height, myIndex) if incRoot == nil { return nil } incRoot.Lock() + defer incRoot.Unlock() + + if validatorIndex < 0 || int(validatorIndex) >= len(incRoot.svList) { + return errors.New("invalid validator index") + } + + pub := incRoot.svList[validatorIndex] if incRoot.root != nil { ok := pub.VerifyHashable(sig, uint32(s.Network), incRoot.root) if !ok { - incRoot.Unlock() return fmt.Errorf("invalid state root signature for %d", validatorIndex) } } incRoot.addSignature(pub, sig) - sr, ready := incRoot.finalize(pubs) - incRoot.Unlock() - - if ready { - err := s.AddStateRoot(sr) - if err != nil { - s.log.Error("can't add validated state root", zap.Error(err)) - } - s.sendValidatedRoot(sr, acc.PrivateKey()) - } + s.trySendRoot(incRoot, acc) return nil } @@ -65,34 +58,55 @@ func (s *service) GetConfig() config.StateRoot { return s.MainCfg } -func (s *service) getIncompleteRoot(height uint32) *incompleteRoot { +func (s *service) getIncompleteRoot(height uint32, myIndex byte) *incompleteRoot { s.srMtx.Lock() defer s.srMtx.Unlock() if incRoot, ok := s.incompleteRoots[height]; ok { return incRoot } - incRoot := &incompleteRoot{sigs: make(map[string]*rootSig)} + incRoot := &incompleteRoot{ + myIndex: int(myIndex), + svList: s.GetStateValidators(height), + sigs: make(map[string]*rootSig), + } s.incompleteRoots[height] = incRoot return incRoot } -func (s *service) sendValidatedRoot(r *state.MPTRoot, priv *keys.PrivateKey) { +// trySendRoot attempts to finalize and send MPTRoot, it must be called with ir locked. +func (s *service) trySendRoot(ir *incompleteRoot, acc *wallet.Account) { + if !ir.isSenderNow() { + return + } + sr, ready := ir.finalize() + if ready { + err := s.AddStateRoot(sr) + if err != nil { + s.log.Error("can't add validated state root", zap.Error(err)) + } + s.sendValidatedRoot(sr, acc) + ir.isSent = true + } +} + +func (s *service) sendValidatedRoot(r *state.MPTRoot, acc *wallet.Account) { + priv := acc.PrivateKey() w := io.NewBufBinWriter() m := NewMessage(RootT, r) m.EncodeBinary(w.BinWriter) ep := &payload.Extensible{ Category: Category, ValidBlockStart: r.Index, - ValidBlockEnd: r.Index + transaction.MaxValidUntilBlockIncrement, + ValidBlockEnd: r.Index + rootValidEndInc, Sender: priv.GetScriptHash(), Data: w.Bytes(), Witness: transaction.Witness{ - VerificationScript: s.getAccount().GetVerificationScript(), + VerificationScript: acc.GetVerificationScript(), }, } sig := priv.SignHashable(uint32(s.Network), ep) buf := io.NewBufBinWriter() emit.Bytes(buf.BinWriter, sig) ep.Witness.InvocationScript = buf.Bytes() - s.onValidatedRoot(ep) + s.relayExtensible(ep) } diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 694d9fd99..3b01a560e 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -3,6 +3,7 @@ package stateroot import ( "errors" "sync" + "time" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" @@ -45,7 +46,9 @@ type ( srMtx sync.Mutex incompleteRoots map[uint32]*incompleteRoot - onValidatedRoot RelayCallback + timePerBlock time.Duration + maxRetries int + relayExtensible RelayCallback blockCh chan *block.Block done chan struct{} } @@ -58,15 +61,18 @@ const ( // New returns new state root service instance using underlying module. func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer, cb RelayCallback) (Service, error) { + bcConf := bc.GetConfig() s := &service{ StateRoot: bc.GetStateModule(), - Network: bc.GetConfig().Magic, + Network: bcConf.Magic, chain: bc, log: log, incompleteRoots: make(map[uint32]*incompleteRoot), blockCh: make(chan *block.Block), done: make(chan struct{}), - onValidatedRoot: cb, + timePerBlock: time.Duration(bcConf.SecondsPerBlock) * time.Second, + maxRetries: voteValidEndInc, + relayExtensible: cb, } s.MainCfg = cfg @@ -112,6 +118,14 @@ func (s *service) OnPayload(ep *payload.Extensible) error { s.log.Error("can't add SV-signed state root", zap.Error(err)) return nil } + s.srMtx.Lock() + ir, ok := s.incompleteRoots[sr.Index] + s.srMtx.Unlock() + if ok { + ir.Lock() + ir.isSent = true + ir.Unlock() + } return err case VoteT: v := m.Payload.(*Vote) diff --git a/pkg/services/stateroot/signature.go b/pkg/services/stateroot/signature.go index 5262473f0..fb989fb00 100644 --- a/pkg/services/stateroot/signature.go +++ b/pkg/services/stateroot/signature.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/vm/emit" ) @@ -15,12 +16,20 @@ import ( type ( incompleteRoot struct { sync.RWMutex + // svList is a list of state validator keys for this stateroot. + svList keys.PublicKeys // isSent is true state root was already broadcasted. isSent bool // request is oracle request. root *state.MPTRoot // sigs contains signature from every oracle node. sigs map[string]*rootSig + // myIndex is the index of validator for this root. + myIndex int + // myVote is an extensible message containing node's vote. + myVote *payload.Extensible + // retries is a counter of send attempts. + retries int } rootSig struct { @@ -49,16 +58,31 @@ func (r *incompleteRoot) addSignature(pub *keys.PublicKey, sig []byte) { } } +func (r *incompleteRoot) isSenderNow() bool { + if r.root == nil || r.isSent || len(r.svList) == 0 { + return false + } + retries := r.retries + if retries < 0 { + retries = 0 + } + ind := (int(r.root.Index) - retries) % len(r.svList) + if ind < 0 { + ind += len(r.svList) + } + return ind == r.myIndex +} + // finalize checks is either main or backup tx has sufficient number of signatures and returns // tx and bool value indicating if it is ready to be broadcasted. -func (r *incompleteRoot) finalize(stateValidators keys.PublicKeys) (*state.MPTRoot, bool) { +func (r *incompleteRoot) finalize() (*state.MPTRoot, bool) { if r.root == nil { return nil, false } - m := smartcontract.GetDefaultHonestNodeCount(len(stateValidators)) + m := smartcontract.GetDefaultHonestNodeCount(len(r.svList)) sigs := make([][]byte, 0, m) - for _, pub := range stateValidators { + for _, pub := range r.svList { sig, ok := r.sigs[string(pub.Bytes())] if ok && sig.ok { sigs = append(sigs, sig.sig) @@ -71,7 +95,7 @@ func (r *incompleteRoot) finalize(stateValidators keys.PublicKeys) (*state.MPTRo return nil, false } - verif, err := smartcontract.CreateDefaultMultiSigRedeemScript(stateValidators) + verif, err := smartcontract.CreateDefaultMultiSigRedeemScript(r.svList) if err != nil { return nil, false } diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index ed0895ae0..e36ba9bd5 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -1,6 +1,8 @@ package stateroot import ( + "time" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/io" @@ -10,6 +12,11 @@ import ( "go.uber.org/zap" ) +const ( + voteValidEndInc = 10 + firstVoteResendDelay = 3 * time.Second +) + // Run runs service instance in a separate goroutine. func (s *service) Run() { s.log.Info("starting state validation service") @@ -28,6 +35,9 @@ runloop: } else if err := s.signAndSend(r); err != nil { s.log.Error("can't sign or send state root", zap.Error(err)) } + s.srMtx.Lock() + delete(s.incompleteRoots, b.Index-voteValidEndInc) + s.srMtx.Unlock() case <-s.done: break runloop } @@ -53,20 +63,20 @@ func (s *service) signAndSend(r *state.MPTRoot) error { return nil } - acc := s.getAccount() + myIndex, acc := s.getAccount() if acc == nil { return nil } sig := acc.PrivateKey().SignHashable(uint32(s.Network), r) - incRoot := s.getIncompleteRoot(r.Index) + incRoot := s.getIncompleteRoot(r.Index, myIndex) + incRoot.Lock() + defer incRoot.Unlock() incRoot.root = r incRoot.addSignature(acc.PrivateKey().PublicKey(), sig) incRoot.reverify(s.Network) + s.trySendRoot(incRoot, acc) - s.accMtx.RLock() - myIndex := s.myIndex - s.accMtx.RUnlock() msg := NewMessage(VoteT, &Vote{ ValidatorIndex: int32(myIndex), Height: r.Index, @@ -81,23 +91,46 @@ func (s *service) signAndSend(r *state.MPTRoot) error { e := &payload.Extensible{ Category: Category, ValidBlockStart: r.Index, - ValidBlockEnd: r.Index + transaction.MaxValidUntilBlockIncrement, - Sender: s.getAccount().PrivateKey().GetScriptHash(), + ValidBlockEnd: r.Index + voteValidEndInc, + Sender: acc.PrivateKey().GetScriptHash(), Data: w.Bytes(), Witness: transaction.Witness{ - VerificationScript: s.getAccount().GetVerificationScript(), + VerificationScript: acc.GetVerificationScript(), }, } sig = acc.PrivateKey().SignHashable(uint32(s.Network), e) buf := io.NewBufBinWriter() emit.Bytes(buf.BinWriter, sig) e.Witness.InvocationScript = buf.Bytes() - s.onValidatedRoot(e) + incRoot.myVote = e + incRoot.retries = -1 + s.sendVote(incRoot) return nil } -func (s *service) getAccount() *wallet.Account { +// sendVote attempts to send vote if it's still valid and if stateroot message +// was not sent yet. It must be called with ir locked. +func (s *service) sendVote(ir *incompleteRoot) { + if ir.isSent || ir.retries >= s.maxRetries || + s.chain.HeaderHeight() >= ir.myVote.ValidBlockEnd { + return + } + s.relayExtensible(ir.myVote) + delay := firstVoteResendDelay + if ir.retries > 0 { + delay = s.timePerBlock << ir.retries + } + _ = time.AfterFunc(delay, func() { + ir.Lock() + s.sendVote(ir) + ir.Unlock() + }) + ir.retries++ +} + +// getAccount returns current index and account for the node running this service. +func (s *service) getAccount() (byte, *wallet.Account) { s.accMtx.RLock() defer s.accMtx.RUnlock() - return s.acc + return s.myIndex, s.acc }