stateroot: implement exponential vote resends
This commit is contained in:
parent
d5c7a40db9
commit
7c902669bf
3 changed files with 40 additions and 3 deletions
|
@ -3,6 +3,7 @@ package stateroot
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/config"
|
"github.com/nspcc-dev/neo-go/pkg/config"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
|
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
|
||||||
|
@ -45,6 +46,8 @@ type (
|
||||||
srMtx sync.Mutex
|
srMtx sync.Mutex
|
||||||
incompleteRoots map[uint32]*incompleteRoot
|
incompleteRoots map[uint32]*incompleteRoot
|
||||||
|
|
||||||
|
timePerBlock time.Duration
|
||||||
|
maxRetries int
|
||||||
relayExtensible RelayCallback
|
relayExtensible RelayCallback
|
||||||
blockCh chan *block.Block
|
blockCh chan *block.Block
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
|
@ -58,14 +61,17 @@ const (
|
||||||
|
|
||||||
// New returns new state root service instance using underlying module.
|
// New returns new state root service instance using underlying module.
|
||||||
func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer, cb RelayCallback) (Service, error) {
|
func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer, cb RelayCallback) (Service, error) {
|
||||||
|
bcConf := bc.GetConfig()
|
||||||
s := &service{
|
s := &service{
|
||||||
StateRoot: bc.GetStateModule(),
|
StateRoot: bc.GetStateModule(),
|
||||||
Network: bc.GetConfig().Magic,
|
Network: bcConf.Magic,
|
||||||
chain: bc,
|
chain: bc,
|
||||||
log: log,
|
log: log,
|
||||||
incompleteRoots: make(map[uint32]*incompleteRoot),
|
incompleteRoots: make(map[uint32]*incompleteRoot),
|
||||||
blockCh: make(chan *block.Block),
|
blockCh: make(chan *block.Block),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
|
timePerBlock: time.Duration(bcConf.SecondsPerBlock) * time.Second,
|
||||||
|
maxRetries: bcConf.ValidatorsCount + 1,
|
||||||
relayExtensible: cb,
|
relayExtensible: cb,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
|
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
|
||||||
)
|
)
|
||||||
|
@ -23,6 +24,10 @@ type (
|
||||||
root *state.MPTRoot
|
root *state.MPTRoot
|
||||||
// sigs contains signature from every oracle node.
|
// sigs contains signature from every oracle node.
|
||||||
sigs map[string]*rootSig
|
sigs map[string]*rootSig
|
||||||
|
// myVote is an extensible message containing node's vote.
|
||||||
|
myVote *payload.Extensible
|
||||||
|
// retries is a counter of send attempts.
|
||||||
|
retries int
|
||||||
}
|
}
|
||||||
|
|
||||||
rootSig struct {
|
rootSig struct {
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package stateroot
|
package stateroot
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||||
|
@ -10,6 +12,8 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const firstVoteResendDelay = 3 * time.Second
|
||||||
|
|
||||||
// Run runs service instance in a separate goroutine.
|
// Run runs service instance in a separate goroutine.
|
||||||
func (s *service) Run() {
|
func (s *service) Run() {
|
||||||
s.log.Info("starting state validation service")
|
s.log.Info("starting state validation service")
|
||||||
|
@ -61,11 +65,11 @@ func (s *service) signAndSend(r *state.MPTRoot) error {
|
||||||
sig := acc.PrivateKey().SignHashable(uint32(s.Network), r)
|
sig := acc.PrivateKey().SignHashable(uint32(s.Network), r)
|
||||||
incRoot := s.getIncompleteRoot(r.Index)
|
incRoot := s.getIncompleteRoot(r.Index)
|
||||||
incRoot.Lock()
|
incRoot.Lock()
|
||||||
|
defer incRoot.Unlock()
|
||||||
incRoot.root = r
|
incRoot.root = r
|
||||||
incRoot.addSignature(acc.PrivateKey().PublicKey(), sig)
|
incRoot.addSignature(acc.PrivateKey().PublicKey(), sig)
|
||||||
incRoot.reverify(s.Network)
|
incRoot.reverify(s.Network)
|
||||||
s.trySendRoot(incRoot, acc)
|
s.trySendRoot(incRoot, acc)
|
||||||
incRoot.Unlock()
|
|
||||||
|
|
||||||
msg := NewMessage(VoteT, &Vote{
|
msg := NewMessage(VoteT, &Vote{
|
||||||
ValidatorIndex: int32(myIndex),
|
ValidatorIndex: int32(myIndex),
|
||||||
|
@ -92,10 +96,32 @@ func (s *service) signAndSend(r *state.MPTRoot) error {
|
||||||
buf := io.NewBufBinWriter()
|
buf := io.NewBufBinWriter()
|
||||||
emit.Bytes(buf.BinWriter, sig)
|
emit.Bytes(buf.BinWriter, sig)
|
||||||
e.Witness.InvocationScript = buf.Bytes()
|
e.Witness.InvocationScript = buf.Bytes()
|
||||||
s.relayExtensible(e)
|
incRoot.myVote = e
|
||||||
|
incRoot.retries = -1
|
||||||
|
s.sendVote(incRoot)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sendVote attempts to send vote if it's still valid and if stateroot message
|
||||||
|
// was not sent yet. It must be called with ir locked.
|
||||||
|
func (s *service) sendVote(ir *incompleteRoot) {
|
||||||
|
if ir.isSent || ir.retries >= s.maxRetries ||
|
||||||
|
s.chain.HeaderHeight() >= ir.myVote.ValidBlockEnd {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.relayExtensible(ir.myVote)
|
||||||
|
delay := firstVoteResendDelay
|
||||||
|
if ir.retries > 0 {
|
||||||
|
delay = s.timePerBlock << ir.retries
|
||||||
|
}
|
||||||
|
_ = time.AfterFunc(delay, func() {
|
||||||
|
ir.Lock()
|
||||||
|
s.sendVote(ir)
|
||||||
|
ir.Unlock()
|
||||||
|
})
|
||||||
|
ir.retries++
|
||||||
|
}
|
||||||
|
|
||||||
// getAccount returns current index and account for the node running this service.
|
// getAccount returns current index and account for the node running this service.
|
||||||
func (s *service) getAccount() (byte, *wallet.Account) {
|
func (s *service) getAccount() (byte, *wallet.Account) {
|
||||||
s.accMtx.RLock()
|
s.accMtx.RLock()
|
||||||
|
|
Loading…
Reference in a new issue