From 7b632c8ee8b4cb297635faebb5c36b64584dddeb Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 15 Apr 2022 17:48:58 +0300 Subject: [PATCH] core: refactor natives cache 1. Use layered natives cache. With layered cache the storeblock process includes the following steps: create a wrapper over current nativeCache, put changes into upper nativeCache layer, persist (or discard) changes. 2. Split contract getters to read-only and read-and-change. Read-only ones doesn't require the copy of an existing nativeCache item. Read-and-change ones create a copy and after that change the copy. --- pkg/core/native/designate.go | 54 +++++++++++- pkg/core/native/management.go | 54 ++++++++++-- pkg/core/native/native_neo.go | 79 +++++++++++++++--- pkg/core/native/notary.go | 41 +++++++-- pkg/core/native/oracle.go | 36 +++++++- pkg/core/native/policy.go | 54 +++++++++--- pkg/core/storage/memcached_store.go | 124 ++++++++++++++++++++++------ 7 files changed, 374 insertions(+), 68 deletions(-) diff --git a/pkg/core/native/designate.go b/pkg/core/native/designate.go index 785bd2f9f..93411c46f 100644 --- a/pkg/core/native/designate.go +++ b/pkg/core/native/designate.go @@ -75,6 +75,52 @@ var ( ErrNoBlock = errors.New("no persisting block in the context") ) +var ( + _ interop.Contract = (*Designate)(nil) + _ storage.NativeContractCache = (*DesignationCache)(nil) +) + +// Copy implements NativeContractCache interface. +func (c *DesignationCache) Copy() storage.NativeContractCache { + cp := &DesignationCache{} + copyDesignationCache(c, cp) + return cp +} + +// Persist implements NativeContractCache interface. +func (c *DesignationCache) Persist(ps storage.NativeContractCache) (storage.NativeContractCache, error) { + if ps == nil { + ps = &DesignationCache{} + } + psCache, ok := ps.(*DesignationCache) + if !ok { + return nil, errors.New("not a Designation native cache") + } + copyDesignationCache(c, psCache) + return psCache, nil +} + +func copyDesignationCache(src, dst *DesignationCache) { + dst.rolesChangedFlag.Store(src.rolesChangedFlag.Load()) + for _, r := range []noderoles.Role{noderoles.StateValidator, noderoles.Oracle, noderoles.NeoFSAlphabet, noderoles.P2PNotary} { + data := getCachedRoleData(src, r) + if data != nil { + var v = &roleData{} + *v = *data + switch r { + case noderoles.StateValidator: + dst.stateVals.Store(v) + case noderoles.Oracle: + dst.oracles.Store(v) + case noderoles.NeoFSAlphabet: + dst.neofsAlphabet.Store(v) + case noderoles.P2PNotary: + dst.notaries.Store(v) + } + } + } +} + func (s *Designate) isValidRole(r noderoles.Role) bool { return r == noderoles.Oracle || r == noderoles.StateValidator || r == noderoles.NeoFSAlphabet || (s.p2pSigExtensionsEnabled && r == noderoles.P2PNotary) @@ -119,7 +165,7 @@ func (s *Designate) OnPersist(ic *interop.Context) error { // PostPersist implements Contract interface. func (s *Designate) PostPersist(ic *interop.Context) error { - cache := ic.DAO.Store.GetCache(s.ID).(*DesignationCache) + cache := ic.DAO.Store.GetRWCache(s.ID).(*DesignationCache) if !rolesChanged(cache) { return nil } @@ -237,7 +283,7 @@ func (s *Designate) GetLastDesignatedHash(d *dao.Simple, r noderoles.Role) (util if !s.isValidRole(r) { return util.Uint160{}, ErrInvalidRole } - cache := d.Store.GetCache(s.ID).(*DesignationCache) + cache := d.Store.GetROCache(s.ID).(*DesignationCache) if !rolesChanged(cache) { if val := getCachedRoleData(cache, r); val != nil { return val.addr, nil @@ -256,7 +302,7 @@ func (s *Designate) GetDesignatedByRole(d *dao.Simple, r noderoles.Role, index u if !s.isValidRole(r) { return nil, 0, ErrInvalidRole } - cache := d.Store.GetCache(s.ID).(*DesignationCache) + cache := d.Store.GetROCache(s.ID).(*DesignationCache) if !rolesChanged(cache) { if val := getCachedRoleData(cache, r); val != nil && val.height <= index { return val.nodes.Copy(), val.height, nil @@ -335,7 +381,7 @@ func (s *Designate) DesignateAsRole(ic *interop.Context, r noderoles.Role, pubs } sort.Sort(pubs) nl := NodeList(pubs) - ic.DAO.Store.GetCache(s.ID).(*DesignationCache).rolesChangedFlag.Store(true) + ic.DAO.Store.GetRWCache(s.ID).(*DesignationCache).rolesChangedFlag.Store(true) err := putConvertibleToDAO(s.ID, ic.DAO, key, &nl) if err != nil { return err diff --git a/pkg/core/native/management.go b/pkg/core/native/management.go index 9fcd8a6f1..765bb7ed4 100644 --- a/pkg/core/native/management.go +++ b/pkg/core/native/management.go @@ -59,6 +59,48 @@ var ( keyMinimumDeploymentFee = []byte{20} ) +var ( + _ interop.Contract = (*Management)(nil) + _ storage.NativeContractCache = (*ManagementCache)(nil) +) + +// Copy implements NativeContractCache interface. +func (c *ManagementCache) Copy() storage.NativeContractCache { + cp := &ManagementCache{ + contracts: make(map[util.Uint160]*state.Contract), + nep11: make(map[util.Uint160]struct{}), + nep17: make(map[util.Uint160]struct{}), + } + // Copy the whole set of contracts is too expensive. We will create a separate map + // holding the same set of pointers to contracts, and in case if some contract is + // supposed to be changed, Management will create the copy in-place. + for hash, ctr := range c.contracts { + cp.contracts[hash] = ctr + } + for hash := range c.nep17 { + cp.nep17[hash] = struct{}{} + } + for hash := range c.nep11 { + cp.nep11[hash] = struct{}{} + } + return cp +} + +// Persist implements NativeContractCache interface. +func (c *ManagementCache) Persist(ps storage.NativeContractCache) (storage.NativeContractCache, error) { + if ps == nil { + ps = &ManagementCache{} + } + psCache, ok := ps.(*ManagementCache) + if !ok { + return nil, errors.New("not a Management native cache") + } + psCache.contracts = c.contracts + psCache.nep17 = c.nep17 + psCache.nep11 = c.nep11 + return psCache, nil +} + // MakeContractKey creates a key from account script hash. func MakeContractKey(h util.Uint160) []byte { return makeUint160Key(prefixContract, h) @@ -145,7 +187,7 @@ func (m *Management) getContract(ic *interop.Context, args []stackitem.Item) sta // GetContract returns contract with given hash from given DAO. func (m *Management) GetContract(d *dao.Simple, hash util.Uint160) (*state.Contract, error) { - cache := d.Store.GetCache(m.ID).(*ManagementCache) + cache := d.Store.GetROCache(m.ID).(*ManagementCache) cache.mtx.RLock() cs, ok := cache.contracts[hash] cache.mtx.RUnlock() @@ -261,7 +303,7 @@ func (m *Management) deployWithData(ic *interop.Context, args []stackitem.Item) } func (m *Management) markUpdated(d *dao.Simple, h util.Uint160) { - cache := d.Store.GetCache(m.ID).(*ManagementCache) + cache := d.Store.GetRWCache(m.ID).(*ManagementCache) cache.mtx.Lock() // Just set it to nil, to refresh cache in `PostPersist`. cache.contracts[h] = nil @@ -476,7 +518,7 @@ func (m *Management) OnPersist(ic *interop.Context) error { return err } if cache == nil { - cache = ic.DAO.Store.GetCache(m.ID).(*ManagementCache) + cache = ic.DAO.Store.GetRWCache(m.ID).(*ManagementCache) } cache.mtx.Lock() updateContractCache(cache, cs) @@ -515,7 +557,7 @@ func (m *Management) InitializeCache(d *dao.Simple) error { // PostPersist implements Contract interface. func (m *Management) PostPersist(ic *interop.Context) error { - cache := ic.DAO.Store.GetCache(m.ID).(*ManagementCache) + cache := ic.DAO.Store.GetRWCache(m.ID).(*ManagementCache) cache.mtx.Lock() defer cache.mtx.Unlock() for h, cs := range cache.contracts { @@ -539,7 +581,7 @@ func (m *Management) PostPersist(ic *interop.Context) error { // is updated every PostPersist, so until PostPersist is called, the result for the previous block // is returned. func (m *Management) GetNEP11Contracts(d *dao.Simple) []util.Uint160 { - cache := d.Store.GetCache(m.ID).(*ManagementCache) + cache := d.Store.GetROCache(m.ID).(*ManagementCache) cache.mtx.RLock() result := make([]util.Uint160, 0, len(cache.nep11)) for h := range cache.nep11 { @@ -553,7 +595,7 @@ func (m *Management) GetNEP11Contracts(d *dao.Simple) []util.Uint160 { // is updated every PostPersist, so until PostPersist is called, the result for the previous block // is returned. func (m *Management) GetNEP17Contracts(d *dao.Simple) []util.Uint160 { - cache := d.Store.GetCache(m.ID).(*ManagementCache) + cache := d.Store.GetROCache(m.ID).(*ManagementCache) cache.mtx.RLock() result := make([]util.Uint160, 0, len(cache.nep17)) for h := range cache.nep17 { diff --git a/pkg/core/native/native_neo.go b/pkg/core/native/native_neo.go index 681e77867..bf2574121 100644 --- a/pkg/core/native/native_neo.go +++ b/pkg/core/native/native_neo.go @@ -108,6 +108,59 @@ var ( big100 = big.NewInt(100) ) +var ( + _ interop.Contract = (*NEO)(nil) + _ storage.NativeContractCache = (*NeoCache)(nil) +) + +// Copy implements NativeContractCache interface. +func (c *NeoCache) Copy() storage.NativeContractCache { + cp := &NeoCache{} + copyNeoCache(c, cp) + return cp +} + +// Persist implements NativeContractCache interface. +func (c *NeoCache) Persist(ps storage.NativeContractCache) (storage.NativeContractCache, error) { + if ps == nil { + ps = &NeoCache{} + } + psCache, ok := ps.(*NeoCache) + if !ok { + return nil, errors.New("not a NEO native cache") + } + copyNeoCache(c, psCache) + return psCache, nil +} + +func copyNeoCache(src, dst *NeoCache) { + dst.votesChanged.Store(src.votesChanged.Load()) + dst.nextValidators.Store(src.nextValidators.Load().(keys.PublicKeys).Copy()) + dst.validators.Store(src.validators.Load().(keys.PublicKeys).Copy()) + committeeSrc := src.committee.Load().(keysWithVotes) + committeeDst := make(keysWithVotes, len(committeeSrc)) + copy(committeeDst, committeeSrc) + dst.committee.Store(committeeDst) + dst.committeeHash.Store(src.committeeHash.Load()) + regPriceChanged := src.registerPriceChanged.Load().(bool) + dst.registerPriceChanged.Store(regPriceChanged) + if !regPriceChanged { + dst.registerPrice.Store(src.registerPrice.Load()) + } + gasPerBlockChanged := src.gasPerBlockChanged.Load().(bool) + dst.gasPerBlockChanged.Store(gasPerBlockChanged) + if !gasPerBlockChanged { + recordsSrc := src.gasPerBlock.Load().(gasRecord) + recordsDst := make(gasRecord, len(recordsSrc)) + copy(recordsDst, recordsSrc) + dst.gasPerBlock.Store(recordsDst) + } + dst.gasPerVoteCache = make(map[string]big.Int) + for k, v := range src.gasPerVoteCache { + dst.gasPerVoteCache[k] = v + } +} + // makeValidatorKey creates a key from account script hash. func makeValidatorKey(key *keys.PublicKey) []byte { b := key.Bytes() @@ -332,7 +385,7 @@ func (n *NEO) updateCommittee(cache *NeoCache, ic *interop.Context) error { // OnPersist implements Contract interface. func (n *NEO) OnPersist(ic *interop.Context) error { if n.cfg.ShouldUpdateCommitteeAt(ic.Block.Index) { - cache := ic.DAO.Store.GetCache(n.ID).(*NeoCache) + cache := ic.DAO.Store.GetRWCache(n.ID).(*NeoCache) oldKeys := cache.nextValidators.Load().(keys.PublicKeys) oldCom := cache.committee.Load().(keysWithVotes) if n.cfg.GetNumOfCNs(ic.Block.Index) != len(oldKeys) || @@ -349,7 +402,7 @@ func (n *NEO) OnPersist(ic *interop.Context) error { // PostPersist implements Contract interface. func (n *NEO) PostPersist(ic *interop.Context) error { gas := n.GetGASPerBlock(ic.DAO, ic.Block.Index) - cache := ic.DAO.Store.GetCache(n.ID).(*NeoCache) + cache := ic.DAO.Store.GetRWCache(n.ID).(*NeoCache) pubs := getCommitteeMembers(cache) committeeSize := n.cfg.GetCommitteeSize(ic.Block.Index) index := int(ic.Block.Index) % committeeSize @@ -526,7 +579,7 @@ func (n *NEO) getSortedGASRecordFromDAO(d *dao.Simple) gasRecord { // GetGASPerBlock returns gas generated for block with provided index. func (n *NEO) GetGASPerBlock(d *dao.Simple, index uint32) *big.Int { - cache := d.Store.GetCache(n.ID).(*NeoCache) + cache := d.Store.GetROCache(n.ID).(*NeoCache) var gr gasRecord if cache.gasPerBlockChanged.Load().(bool) { gr = n.getSortedGASRecordFromDAO(d) @@ -544,7 +597,7 @@ func (n *NEO) GetGASPerBlock(d *dao.Simple, index uint32) *big.Int { // GetCommitteeAddress returns address of the committee. func (n *NEO) GetCommitteeAddress(d *dao.Simple) util.Uint160 { - cache := d.Store.GetCache(n.ID).(*NeoCache) + cache := d.Store.GetROCache(n.ID).(*NeoCache) return cache.committeeHash.Load().(util.Uint160) } @@ -573,7 +626,7 @@ func (n *NEO) SetGASPerBlock(ic *interop.Context, index uint32, gas *big.Int) er if !n.checkCommittee(ic) { return errors.New("invalid committee signature") } - cache := ic.DAO.Store.GetCache(n.ID).(*NeoCache) + cache := ic.DAO.Store.GetRWCache(n.ID).(*NeoCache) cache.gasPerBlockChanged.Store(true) n.putGASRecord(ic.DAO, index, gas) return nil @@ -584,7 +637,7 @@ func (n *NEO) getRegisterPrice(ic *interop.Context, _ []stackitem.Item) stackite } func (n *NEO) getRegisterPriceInternal(d *dao.Simple) int64 { - cache := d.Store.GetCache(n.ID).(*NeoCache) + cache := d.Store.GetROCache(n.ID).(*NeoCache) if !cache.registerPriceChanged.Load().(bool) { return cache.registerPrice.Load().(int64) } @@ -601,7 +654,7 @@ func (n *NEO) setRegisterPrice(ic *interop.Context, args []stackitem.Item) stack } setIntWithKey(n.ID, ic.DAO, []byte{prefixRegisterPrice}, price.Int64()) - cache := ic.DAO.Store.GetCache(n.ID).(*NeoCache) + cache := ic.DAO.Store.GetRWCache(n.ID).(*NeoCache) cache.registerPriceChanged.Store(true) return stackitem.Null{} } @@ -672,7 +725,7 @@ func (n *NEO) CalculateNEOHolderReward(d *dao.Simple, value *big.Int, start, end return nil, errors.New("negative value") } var gr gasRecord - cache := d.Store.GetCache(n.ID).(*NeoCache) + cache := d.Store.GetROCache(n.ID).(*NeoCache) if !cache.gasPerBlockChanged.Load().(bool) { gr = cache.gasPerBlock.Load().(gasRecord) } else { @@ -747,7 +800,7 @@ func (n *NEO) UnregisterCandidateInternal(ic *interop.Context, pub *keys.PublicK if si == nil { return nil } - cache := ic.DAO.Store.GetCache(n.ID).(*NeoCache) + cache := ic.DAO.Store.GetRWCache(n.ID).(*NeoCache) cache.validators.Store(keys.PublicKeys(nil)) c := new(candidate).FromBytes(si) c.Registered = false @@ -827,7 +880,7 @@ func (n *NEO) VoteInternal(ic *interop.Context, h util.Uint160, pub *keys.Public // ModifyAccountVotes modifies votes of the specified account by value (can be negative). // typ specifies if this modify is occurring during transfer or vote (with old or new validator). func (n *NEO) ModifyAccountVotes(acc *state.NEOBalance, d *dao.Simple, value *big.Int, isNewVote bool) error { - cache := d.Store.GetCache(n.ID).(*NeoCache) + cache := d.Store.GetRWCache(n.ID).(*NeoCache) cache.votesChanged.Store(true) if acc.VoteTo != nil { key := makeValidatorKey(acc.VoteTo) @@ -938,7 +991,7 @@ func (n *NEO) getAccountState(ic *interop.Context, args []stackitem.Item) stacki // ComputeNextBlockValidators returns an actual list of current validators. func (n *NEO) ComputeNextBlockValidators(bc interop.Ledger, d *dao.Simple) (keys.PublicKeys, error) { numOfCNs := n.cfg.GetNumOfCNs(bc.BlockHeight() + 1) - cache := d.Store.GetCache(n.ID).(*NeoCache) + cache := d.Store.GetRWCache(n.ID).(*NeoCache) if vals := cache.validators.Load().(keys.PublicKeys); vals != nil && numOfCNs == len(vals) { return vals.Copy(), nil } @@ -973,7 +1026,7 @@ func (n *NEO) modifyVoterTurnout(d *dao.Simple, amount *big.Int) error { // GetCommitteeMembers returns public keys of nodes in committee using cached value. func (n *NEO) GetCommitteeMembers(d *dao.Simple) keys.PublicKeys { - cache := d.Store.GetCache(n.ID).(*NeoCache) + cache := d.Store.GetROCache(n.ID).(*NeoCache) return getCommitteeMembers(cache) } @@ -1053,7 +1106,7 @@ func (n *NEO) getNextBlockValidators(ic *interop.Context, _ []stackitem.Item) st // GetNextBlockValidatorsInternal returns next block validators. func (n *NEO) GetNextBlockValidatorsInternal(d *dao.Simple) keys.PublicKeys { - cache := d.Store.GetCache(n.ID).(*NeoCache) + cache := d.Store.GetROCache(n.ID).(*NeoCache) return cache.nextValidators.Load().(keys.PublicKeys).Copy() } diff --git a/pkg/core/native/notary.go b/pkg/core/native/notary.go index 18abcc31b..817a5e73a 100644 --- a/pkg/core/native/notary.go +++ b/pkg/core/native/notary.go @@ -58,6 +58,37 @@ var ( notaryServiceFeeKey = []byte{5} ) +var ( + _ interop.Contract = (*Notary)(nil) + _ storage.NativeContractCache = (*NotaryCache)(nil) +) + +// Copy implements NativeContractCache interface. +func (c *NotaryCache) Copy() storage.NativeContractCache { + cp := &NotaryCache{} + copyNotaryCache(c, cp) + return cp +} + +// Persist implements NativeContractCache interface. +func (c *NotaryCache) Persist(ps storage.NativeContractCache) (storage.NativeContractCache, error) { + if ps == nil { + ps = &NotaryCache{} + } + psCache, ok := ps.(*NotaryCache) + if !ok { + return nil, errors.New("not a Notary native cache") + } + copyNotaryCache(c, psCache) + return psCache, nil +} + +func copyNotaryCache(src, dst *NotaryCache) { + dst.isValid = src.isValid + dst.maxNotValidBeforeDelta = src.maxNotValidBeforeDelta + dst.notaryServiceFeePerKey = src.notaryServiceFeePerKey +} + // newNotary returns Notary native contract. func newNotary() *Notary { n := &Notary{ContractMD: *interop.NewContractMD(nativenames.Notary, notaryContractID)} @@ -192,7 +223,7 @@ func (n *Notary) OnPersist(ic *interop.Context) error { // PostPersist implements Contract interface. func (n *Notary) PostPersist(ic *interop.Context) error { - cache := ic.DAO.Store.GetCache(n.ID).(*NotaryCache) + cache := ic.DAO.Store.GetRWCache(n.ID).(*NotaryCache) cache.lock.Lock() defer cache.lock.Unlock() if cache.isValid { @@ -408,7 +439,7 @@ func (n *Notary) getMaxNotValidBeforeDelta(ic *interop.Context, _ []stackitem.It // GetMaxNotValidBeforeDelta is an internal representation of Notary getMaxNotValidBeforeDelta method. func (n *Notary) GetMaxNotValidBeforeDelta(dao *dao.Simple) uint32 { - cache := dao.Store.GetCache(n.ID).(*NotaryCache) + cache := dao.Store.GetROCache(n.ID).(*NotaryCache) cache.lock.RLock() defer cache.lock.RUnlock() if cache.isValid { @@ -428,7 +459,7 @@ func (n *Notary) setMaxNotValidBeforeDelta(ic *interop.Context, args []stackitem if !n.NEO.checkCommittee(ic) { panic("invalid committee signature") } - cache := ic.DAO.Store.GetCache(n.ID).(*NotaryCache) + cache := ic.DAO.Store.GetRWCache(n.ID).(*NotaryCache) cache.lock.Lock() defer cache.lock.Unlock() setIntWithKey(n.ID, ic.DAO, maxNotValidBeforeDeltaKey, int64(value)) @@ -443,7 +474,7 @@ func (n *Notary) getNotaryServiceFeePerKey(ic *interop.Context, _ []stackitem.It // GetNotaryServiceFeePerKey is an internal representation of Notary getNotaryServiceFeePerKey method. func (n *Notary) GetNotaryServiceFeePerKey(dao *dao.Simple) int64 { - cache := dao.Store.GetCache(n.ID).(*NotaryCache) + cache := dao.Store.GetROCache(n.ID).(*NotaryCache) cache.lock.RLock() defer cache.lock.RUnlock() if cache.isValid { @@ -461,7 +492,7 @@ func (n *Notary) setNotaryServiceFeePerKey(ic *interop.Context, args []stackitem if !n.NEO.checkCommittee(ic) { panic("invalid committee signature") } - cache := ic.DAO.Store.GetCache(n.ID).(*NotaryCache) + cache := ic.DAO.Store.GetRWCache(n.ID).(*NotaryCache) cache.lock.Lock() defer cache.lock.Unlock() setIntWithKey(n.ID, ic.DAO, notaryServiceFeeKey, int64(value)) diff --git a/pkg/core/native/oracle.go b/pkg/core/native/oracle.go index 664728ca9..893b5cebf 100644 --- a/pkg/core/native/oracle.go +++ b/pkg/core/native/oracle.go @@ -84,6 +84,36 @@ var ( ErrResponseNotFound = errors.New("oracle response not found") ) +var ( + _ interop.Contract = (*Oracle)(nil) + _ storage.NativeContractCache = (*OracleCache)(nil) +) + +// Copy implements NativeContractCache interface. +func (c *OracleCache) Copy() storage.NativeContractCache { + cp := &OracleCache{} + copyOracleCache(c, cp) + return cp +} + +// Persist implements NativeContractCache interface. +func (c *OracleCache) Persist(ps storage.NativeContractCache) (storage.NativeContractCache, error) { + if ps == nil { + ps = &OracleCache{} + } + psCache, ok := ps.(*OracleCache) + if !ok { + return nil, errors.New("not an Oracle native cache") + } + copyOracleCache(c, psCache) + return psCache, nil +} + +func copyOracleCache(src, dst *OracleCache) { + dst.requestPrice.Store(src.requestPrice.Load()) + dst.requestPriceChanged.Store(src.requestPriceChanged.Load()) +} + func newOracle() *Oracle { o := &Oracle{ContractMD: *interop.NewContractMD(nativenames.Oracle, oracleContractID)} defer o.UpdateHash() @@ -143,7 +173,7 @@ func (o *Oracle) OnPersist(ic *interop.Context) error { // PostPersist represents `postPersist` method. func (o *Oracle) PostPersist(ic *interop.Context) error { p := o.getPriceInternal(ic.DAO) - cache := ic.DAO.Store.GetCache(o.ID).(*OracleCache) + cache := ic.DAO.Store.GetRWCache(o.ID).(*OracleCache) if cache.requestPriceChanged.Load().(bool) { cache.requestPrice.Store(p) cache.requestPriceChanged.Store(false) @@ -450,7 +480,7 @@ func (o *Oracle) getPrice(ic *interop.Context, _ []stackitem.Item) stackitem.Ite } func (o *Oracle) getPriceInternal(d *dao.Simple) int64 { - cache := d.Store.GetCache(o.ID).(*OracleCache) + cache := d.Store.GetROCache(o.ID).(*OracleCache) if !cache.requestPriceChanged.Load().(bool) { return cache.requestPrice.Load().(int64) } @@ -466,7 +496,7 @@ func (o *Oracle) setPrice(ic *interop.Context, args []stackitem.Item) stackitem. panic("invalid committee signature") } setIntWithKey(o.ID, ic.DAO, prefixRequestPrice, price.Int64()) - cache := ic.DAO.Store.GetCache(o.ID).(*OracleCache) + cache := ic.DAO.Store.GetRWCache(o.ID).(*OracleCache) cache.requestPriceChanged.Store(true) return stackitem.Null{} } diff --git a/pkg/core/native/policy.go b/pkg/core/native/policy.go index 9623ead77..d4446e8b5 100644 --- a/pkg/core/native/policy.go +++ b/pkg/core/native/policy.go @@ -1,6 +1,7 @@ package native import ( + "errors" "fmt" "math/big" "sort" @@ -68,7 +69,36 @@ type PolicyCache struct { blockedAccounts []util.Uint160 } -var _ interop.Contract = (*Policy)(nil) +var ( + _ interop.Contract = (*Policy)(nil) + _ storage.NativeContractCache = (*PolicyCache)(nil) +) + +// Copy implements NativeContractCache interface. +func (c *PolicyCache) Copy() storage.NativeContractCache { + cp := &PolicyCache{} + copyPolicyCache(c, cp) + return cp +} + +// Persist implements NativeContractCache interface. +func (c *PolicyCache) Persist(ps storage.NativeContractCache) (storage.NativeContractCache, error) { + if ps == nil { + ps = &PolicyCache{} + } + psCache, ok := ps.(*PolicyCache) + if !ok { + return nil, errors.New("not a Policy native cache") + } + copyPolicyCache(c, psCache) + return psCache, nil +} + +func copyPolicyCache(src, dst *PolicyCache) { + *dst = *src + dst.blockedAccounts = make([]util.Uint160, len(src.blockedAccounts)) + copy(dst.blockedAccounts, src.blockedAccounts) +} // newPolicy returns Policy native contract. func newPolicy() *Policy { @@ -184,7 +214,7 @@ func (p *Policy) OnPersist(ic *interop.Context) error { // PostPersist implements Contract interface. func (p *Policy) PostPersist(ic *interop.Context) error { - cache := ic.DAO.Store.GetCache(p.ID).(*PolicyCache) + cache := ic.DAO.Store.GetRWCache(p.ID).(*PolicyCache) cache.lock.Lock() defer cache.lock.Unlock() if cache.isValid { @@ -202,7 +232,7 @@ func (p *Policy) getFeePerByte(ic *interop.Context, _ []stackitem.Item) stackite // GetFeePerByteInternal returns required transaction's fee per byte. func (p *Policy) GetFeePerByteInternal(dao *dao.Simple) int64 { - cache := dao.Store.GetCache(p.ID).(*PolicyCache) + cache := dao.Store.GetROCache(p.ID).(*PolicyCache) cache.lock.RLock() defer cache.lock.RUnlock() if cache.isValid { @@ -213,7 +243,7 @@ func (p *Policy) GetFeePerByteInternal(dao *dao.Simple) int64 { // GetMaxVerificationGas returns maximum gas allowed to be burned during verificaion. func (p *Policy) GetMaxVerificationGas(dao *dao.Simple) int64 { - cache := dao.Store.GetCache(p.ID).(*PolicyCache) + cache := dao.Store.GetROCache(p.ID).(*PolicyCache) cache.lock.RLock() defer cache.lock.RUnlock() if cache.isValid { @@ -228,7 +258,7 @@ func (p *Policy) getExecFeeFactor(ic *interop.Context, _ []stackitem.Item) stack // GetExecFeeFactorInternal returns current execution fee factor. func (p *Policy) GetExecFeeFactorInternal(d *dao.Simple) int64 { - cache := d.Store.GetCache(p.ID).(*PolicyCache) + cache := d.Store.GetROCache(p.ID).(*PolicyCache) cache.lock.RLock() defer cache.lock.RUnlock() if cache.isValid { @@ -245,7 +275,7 @@ func (p *Policy) setExecFeeFactor(ic *interop.Context, args []stackitem.Item) st if !p.NEO.checkCommittee(ic) { panic("invalid committee signature") } - cache := ic.DAO.Store.GetCache(p.ID).(*PolicyCache) + cache := ic.DAO.Store.GetRWCache(p.ID).(*PolicyCache) cache.lock.Lock() defer cache.lock.Unlock() setIntWithKey(p.ID, ic.DAO, execFeeFactorKey, int64(value)) @@ -261,7 +291,7 @@ func (p *Policy) isBlocked(ic *interop.Context, args []stackitem.Item) stackitem // IsBlockedInternal checks whether provided account is blocked. func (p *Policy) IsBlockedInternal(dao *dao.Simple, hash util.Uint160) bool { - cache := dao.Store.GetCache(p.ID).(*PolicyCache) + cache := dao.Store.GetROCache(p.ID).(*PolicyCache) cache.lock.RLock() defer cache.lock.RUnlock() if cache.isValid { @@ -284,7 +314,7 @@ func (p *Policy) getStoragePrice(ic *interop.Context, _ []stackitem.Item) stacki // GetStoragePriceInternal returns current execution fee factor. func (p *Policy) GetStoragePriceInternal(d *dao.Simple) int64 { - cache := d.Store.GetCache(p.ID).(*PolicyCache) + cache := d.Store.GetROCache(p.ID).(*PolicyCache) cache.lock.RLock() defer cache.lock.RUnlock() if cache.isValid { @@ -301,7 +331,7 @@ func (p *Policy) setStoragePrice(ic *interop.Context, args []stackitem.Item) sta if !p.NEO.checkCommittee(ic) { panic("invalid committee signature") } - cache := ic.DAO.Store.GetCache(p.ID).(*PolicyCache) + cache := ic.DAO.Store.GetRWCache(p.ID).(*PolicyCache) cache.lock.Lock() defer cache.lock.Unlock() setIntWithKey(p.ID, ic.DAO, storagePriceKey, int64(value)) @@ -318,7 +348,7 @@ func (p *Policy) setFeePerByte(ic *interop.Context, args []stackitem.Item) stack if !p.NEO.checkCommittee(ic) { panic("invalid committee signature") } - cache := ic.DAO.Store.GetCache(p.ID).(*PolicyCache) + cache := ic.DAO.Store.GetRWCache(p.ID).(*PolicyCache) cache.lock.Lock() defer cache.lock.Unlock() setIntWithKey(p.ID, ic.DAO, feePerByteKey, value) @@ -342,7 +372,7 @@ func (p *Policy) blockAccount(ic *interop.Context, args []stackitem.Item) stacki return stackitem.NewBool(false) } key := append([]byte{blockedAccountPrefix}, hash.BytesBE()...) - cache := ic.DAO.Store.GetCache(p.ID).(*PolicyCache) + cache := ic.DAO.Store.GetRWCache(p.ID).(*PolicyCache) cache.lock.Lock() defer cache.lock.Unlock() ic.DAO.PutStorageItem(p.ID, key, state.StorageItem{}) @@ -361,7 +391,7 @@ func (p *Policy) unblockAccount(ic *interop.Context, args []stackitem.Item) stac return stackitem.NewBool(false) } key := append([]byte{blockedAccountPrefix}, hash.BytesBE()...) - cache := ic.DAO.Store.GetCache(p.ID).(*PolicyCache) + cache := ic.DAO.Store.GetRWCache(p.ID).(*PolicyCache) cache.lock.Lock() defer cache.lock.Unlock() ic.DAO.DeleteStorageItem(p.ID, key) diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index e241a3b39..fcb50a346 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -3,6 +3,7 @@ package storage import ( "bytes" "context" + "fmt" "sort" "strings" "sync" @@ -16,7 +17,7 @@ type MemCachedStore struct { MemoryStore nativeCacheLock sync.RWMutex - nativeCache map[int32]*NativeCacheItem + nativeCache map[int32]NativeContractCache private bool // plock protects Persist from double entrance. @@ -25,8 +26,16 @@ type MemCachedStore struct { ps Store } -type NativeCacheItem struct { - Value interface{} +// NativeContractCache is an interface representing cache for a native contract. +// Cache can be copied to create a wrapper around current DAO layer. Wrapped cache +// can be persisted to the underlying DAO native cache. +type NativeContractCache interface { + // Copy returns a copy of native cache item that can safely be changed within + // the subsequent DAO operations. + Copy() NativeContractCache + // Persist persists changes from upper native cache wrapper to the underlying + // native cache `ps`. The resulting up-to-date cache and an error are returned. + Persist(ps NativeContractCache) (NativeContractCache, error) } type ( @@ -53,12 +62,9 @@ type ( // NewMemCachedStore creates a new MemCachedStore object. func NewMemCachedStore(lower Store) *MemCachedStore { - var cache map[int32]*NativeCacheItem - if cached, ok := lower.(*MemCachedStore); ok { - cache = cached.nativeCache - } else { - cache = make(map[int32]*NativeCacheItem) - } + // Do not copy cache from ps; instead should create clear map: GetRWCache and + // GetROCache will retrieve cache from the underlying nativeCache if requested. + cache := make(map[int32]NativeContractCache) return &MemCachedStore{ MemoryStore: *NewMemoryStore(), nativeCache: cache, @@ -69,12 +75,11 @@ func NewMemCachedStore(lower Store) *MemCachedStore { // NewPrivateMemCachedStore creates a new private (unlocked) MemCachedStore object. // Private cached stores are closed after Persist. func NewPrivateMemCachedStore(lower Store) *MemCachedStore { - var cache map[int32]*NativeCacheItem - if cached, ok := lower.(*MemCachedStore); ok { - cache = cached.nativeCache - } else { - cache = make(map[int32]*NativeCacheItem) - } + // Do not copy cache from ps; instead should create clear map: GetRWCache and + // GetROCache will retrieve cache from the underlying nativeCache if requested. + // The lowest underlying store MUST have its native cache initialized, otherwise + // GetROCache and GetRWCache won't work properly. + cache := make(map[int32]NativeContractCache) return &MemCachedStore{ MemoryStore: *NewMemoryStore(), nativeCache: cache, @@ -362,15 +367,27 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { } s.mem = nil s.stor = nil + if cached, ok := s.ps.(*MemCachedStore); ok { + for id, nativeCache := range s.nativeCache { + updatedCache, err := nativeCache.Persist(cached.nativeCache[id]) + if err != nil { + return 0, fmt.Errorf("failed to persist native cache changes for private MemCachedStore: %w", err) + } + cached.nativeCache[id] = updatedCache + } + s.nativeCache = nil + } return keys, nil } s.plock.Lock() defer s.plock.Unlock() s.mut.Lock() + s.nativeCacheLock.Lock() keys = len(s.mem) + len(s.stor) if keys == 0 { + s.nativeCacheLock.Unlock() s.mut.Unlock() return 0, nil } @@ -379,18 +396,35 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { // starts using fresh new maps. This tempstore is only known here and // nothing ever changes it, therefore accesses to it (reads) can go // unprotected while writes are handled by s proper. - var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, stor: s.stor}, ps: s.ps} + var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, stor: s.stor}, ps: s.ps, nativeCache: s.nativeCache} s.ps = tempstore s.mem = make(map[string][]byte, len(s.mem)) s.stor = make(map[string][]byte, len(s.stor)) + cached, isPSCached := tempstore.ps.(*MemCachedStore) + if isPSCached { + s.nativeCache = make(map[int32]NativeContractCache) + } if !isSync { + s.nativeCacheLock.Unlock() s.mut.Unlock() } - + if isPSCached { + cached.nativeCacheLock.Lock() + for id, nativeCache := range tempstore.nativeCache { + updatedCache, err := nativeCache.Persist(cached.nativeCache[id]) + if err != nil { + cached.nativeCacheLock.Unlock() + return 0, fmt.Errorf("failed to persist native cache changes: %w", err) + } + cached.nativeCache[id] = updatedCache + } + cached.nativeCacheLock.Unlock() + } err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.stor) if !isSync { s.mut.Lock() + s.nativeCacheLock.Lock() } if err == nil { // tempstore.mem and tempstore.del are completely flushed now @@ -406,31 +440,71 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { for k := range s.stor { put(tempstore.stor, k, s.stor[k]) } + if isPSCached { + for id, nativeCache := range s.nativeCache { + updatedCache, err := nativeCache.Persist(tempstore.nativeCache[id]) + if err != nil { + return 0, fmt.Errorf("failed to persist native cache changes: %w", err) + } + tempstore.nativeCache[id] = updatedCache + } + s.nativeCache = tempstore.nativeCache + } s.ps = tempstore.ps s.mem = tempstore.mem s.stor = tempstore.stor } + s.nativeCacheLock.Unlock() s.mut.Unlock() return keys, err } -func (s *MemCachedStore) GetCache(k int32) interface{} { +// GetROCache returns native contact cache. The cache CAN NOT be modified by +// the caller. It's the caller's duty to keep it unmodified. +func (s *MemCachedStore) GetROCache(id int32) NativeContractCache { s.nativeCacheLock.RLock() defer s.nativeCacheLock.RUnlock() - if itm, ok := s.nativeCache[k]; ok { - return itm.Value - } - return nil + return s.getCache(id, true) } -func (s *MemCachedStore) SetCache(k int32, v interface{}) { +// GetRWCache returns native contact cache. The cache CAN BE safely modified +// by the caller. +func (s *MemCachedStore) GetRWCache(k int32) NativeContractCache { s.nativeCacheLock.Lock() defer s.nativeCacheLock.Unlock() - s.nativeCache[k] = &NativeCacheItem{ - Value: v, + return s.getCache(k, false) +} + +func (s *MemCachedStore) getCache(k int32, ro bool) NativeContractCache { + if itm, ok := s.nativeCache[k]; ok { + // Don't need to create itm copy, because its value was already copied + // the first time it was retrieved from loser ps. + return itm } + + if cached, ok := s.ps.(*MemCachedStore); ok { + if ro { + return cached.GetROCache(k) + } + v := cached.GetRWCache(k) + if v != nil { + // Create a copy here in order not to modify the existing cache. + cp := v.Copy() + s.nativeCache[k] = cp + return cp + } + } + + return nil +} + +func (s *MemCachedStore) SetCache(k int32, v NativeContractCache) { + s.nativeCacheLock.Lock() + defer s.nativeCacheLock.Unlock() + + s.nativeCache[k] = v } // Close implements Store interface, clears up memory and closes the lower layer