Merge pull request #1953 from nspcc-dev/single-stateroot-sender

Single stateroot sender
This commit is contained in:
Roman Khimov 2021-05-07 14:44:17 +03:00 committed by GitHub
commit bde4e6a91f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 138 additions and 44 deletions

View file

@ -224,6 +224,15 @@ func TestStateRootFull(t *testing.T) {
require.NotNil(t, lastValidated.Load().(*payload.Extensible)) require.NotNil(t, lastValidated.Load().(*payload.Extensible))
msg := new(stateroot.Message) msg := new(stateroot.Message)
require.NoError(t, testserdes.DecodeBinary(lastValidated.Load().(*payload.Extensible).Data, msg))
require.NotEqual(t, stateroot.RootT, msg.Type) // not a sender for this root
r, err = srv.GetStateRoot(3)
require.NoError(t, err)
require.Error(t, srv.AddSignature(2, 0, accs[0].PrivateKey().SignHashable(uint32(netmode.UnitTestNet), r)))
require.NoError(t, srv.AddSignature(3, 0, accs[0].PrivateKey().SignHashable(uint32(netmode.UnitTestNet), r)))
require.NotNil(t, lastValidated.Load().(*payload.Extensible))
require.NoError(t, testserdes.DecodeBinary(lastValidated.Load().(*payload.Extensible).Data, msg)) require.NoError(t, testserdes.DecodeBinary(lastValidated.Load().(*payload.Extensible).Data, msg))
require.Equal(t, stateroot.RootT, msg.Type) require.Equal(t, stateroot.RootT, msg.Type)

View file

@ -7,13 +7,15 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/vm/emit"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/zap" "go.uber.org/zap"
) )
const rootValidEndInc = 100
// RelayCallback represents callback for sending validated state roots. // RelayCallback represents callback for sending validated state roots.
type RelayCallback = func(*payload.Extensible) type RelayCallback = func(*payload.Extensible)
@ -22,41 +24,32 @@ func (s *service) AddSignature(height uint32, validatorIndex int32, sig []byte)
if !s.MainCfg.Enabled { if !s.MainCfg.Enabled {
return nil return nil
} }
acc := s.getAccount() myIndex, acc := s.getAccount()
if acc == nil { if acc == nil {
return nil return nil
} }
pubs := s.GetStateValidators(height) incRoot := s.getIncompleteRoot(height, myIndex)
if validatorIndex < 0 || int(validatorIndex) >= len(pubs) {
return errors.New("invalid validator index")
}
pub := pubs[validatorIndex]
incRoot := s.getIncompleteRoot(height)
if incRoot == nil { if incRoot == nil {
return nil return nil
} }
incRoot.Lock() incRoot.Lock()
defer incRoot.Unlock()
if validatorIndex < 0 || int(validatorIndex) >= len(incRoot.svList) {
return errors.New("invalid validator index")
}
pub := incRoot.svList[validatorIndex]
if incRoot.root != nil { if incRoot.root != nil {
ok := pub.VerifyHashable(sig, uint32(s.Network), incRoot.root) ok := pub.VerifyHashable(sig, uint32(s.Network), incRoot.root)
if !ok { if !ok {
incRoot.Unlock()
return fmt.Errorf("invalid state root signature for %d", validatorIndex) return fmt.Errorf("invalid state root signature for %d", validatorIndex)
} }
} }
incRoot.addSignature(pub, sig) incRoot.addSignature(pub, sig)
sr, ready := incRoot.finalize(pubs) s.trySendRoot(incRoot, acc)
incRoot.Unlock()
if ready {
err := s.AddStateRoot(sr)
if err != nil {
s.log.Error("can't add validated state root", zap.Error(err))
}
s.sendValidatedRoot(sr, acc.PrivateKey())
}
return nil return nil
} }
@ -65,34 +58,55 @@ func (s *service) GetConfig() config.StateRoot {
return s.MainCfg return s.MainCfg
} }
func (s *service) getIncompleteRoot(height uint32) *incompleteRoot { func (s *service) getIncompleteRoot(height uint32, myIndex byte) *incompleteRoot {
s.srMtx.Lock() s.srMtx.Lock()
defer s.srMtx.Unlock() defer s.srMtx.Unlock()
if incRoot, ok := s.incompleteRoots[height]; ok { if incRoot, ok := s.incompleteRoots[height]; ok {
return incRoot return incRoot
} }
incRoot := &incompleteRoot{sigs: make(map[string]*rootSig)} incRoot := &incompleteRoot{
myIndex: int(myIndex),
svList: s.GetStateValidators(height),
sigs: make(map[string]*rootSig),
}
s.incompleteRoots[height] = incRoot s.incompleteRoots[height] = incRoot
return incRoot return incRoot
} }
func (s *service) sendValidatedRoot(r *state.MPTRoot, priv *keys.PrivateKey) { // trySendRoot attempts to finalize and send MPTRoot, it must be called with ir locked.
func (s *service) trySendRoot(ir *incompleteRoot, acc *wallet.Account) {
if !ir.isSenderNow() {
return
}
sr, ready := ir.finalize()
if ready {
err := s.AddStateRoot(sr)
if err != nil {
s.log.Error("can't add validated state root", zap.Error(err))
}
s.sendValidatedRoot(sr, acc)
ir.isSent = true
}
}
func (s *service) sendValidatedRoot(r *state.MPTRoot, acc *wallet.Account) {
priv := acc.PrivateKey()
w := io.NewBufBinWriter() w := io.NewBufBinWriter()
m := NewMessage(RootT, r) m := NewMessage(RootT, r)
m.EncodeBinary(w.BinWriter) m.EncodeBinary(w.BinWriter)
ep := &payload.Extensible{ ep := &payload.Extensible{
Category: Category, Category: Category,
ValidBlockStart: r.Index, ValidBlockStart: r.Index,
ValidBlockEnd: r.Index + transaction.MaxValidUntilBlockIncrement, ValidBlockEnd: r.Index + rootValidEndInc,
Sender: priv.GetScriptHash(), Sender: priv.GetScriptHash(),
Data: w.Bytes(), Data: w.Bytes(),
Witness: transaction.Witness{ Witness: transaction.Witness{
VerificationScript: s.getAccount().GetVerificationScript(), VerificationScript: acc.GetVerificationScript(),
}, },
} }
sig := priv.SignHashable(uint32(s.Network), ep) sig := priv.SignHashable(uint32(s.Network), ep)
buf := io.NewBufBinWriter() buf := io.NewBufBinWriter()
emit.Bytes(buf.BinWriter, sig) emit.Bytes(buf.BinWriter, sig)
ep.Witness.InvocationScript = buf.Bytes() ep.Witness.InvocationScript = buf.Bytes()
s.onValidatedRoot(ep) s.relayExtensible(ep)
} }

View file

@ -3,6 +3,7 @@ package stateroot
import ( import (
"errors" "errors"
"sync" "sync"
"time"
"github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/config/netmode"
@ -45,7 +46,9 @@ type (
srMtx sync.Mutex srMtx sync.Mutex
incompleteRoots map[uint32]*incompleteRoot incompleteRoots map[uint32]*incompleteRoot
onValidatedRoot RelayCallback timePerBlock time.Duration
maxRetries int
relayExtensible RelayCallback
blockCh chan *block.Block blockCh chan *block.Block
done chan struct{} done chan struct{}
} }
@ -58,15 +61,18 @@ const (
// New returns new state root service instance using underlying module. // New returns new state root service instance using underlying module.
func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer, cb RelayCallback) (Service, error) { func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer, cb RelayCallback) (Service, error) {
bcConf := bc.GetConfig()
s := &service{ s := &service{
StateRoot: bc.GetStateModule(), StateRoot: bc.GetStateModule(),
Network: bc.GetConfig().Magic, Network: bcConf.Magic,
chain: bc, chain: bc,
log: log, log: log,
incompleteRoots: make(map[uint32]*incompleteRoot), incompleteRoots: make(map[uint32]*incompleteRoot),
blockCh: make(chan *block.Block), blockCh: make(chan *block.Block),
done: make(chan struct{}), done: make(chan struct{}),
onValidatedRoot: cb, timePerBlock: time.Duration(bcConf.SecondsPerBlock) * time.Second,
maxRetries: voteValidEndInc,
relayExtensible: cb,
} }
s.MainCfg = cfg s.MainCfg = cfg
@ -112,6 +118,14 @@ func (s *service) OnPayload(ep *payload.Extensible) error {
s.log.Error("can't add SV-signed state root", zap.Error(err)) s.log.Error("can't add SV-signed state root", zap.Error(err))
return nil return nil
} }
s.srMtx.Lock()
ir, ok := s.incompleteRoots[sr.Index]
s.srMtx.Unlock()
if ok {
ir.Lock()
ir.isSent = true
ir.Unlock()
}
return err return err
case VoteT: case VoteT:
v := m.Payload.(*Vote) v := m.Payload.(*Vote)

View file

@ -8,6 +8,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/vm/emit"
) )
@ -15,12 +16,20 @@ import (
type ( type (
incompleteRoot struct { incompleteRoot struct {
sync.RWMutex sync.RWMutex
// svList is a list of state validator keys for this stateroot.
svList keys.PublicKeys
// isSent is true state root was already broadcasted. // isSent is true state root was already broadcasted.
isSent bool isSent bool
// request is oracle request. // request is oracle request.
root *state.MPTRoot root *state.MPTRoot
// sigs contains signature from every oracle node. // sigs contains signature from every oracle node.
sigs map[string]*rootSig sigs map[string]*rootSig
// myIndex is the index of validator for this root.
myIndex int
// myVote is an extensible message containing node's vote.
myVote *payload.Extensible
// retries is a counter of send attempts.
retries int
} }
rootSig struct { rootSig struct {
@ -49,16 +58,31 @@ func (r *incompleteRoot) addSignature(pub *keys.PublicKey, sig []byte) {
} }
} }
func (r *incompleteRoot) isSenderNow() bool {
if r.root == nil || r.isSent || len(r.svList) == 0 {
return false
}
retries := r.retries
if retries < 0 {
retries = 0
}
ind := (int(r.root.Index) - retries) % len(r.svList)
if ind < 0 {
ind += len(r.svList)
}
return ind == r.myIndex
}
// finalize checks is either main or backup tx has sufficient number of signatures and returns // finalize checks is either main or backup tx has sufficient number of signatures and returns
// tx and bool value indicating if it is ready to be broadcasted. // tx and bool value indicating if it is ready to be broadcasted.
func (r *incompleteRoot) finalize(stateValidators keys.PublicKeys) (*state.MPTRoot, bool) { func (r *incompleteRoot) finalize() (*state.MPTRoot, bool) {
if r.root == nil { if r.root == nil {
return nil, false return nil, false
} }
m := smartcontract.GetDefaultHonestNodeCount(len(stateValidators)) m := smartcontract.GetDefaultHonestNodeCount(len(r.svList))
sigs := make([][]byte, 0, m) sigs := make([][]byte, 0, m)
for _, pub := range stateValidators { for _, pub := range r.svList {
sig, ok := r.sigs[string(pub.Bytes())] sig, ok := r.sigs[string(pub.Bytes())]
if ok && sig.ok { if ok && sig.ok {
sigs = append(sigs, sig.sig) sigs = append(sigs, sig.sig)
@ -71,7 +95,7 @@ func (r *incompleteRoot) finalize(stateValidators keys.PublicKeys) (*state.MPTRo
return nil, false return nil, false
} }
verif, err := smartcontract.CreateDefaultMultiSigRedeemScript(stateValidators) verif, err := smartcontract.CreateDefaultMultiSigRedeemScript(r.svList)
if err != nil { if err != nil {
return nil, false return nil, false
} }

View file

@ -1,6 +1,8 @@
package stateroot package stateroot
import ( import (
"time"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
@ -10,6 +12,11 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const (
voteValidEndInc = 10
firstVoteResendDelay = 3 * time.Second
)
// Run runs service instance in a separate goroutine. // Run runs service instance in a separate goroutine.
func (s *service) Run() { func (s *service) Run() {
s.log.Info("starting state validation service") s.log.Info("starting state validation service")
@ -28,6 +35,9 @@ runloop:
} else if err := s.signAndSend(r); err != nil { } else if err := s.signAndSend(r); err != nil {
s.log.Error("can't sign or send state root", zap.Error(err)) s.log.Error("can't sign or send state root", zap.Error(err))
} }
s.srMtx.Lock()
delete(s.incompleteRoots, b.Index-voteValidEndInc)
s.srMtx.Unlock()
case <-s.done: case <-s.done:
break runloop break runloop
} }
@ -53,20 +63,20 @@ func (s *service) signAndSend(r *state.MPTRoot) error {
return nil return nil
} }
acc := s.getAccount() myIndex, acc := s.getAccount()
if acc == nil { if acc == nil {
return nil return nil
} }
sig := acc.PrivateKey().SignHashable(uint32(s.Network), r) sig := acc.PrivateKey().SignHashable(uint32(s.Network), r)
incRoot := s.getIncompleteRoot(r.Index) incRoot := s.getIncompleteRoot(r.Index, myIndex)
incRoot.Lock()
defer incRoot.Unlock()
incRoot.root = r incRoot.root = r
incRoot.addSignature(acc.PrivateKey().PublicKey(), sig) incRoot.addSignature(acc.PrivateKey().PublicKey(), sig)
incRoot.reverify(s.Network) incRoot.reverify(s.Network)
s.trySendRoot(incRoot, acc)
s.accMtx.RLock()
myIndex := s.myIndex
s.accMtx.RUnlock()
msg := NewMessage(VoteT, &Vote{ msg := NewMessage(VoteT, &Vote{
ValidatorIndex: int32(myIndex), ValidatorIndex: int32(myIndex),
Height: r.Index, Height: r.Index,
@ -81,23 +91,46 @@ func (s *service) signAndSend(r *state.MPTRoot) error {
e := &payload.Extensible{ e := &payload.Extensible{
Category: Category, Category: Category,
ValidBlockStart: r.Index, ValidBlockStart: r.Index,
ValidBlockEnd: r.Index + transaction.MaxValidUntilBlockIncrement, ValidBlockEnd: r.Index + voteValidEndInc,
Sender: s.getAccount().PrivateKey().GetScriptHash(), Sender: acc.PrivateKey().GetScriptHash(),
Data: w.Bytes(), Data: w.Bytes(),
Witness: transaction.Witness{ Witness: transaction.Witness{
VerificationScript: s.getAccount().GetVerificationScript(), VerificationScript: acc.GetVerificationScript(),
}, },
} }
sig = acc.PrivateKey().SignHashable(uint32(s.Network), e) sig = acc.PrivateKey().SignHashable(uint32(s.Network), e)
buf := io.NewBufBinWriter() buf := io.NewBufBinWriter()
emit.Bytes(buf.BinWriter, sig) emit.Bytes(buf.BinWriter, sig)
e.Witness.InvocationScript = buf.Bytes() e.Witness.InvocationScript = buf.Bytes()
s.onValidatedRoot(e) incRoot.myVote = e
incRoot.retries = -1
s.sendVote(incRoot)
return nil return nil
} }
func (s *service) getAccount() *wallet.Account { // sendVote attempts to send vote if it's still valid and if stateroot message
// was not sent yet. It must be called with ir locked.
func (s *service) sendVote(ir *incompleteRoot) {
if ir.isSent || ir.retries >= s.maxRetries ||
s.chain.HeaderHeight() >= ir.myVote.ValidBlockEnd {
return
}
s.relayExtensible(ir.myVote)
delay := firstVoteResendDelay
if ir.retries > 0 {
delay = s.timePerBlock << ir.retries
}
_ = time.AfterFunc(delay, func() {
ir.Lock()
s.sendVote(ir)
ir.Unlock()
})
ir.retries++
}
// getAccount returns current index and account for the node running this service.
func (s *service) getAccount() (byte, *wallet.Account) {
s.accMtx.RLock() s.accMtx.RLock()
defer s.accMtx.RUnlock() defer s.accMtx.RUnlock()
return s.acc return s.myIndex, s.acc
} }