Merge pull request #3110 from nspcc-dev/fix-tests

Fix race caused by design of native Neo validators/committee cache
This commit is contained in:
Roman Khimov 2023-10-10 14:29:28 +03:00 committed by GitHub
commit 8fcf1eecfa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 142 additions and 74 deletions

View file

@ -48,7 +48,7 @@ type Ledger interface {
GetNextBlockValidators() ([]*keys.PublicKey, error) GetNextBlockValidators() ([]*keys.PublicKey, error)
GetStateRoot(height uint32) (*state.MPTRoot, error) GetStateRoot(height uint32) (*state.MPTRoot, error)
GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error)
GetValidators() ([]*keys.PublicKey, error) ComputeNextBlockValidators() []*keys.PublicKey
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
SubscribeForBlocks(ch chan *coreb.Block) SubscribeForBlocks(ch chan *coreb.Block)
UnsubscribeFromBlocks(ch chan *coreb.Block) UnsubscribeFromBlocks(ch chan *coreb.Block)
@ -677,9 +677,15 @@ func (s *service) getValidators(txes ...block.Transaction) []crypto.PublicKey {
err error err error
) )
if txes == nil { if txes == nil {
// getValidators with empty args is used by dbft to fill the list of
// block's validators, thus should return validators from the current
// epoch without recalculation.
pKeys, err = s.Chain.GetNextBlockValidators() pKeys, err = s.Chain.GetNextBlockValidators()
} else { } else {
pKeys, err = s.Chain.GetValidators() // getValidators with non-empty args is used by dbft to fill block's
// NextConsensus field, ComputeNextBlockValidators will return proper
// value for NextConsensus wrt dBFT epoch start/end.
pKeys = s.Chain.ComputeNextBlockValidators()
} }
if err != nil { if err != nil {
s.log.Error("error while trying to get validators", zap.Error(err)) s.log.Error("error while trying to get validators", zap.Error(err))
@ -721,17 +727,7 @@ func (s *service) newBlockFromContext(ctx *dbft.Context) block.Block {
block.PrevStateRoot = sr.Root block.PrevStateRoot = sr.Root
} }
var validators keys.PublicKeys var validators = s.Chain.ComputeNextBlockValidators()
var err error
cfg := s.Chain.GetConfig().ProtocolConfiguration
if cfg.ShouldUpdateCommitteeAt(ctx.BlockIndex) {
validators, err = s.Chain.GetValidators()
} else {
validators, err = s.Chain.GetNextBlockValidators()
}
if err != nil {
s.log.Fatal(fmt.Sprintf("failed to get validators: %s", err.Error()))
}
script, err := smartcontract.CreateDefaultMultiSigRedeemScript(validators) script, err := smartcontract.CreateDefaultMultiSigRedeemScript(validators)
if err != nil { if err != nil {
s.log.Fatal(fmt.Sprintf("failed to create multisignature script: %s", err.Error())) s.log.Fatal(fmt.Sprintf("failed to create multisignature script: %s", err.Error()))

View file

@ -2697,12 +2697,23 @@ func (bc *Blockchain) GetCommittee() (keys.PublicKeys, error) {
return pubs, nil return pubs, nil
} }
// GetValidators returns current validators. // ComputeNextBlockValidators returns current validators. Validators list
func (bc *Blockchain) GetValidators() ([]*keys.PublicKey, error) { // returned from this method is updated once per CommitteeSize number of blocks.
return bc.contracts.NEO.ComputeNextBlockValidators(bc.blockHeight, bc.dao) // For the last block in the dBFT epoch this method returns the list of validators
// recalculated from the latest relevant information about NEO votes; in this case
// list of validators may differ from the one returned by GetNextBlockValidators.
// For any other block of the dBFT epoch this method returns the same list as
// GetNextBlockValidators.
func (bc *Blockchain) ComputeNextBlockValidators() []*keys.PublicKey {
return bc.contracts.NEO.ComputeNextBlockValidators(bc.dao)
} }
// GetNextBlockValidators returns next block validators. // GetNextBlockValidators returns next block validators. Validators list returned
// from this method is the sorted top NumOfCNs number of public keys from the
// committee of the current dBFT round (that was calculated once for the
// CommitteeSize number of blocks), thus, validators list returned from this
// method is being updated once per (committee size) number of blocks, but not
// every block.
func (bc *Blockchain) GetNextBlockValidators() ([]*keys.PublicKey, error) { func (bc *Blockchain) GetNextBlockValidators() ([]*keys.PublicKey, error) {
return bc.contracts.NEO.GetNextBlockValidatorsInternal(bc.dao), nil return bc.contracts.NEO.GetNextBlockValidatorsInternal(bc.dao), nil
} }

View file

@ -261,8 +261,7 @@ func TestChainWithVolatileNumOfValidators(t *testing.T) {
priv0 := testchain.PrivateKeyByID(0) priv0 := testchain.PrivateKeyByID(0)
vals, err := bc.GetValidators() vals := bc.ComputeNextBlockValidators()
require.NoError(t, err)
script, err := smartcontract.CreateDefaultMultiSigRedeemScript(vals) script, err := smartcontract.CreateDefaultMultiSigRedeemScript(vals)
require.NoError(t, err) require.NoError(t, err)
curWit := transaction.Witness{ curWit := transaction.Witness{
@ -280,7 +279,7 @@ func TestChainWithVolatileNumOfValidators(t *testing.T) {
} }
// Mimic consensus. // Mimic consensus.
if bc.config.ShouldUpdateCommitteeAt(uint32(i)) { if bc.config.ShouldUpdateCommitteeAt(uint32(i)) {
vals, err = bc.GetValidators() vals = bc.ComputeNextBlockValidators()
} else { } else {
vals, err = bc.GetNextBlockValidators() vals, err = bc.GetNextBlockValidators()
} }

View file

@ -50,14 +50,26 @@ type NeoCache struct {
votesChanged bool votesChanged bool
nextValidators keys.PublicKeys nextValidators keys.PublicKeys
validators keys.PublicKeys // newEpochNextValidators contains the cached list of next block validators for the new epoch. This list is updated once
// per dBFT epoch in PostPersist of the last block in the epoch if candidates
// votes ratio has been changed or register/unregister operation was performed
// within the last processed epoch. The updated value is being persisted
// following the standard layered DAO persist rules, so that external users
// will get the proper value with upper Blockchain's DAO (but this value is
// relevant only by the moment of first epoch block creation).
newEpochNextValidators keys.PublicKeys
// committee contains cached committee members and their votes. // committee contains cached committee members and their votes.
// It is updated once in a while depending on committee size // It is updated once in a while depending on committee size
// (every 28 blocks for mainnet). Its value // (every 28 blocks for mainnet). Its value
// is always equal to the value stored by `prefixCommittee`. // is always equal to the value stored by `prefixCommittee`.
committee keysWithVotes committee keysWithVotes
// newEpochCommittee contains cached committee members updated once per dBFT
// epoch in PostPersist of the last block in the epoch.
newEpochCommittee keysWithVotes
// committeeHash contains the script hash of the committee. // committeeHash contains the script hash of the committee.
committeeHash util.Uint160 committeeHash util.Uint160
// newEpochCommitteeHash contains the script hash of the newEpochCommittee.
newEpochCommitteeHash util.Uint160
// gasPerVoteCache contains the last updated value of GAS per vote reward for candidates. // gasPerVoteCache contains the last updated value of GAS per vote reward for candidates.
// It is set in state-modifying methods only and read in `PostPersist`, thus is not protected // It is set in state-modifying methods only and read in `PostPersist`, thus is not protected
@ -125,11 +137,13 @@ func (c *NeoCache) Copy() dao.NativeContractCache {
func copyNeoCache(src, dst *NeoCache) { func copyNeoCache(src, dst *NeoCache) {
dst.votesChanged = src.votesChanged dst.votesChanged = src.votesChanged
// Can safely omit copying because the new array is created each time // Can safely omit copying because the new array is created each time
// validators list, nextValidators and committee are updated. // newEpochNextValidators list, nextValidators and committee are updated.
dst.nextValidators = src.nextValidators dst.nextValidators = src.nextValidators
dst.validators = src.validators
dst.committee = src.committee dst.committee = src.committee
dst.committeeHash = src.committeeHash dst.committeeHash = src.committeeHash
dst.newEpochNextValidators = src.newEpochNextValidators
dst.newEpochCommittee = src.newEpochCommittee
dst.newEpochCommitteeHash = src.newEpochCommitteeHash
dst.registerPrice = src.registerPrice dst.registerPrice = src.registerPrice
@ -300,6 +314,11 @@ func (n *NEO) Initialize(ic *interop.Context) error {
setIntWithKey(n.ID, ic.DAO, []byte{prefixRegisterPrice}, DefaultRegisterPrice) setIntWithKey(n.ID, ic.DAO, []byte{prefixRegisterPrice}, DefaultRegisterPrice)
cache.registerPrice = int64(DefaultRegisterPrice) cache.registerPrice = int64(DefaultRegisterPrice)
var numOfCNs = n.cfg.GetNumOfCNs(ic.Block.Index + 1)
err = n.updateCachedNewEpochValues(ic.DAO, cache, ic.BlockHeight(), numOfCNs)
if err != nil {
return fmt.Errorf("failed to update next block newEpoch* cache: %w", err)
}
return nil return nil
} }
@ -325,6 +344,23 @@ func (n *NEO) InitializeCache(blockHeight uint32, d *dao.Simple) error {
cache.gasPerBlock = n.getSortedGASRecordFromDAO(d) cache.gasPerBlock = n.getSortedGASRecordFromDAO(d)
cache.registerPrice = getIntWithKey(n.ID, d, []byte{prefixRegisterPrice}) cache.registerPrice = getIntWithKey(n.ID, d, []byte{prefixRegisterPrice})
// Update newEpoch* cache for external users. It holds values for the previous
// dBFT epoch if the current one isn't yet finished.
if n.cfg.ShouldUpdateCommitteeAt(blockHeight + 1) {
var numOfCNs = n.cfg.GetNumOfCNs(blockHeight + 1)
err := n.updateCachedNewEpochValues(d, cache, blockHeight, numOfCNs)
if err != nil {
return fmt.Errorf("failed to update next block newEpoch* cache: %w", err)
}
} else {
// nextValidators, committee and committee hash are filled in by this moment
// via n.updateCache call.
cache.newEpochNextValidators = cache.nextValidators.Copy()
cache.newEpochCommittee = make(keysWithVotes, len(cache.committee))
copy(cache.newEpochCommittee, cache.committee)
cache.newEpochCommitteeHash = cache.committeeHash
}
d.SetCache(n.ID, cache) d.SetCache(n.ID, cache)
return nil return nil
} }
@ -340,7 +376,7 @@ func (n *NEO) initConfigCache(cfg config.ProtocolConfiguration) error {
func (n *NEO) updateCache(cache *NeoCache, cvs keysWithVotes, blockHeight uint32) error { func (n *NEO) updateCache(cache *NeoCache, cvs keysWithVotes, blockHeight uint32) error {
cache.committee = cvs cache.committee = cvs
var committee = getCommitteeMembers(cache) var committee = getCommitteeMembers(cache.committee)
script, err := smartcontract.CreateMajorityMultiSigRedeemScript(committee.Copy()) script, err := smartcontract.CreateMajorityMultiSigRedeemScript(committee.Copy())
if err != nil { if err != nil {
return err return err
@ -353,22 +389,28 @@ func (n *NEO) updateCache(cache *NeoCache, cvs keysWithVotes, blockHeight uint32
return nil return nil
} }
func (n *NEO) updateCommittee(cache *NeoCache, ic *interop.Context) error { // updateCachedNewEpochValues sets newEpochNextValidators, newEpochCommittee and
if !cache.votesChanged { // newEpochCommitteeHash cache that will be used by external users to retrieve
// We need to put in storage anyway, as it affects dumps // next block validators list of the next dBFT epoch that wasn't yet started and
ic.DAO.PutStorageItem(n.ID, prefixCommittee, cache.committee.Bytes(ic.DAO.GetItemCtx())) // will be used by corresponding values initialisation on the next epoch start.
return nil // The updated new epoch cached values are computed using the persisted blocks state
// of the latest epoch.
func (n *NEO) updateCachedNewEpochValues(d *dao.Simple, cache *NeoCache, blockHeight uint32, numOfCNs int) error {
committee, cvs, err := n.computeCommitteeMembers(blockHeight, d)
if err != nil {
return fmt.Errorf("failed to compute committee members: %w", err)
} }
cache.newEpochCommittee = cvs
_, cvs, err := n.computeCommitteeMembers(ic.BlockHeight(), ic.DAO) script, err := smartcontract.CreateMajorityMultiSigRedeemScript(committee.Copy())
if err != nil { if err != nil {
return err return err
} }
if err := n.updateCache(cache, cvs, ic.BlockHeight()); err != nil { cache.newEpochCommitteeHash = hash.Hash160(script)
return err
} nextVals := committee[:numOfCNs].Copy()
cache.votesChanged = false sort.Sort(nextVals)
ic.DAO.PutStorageItem(n.ID, prefixCommittee, cvs.Bytes(ic.DAO.GetItemCtx())) cache.newEpochNextValidators = nextVals
return nil return nil
} }
@ -376,15 +418,15 @@ func (n *NEO) updateCommittee(cache *NeoCache, ic *interop.Context) error {
func (n *NEO) OnPersist(ic *interop.Context) error { func (n *NEO) OnPersist(ic *interop.Context) error {
if n.cfg.ShouldUpdateCommitteeAt(ic.Block.Index) { if n.cfg.ShouldUpdateCommitteeAt(ic.Block.Index) {
cache := ic.DAO.GetRWCache(n.ID).(*NeoCache) cache := ic.DAO.GetRWCache(n.ID).(*NeoCache)
oldKeys := cache.nextValidators // Cached newEpoch* values always have proper value set (either by PostPersist
oldCom := cache.committee // during the last epoch block handling or by initialization code).
if n.cfg.GetNumOfCNs(ic.Block.Index) != len(oldKeys) || cache.nextValidators = cache.newEpochNextValidators
n.cfg.GetCommitteeSize(ic.Block.Index) != len(oldCom) { cache.committee = cache.newEpochCommittee
cache.votesChanged = true cache.committeeHash = cache.newEpochCommitteeHash
} cache.votesChanged = false
if err := n.updateCommittee(cache, ic); err != nil {
return err // We need to put in storage anyway, as it affects dumps
} ic.DAO.PutStorageItem(n.ID, prefixCommittee, cache.committee.Bytes(ic.DAO.GetItemCtx()))
} }
return nil return nil
} }
@ -393,12 +435,13 @@ func (n *NEO) OnPersist(ic *interop.Context) error {
func (n *NEO) PostPersist(ic *interop.Context) error { func (n *NEO) PostPersist(ic *interop.Context) error {
gas := n.GetGASPerBlock(ic.DAO, ic.Block.Index) gas := n.GetGASPerBlock(ic.DAO, ic.Block.Index)
cache := ic.DAO.GetROCache(n.ID).(*NeoCache) cache := ic.DAO.GetROCache(n.ID).(*NeoCache)
pubs := getCommitteeMembers(cache) pubs := getCommitteeMembers(cache.committee)
committeeSize := n.cfg.GetCommitteeSize(ic.Block.Index) committeeSize := n.cfg.GetCommitteeSize(ic.Block.Index)
index := int(ic.Block.Index) % committeeSize index := int(ic.Block.Index) % committeeSize
committeeReward := new(big.Int).Mul(gas, bigCommitteeRewardRatio) committeeReward := new(big.Int).Mul(gas, bigCommitteeRewardRatio)
n.GAS.mint(ic, pubs[index].GetScriptHash(), committeeReward.Div(committeeReward, big100), false) n.GAS.mint(ic, pubs[index].GetScriptHash(), committeeReward.Div(committeeReward, big100), false)
var isCacheRW bool
if n.cfg.ShouldUpdateCommitteeAt(ic.Block.Index) { if n.cfg.ShouldUpdateCommitteeAt(ic.Block.Index) {
var voterReward = new(big.Int).Set(bigVoterRewardRatio) var voterReward = new(big.Int).Set(bigVoterRewardRatio)
voterReward.Mul(voterReward, gas) voterReward.Mul(voterReward, gas)
@ -409,7 +452,6 @@ func (n *NEO) PostPersist(ic *interop.Context) error {
var ( var (
cs = cache.committee cs = cache.committee
isCacheRW bool
key = make([]byte, 34) key = make([]byte, 34)
) )
for i := range cs { for i := range cs {
@ -437,6 +479,27 @@ func (n *NEO) PostPersist(ic *interop.Context) error {
} }
} }
} }
// Update newEpoch cache for external users and further committee, committeeHash
// and nextBlockValidators cache initialisation if committee should be updated in
// the next block.
if n.cfg.ShouldUpdateCommitteeAt(ic.Block.Index + 1) {
var (
h = ic.Block.Index // consider persisting block as stored to get _next_ block newEpochNextValidators
numOfCNs = n.cfg.GetNumOfCNs(h + 1)
)
if cache.votesChanged ||
numOfCNs != len(cache.newEpochNextValidators) ||
n.cfg.GetCommitteeSize(h+1) != len(cache.newEpochCommittee) {
if !isCacheRW {
cache = ic.DAO.GetRWCache(n.ID).(*NeoCache)
}
err := n.updateCachedNewEpochValues(ic.DAO, cache, h, numOfCNs)
if err != nil {
return fmt.Errorf("failed to update next block newEpoch* cache: %w", err)
}
}
}
return nil return nil
} }
@ -775,7 +838,9 @@ func (n *NEO) UnregisterCandidateInternal(ic *interop.Context, pub *keys.PublicK
return nil return nil
} }
cache := ic.DAO.GetRWCache(n.ID).(*NeoCache) cache := ic.DAO.GetRWCache(n.ID).(*NeoCache)
cache.validators = nil // Not only current committee/validators cache is interested in votesChanged, but also
// newEpoch cache, thus, modify votesChanged to update the latter.
cache.votesChanged = true
c := new(candidate).FromBytes(si) c := new(candidate).FromBytes(si)
emitEvent := c.Registered emitEvent := c.Registered
c.Registered = false c.Registered = false
@ -901,7 +966,7 @@ func (n *NEO) ModifyAccountVotes(acc *state.NEOBalance, d *dao.Simple, value *bi
return nil return nil
} }
} }
cache.validators = nil cache.newEpochNextValidators = nil
return putConvertibleToDAO(n.ID, d, key, cd) return putConvertibleToDAO(n.ID, d, key, cd)
} }
return nil return nil
@ -1042,24 +1107,24 @@ func (n *NEO) getAccountState(ic *interop.Context, args []stackitem.Item) stacki
return item return item
} }
// ComputeNextBlockValidators returns an actual list of current validators. // ComputeNextBlockValidators computes an actual list of current validators that is
func (n *NEO) ComputeNextBlockValidators(blockHeight uint32, d *dao.Simple) (keys.PublicKeys, error) { // relevant for the latest processed dBFT epoch and based on the changes made by
numOfCNs := n.cfg.GetNumOfCNs(blockHeight + 1) // register/unregister/vote calls during the latest epoch.
// Most of the time it should be OK with RO cache, thus try to retrieve // Note: this method doesn't actually compute a new committee list or calculate
// validators without RW cache creation to avoid cached values copying. // a new validators list from it. Instead, it uses cache, and the cache itself is
// updated during the PostPersist of the last block of every epoch.
func (n *NEO) ComputeNextBlockValidators(d *dao.Simple) keys.PublicKeys {
// It should always be OK with RO cache if using lower-layered DAO with proper
// cache set.
cache := d.GetROCache(n.ID).(*NeoCache) cache := d.GetROCache(n.ID).(*NeoCache)
if vals := cache.validators; vals != nil && numOfCNs == len(vals) { if vals := cache.newEpochNextValidators; vals != nil {
return vals.Copy(), nil return vals.Copy()
} }
cache = d.GetRWCache(n.ID).(*NeoCache) // It's a caller's program error to call ComputeNextBlockValidators not having
result, _, err := n.computeCommitteeMembers(blockHeight, d) // the right value in lower cache. With the current scheme of handling
if err != nil { // newEpochNextValidators cache this code is expected to be unreachable, but
return nil, err // let's have this panic here just in case.
} panic("bug: unexpected external call to newEpochNextValidators cache")
result = result[:numOfCNs]
sort.Sort(result)
cache.validators = result
return result, nil
} }
func (n *NEO) getCommittee(ic *interop.Context, _ []stackitem.Item) stackitem.Item { func (n *NEO) getCommittee(ic *interop.Context, _ []stackitem.Item) stackitem.Item {
@ -1083,11 +1148,10 @@ func (n *NEO) modifyVoterTurnout(d *dao.Simple, amount *big.Int) error {
// GetCommitteeMembers returns public keys of nodes in committee using cached value. // GetCommitteeMembers returns public keys of nodes in committee using cached value.
func (n *NEO) GetCommitteeMembers(d *dao.Simple) keys.PublicKeys { func (n *NEO) GetCommitteeMembers(d *dao.Simple) keys.PublicKeys {
cache := d.GetROCache(n.ID).(*NeoCache) cache := d.GetROCache(n.ID).(*NeoCache)
return getCommitteeMembers(cache) return getCommitteeMembers(cache.committee)
} }
func getCommitteeMembers(cache *NeoCache) keys.PublicKeys { func getCommitteeMembers(cvs keysWithVotes) keys.PublicKeys {
var cvs = cache.committee
var committee = make(keys.PublicKeys, len(cvs)) var committee = make(keys.PublicKeys, len(cvs))
var err error var err error
for i := range committee { for i := range committee {

View file

@ -138,8 +138,7 @@ func TestNEO_Vote(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
standBySorted = standBySorted[:validatorsCount] standBySorted = standBySorted[:validatorsCount]
sort.Sort(standBySorted) sort.Sort(standBySorted)
pubs, err := e.Chain.GetValidators() pubs := e.Chain.ComputeNextBlockValidators()
require.NoError(t, err)
require.Equal(t, standBySorted, keys.PublicKeys(pubs)) require.Equal(t, standBySorted, keys.PublicKeys(pubs))
// voters vote for candidates. The aim of this test is to check if voting // voters vote for candidates. The aim of this test is to check if voting
@ -176,7 +175,7 @@ func TestNEO_Vote(t *testing.T) {
} }
// We still haven't voted enough validators in. // We still haven't voted enough validators in.
pubs, err = e.Chain.GetValidators() pubs = e.Chain.ComputeNextBlockValidators()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, standBySorted, keys.PublicKeys(pubs)) require.Equal(t, standBySorted, keys.PublicKeys(pubs))
@ -267,8 +266,7 @@ func TestNEO_Vote(t *testing.T) {
advanceChain(t) advanceChain(t)
pubs, err = e.Chain.GetValidators() pubs = e.Chain.ComputeNextBlockValidators()
require.NoError(t, err)
for i := range pubs { for i := range pubs {
require.NotEqual(t, candidates[0], pubs[i]) require.NotEqual(t, candidates[0], pubs[i])
require.NotEqual(t, candidates[len(candidates)-1], pubs[i]) require.NotEqual(t, candidates[len(candidates)-1], pubs[i])