From 9bfb3357f2c07e8b2bbd4e95b1d5e75dca5f3c36 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 16 Feb 2022 19:13:06 +0300 Subject: [PATCH] storage: add "private" mode to MemCachedStore Most of the time we don't need locking on the higher-level stores and we drop them after Persist, so that's what private MemCachedStore is for. It doesn't improve things in any noticeable way, some ~1% can be observed in neo-bench under various loads and even less than that in chain processing. But it seems to be a bit better anyway (less allocations, less locks). --- pkg/core/blockchain.go | 10 +-- pkg/core/dao/dao.go | 15 +++- pkg/core/interop/context.go | 2 +- pkg/core/statesync/module.go | 2 +- pkg/core/storage/memcached_store.go | 98 +++++++++++++++++++++--- pkg/core/storage/memcached_store_test.go | 73 ++++++++++++------ pkg/core/storage/memory_store.go | 21 ++--- 7 files changed, 163 insertions(+), 58 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index e396cb530..12e51cfee 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -519,7 +519,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error fallthrough case newStorageItemsAdded: - cache := bc.dao.GetWrapped() + cache := bc.dao.GetPrivate() prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix) bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) bool { // #1468, but don't need to copy here, because it is done by Store. @@ -903,7 +903,7 @@ func (bc *Blockchain) AddHeaders(headers ...*block.Header) error { func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error { var ( start = time.Now() - batch = bc.dao.GetWrapped() + batch = bc.dao.GetPrivate() err error ) @@ -997,8 +997,8 @@ func (bc *Blockchain) GetStateSyncModule() *statesync.Module { // This is the only way to change Blockchain state. func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error { var ( - cache = bc.dao.GetWrapped() - aerCache = bc.dao.GetWrapped() + cache = bc.dao.GetPrivate() + aerCache = bc.dao.GetPrivate() appExecResults = make([]*state.AppExecResult, 0, 2+len(block.Transactions)) aerchan = make(chan *state.AppExecResult, len(block.Transactions)/8) // Tested 8 and 4 with no practical difference, but feel free to test more and tune. aerdone = make(chan error) @@ -2153,7 +2153,7 @@ func (bc *Blockchain) GetEnrollments() ([]state.Validator, error) { // GetTestVM returns an interop context with VM set up for a test run. func (bc *Blockchain) GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) *interop.Context { - d := bc.dao.GetWrapped() + d := bc.dao.GetPrivate() systemInterop := bc.newInteropContext(t, d, b, tx) vm := systemInterop.SpawnVM() vm.SetPriceGetter(systemInterop.GetPrice) diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 6318de884..f402e44c1 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -36,6 +36,10 @@ type Simple struct { // NewSimple creates new simple dao using provided backend store. func NewSimple(backend storage.Store, stateRootInHeader bool, p2pSigExtensions bool) *Simple { st := storage.NewMemCachedStore(backend) + return newSimple(st, stateRootInHeader, p2pSigExtensions) +} + +func newSimple(st *storage.MemCachedStore, stateRootInHeader bool, p2pSigExtensions bool) *Simple { return &Simple{ Version: Version{ StoragePrefix: storage.STStorage, @@ -59,6 +63,15 @@ func (dao *Simple) GetWrapped() *Simple { return d } +// GetPrivate returns new DAO instance with another layer of private +// MemCachedStore around the current DAO Store. +func (dao *Simple) GetPrivate() *Simple { + st := storage.NewPrivateMemCachedStore(dao.Store) + d := newSimple(st, dao.Version.StateRootInHeader, dao.Version.P2PSigExtensions) + d.Version = dao.Version + return d +} + // GetAndDecode performs get operation and decoding with serializable structures. func (dao *Simple) GetAndDecode(entity io.Serializable, key []byte) error { entityBytes, err := dao.Store.Get(key) @@ -730,7 +743,7 @@ func (dao *Simple) PersistSync() (int, error) { // GetMPTBatch storage changes to be applied to MPT. func (dao *Simple) GetMPTBatch() mpt.Batch { var b mpt.Batch - dao.Store.MemoryStore.SeekAll([]byte{byte(dao.Version.StoragePrefix)}, func(k, v []byte) { + dao.Store.SeekAll([]byte{byte(dao.Version.StoragePrefix)}, func(k, v []byte) { b.Add(k[1:], v) }) return b diff --git a/pkg/core/interop/context.go b/pkg/core/interop/context.go index c18d31fbc..e5418a65f 100644 --- a/pkg/core/interop/context.go +++ b/pkg/core/interop/context.go @@ -71,7 +71,7 @@ func NewContext(trigger trigger.Type, bc Ledger, d *dao.Simple, getContract func(*dao.Simple, util.Uint160) (*state.Contract, error), natives []Contract, block *block.Block, tx *transaction.Transaction, log *zap.Logger) *Context { baseExecFee := int64(DefaultBaseExecFee) - dao := d.GetWrapped() + dao := d.GetPrivate() if bc != nil && (block == nil || block.Index != 0) { baseExecFee = bc.GetBaseExecFee() diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index 912dcaaf1..ca616167c 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -329,7 +329,7 @@ func (s *Module) AddBlock(block *block.Block) error { return errors.New("invalid block: MerkleRoot mismatch") } } - cache := s.dao.GetWrapped() + cache := s.dao.GetPrivate() writeBuf := io.NewBufBinWriter() if err := cache.StoreAsBlock(block, nil, nil, writeBuf); err != nil { return err diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 962bb68d8..fcee9754e 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -15,6 +15,7 @@ import ( type MemCachedStore struct { MemoryStore + private bool // plock protects Persist from double entrance. plock sync.Mutex // Persistent Store. @@ -51,10 +52,48 @@ func NewMemCachedStore(lower Store) *MemCachedStore { } } +// NewPrivateMemCachedStore creates a new private (unlocked) MemCachedStore object. +// Private cached stores are closed after Persist. +func NewPrivateMemCachedStore(lower Store) *MemCachedStore { + return &MemCachedStore{ + MemoryStore: *NewMemoryStore(), + private: true, + ps: lower, + } +} + +// lock write-locks non-private store. +func (s *MemCachedStore) lock() { + if !s.private { + s.mut.Lock() + } +} + +// unlock unlocks non-private store. +func (s *MemCachedStore) unlock() { + if !s.private { + s.mut.Unlock() + } +} + +// rlock read-locks non-private store. +func (s *MemCachedStore) rlock() { + if !s.private { + s.mut.RLock() + } +} + +// runlock drops read lock for non-private stores. +func (s *MemCachedStore) runlock() { + if !s.private { + s.mut.RUnlock() + } +} + // Get implements the Store interface. func (s *MemCachedStore) Get(key []byte) ([]byte, error) { - s.mut.RLock() - defer s.mut.RUnlock() + s.rlock() + defer s.runlock() m := s.chooseMap(key) if val, ok := m[string(key)]; ok { if val == nil { @@ -69,24 +108,23 @@ func (s *MemCachedStore) Get(key []byte) ([]byte, error) { func (s *MemCachedStore) Put(key, value []byte) { newKey := string(key) vcopy := slice.Copy(value) - s.mut.Lock() + s.lock() put(s.chooseMap(key), newKey, vcopy) - s.mut.Unlock() + s.unlock() } // Delete drops KV pair from the store. Never returns an error. func (s *MemCachedStore) Delete(key []byte) { newKey := string(key) - s.mut.Lock() + s.lock() put(s.chooseMap(key), newKey, nil) - s.mut.Unlock() + s.unlock() } // GetBatch returns currently accumulated changeset. func (s *MemCachedStore) GetBatch() *MemBatch { - s.mut.RLock() - defer s.mut.RUnlock() - + s.rlock() + defer s.runlock() var b MemBatch b.Put = make([]KeyValueExists, 0, len(s.mem)+len(s.stor)) @@ -105,11 +143,34 @@ func (s *MemCachedStore) GetBatch() *MemBatch { return &b } +// PutChangeSet implements the Store interface. Never returns an error. +func (s *MemCachedStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error { + s.lock() + s.MemoryStore.putChangeSet(puts, stores) + s.unlock() + return nil +} + // Seek implements the Store interface. func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte) bool) { s.seek(context.Background(), rng, false, f) } +// SeekAll is like seek but also iterates over deleted items. +func (s *MemCachedStore) SeekAll(key []byte, f func(k, v []byte)) { + if !s.private { + s.mut.RLock() + defer s.mut.RUnlock() + } + sk := string(key) + m := s.chooseMap(key) + for k, v := range m { + if strings.HasPrefix(k, sk) { + f([]byte(k), v) + } + } +} + // SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and // value slices may not be copied and may be modified. SeekAsync can guarantee // that key-value items are sorted by key in ascending way. @@ -150,7 +211,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0) } } - s.mut.RLock() + s.rlock() m := s.MemoryStore.chooseMap(rng.Prefix) for k, v := range m { if isKeyOK(k) { @@ -164,8 +225,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool } } ps := s.ps - s.mut.RUnlock() - + s.runlock() less := func(k1, k2 []byte) bool { res := bytes.Compare(k1, k2) return res != 0 && rng.Backwards == (res > 0) @@ -276,6 +336,20 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { var err error var keys int + if s.private { + keys = len(s.mem) + len(s.stor) + if keys == 0 { + return 0, nil + } + err = s.ps.PutChangeSet(s.mem, s.stor) + if err != nil { + return 0, err + } + s.mem = nil + s.stor = nil + return keys, nil + } + s.plock.Lock() defer s.plock.Unlock() s.mut.Lock() diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index 16a391e7c..0472c9817 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -355,6 +355,31 @@ func TestMemCachedPersistFailing(t *testing.T) { require.Equal(t, b1, res) } +func TestPrivateMemCachedPersistFailing(t *testing.T) { + var ( + bs BadStore + t1 = []byte("t1") + t2 = []byte("t2") + ) + // cached Store + ts := NewPrivateMemCachedStore(&bs) + // Set a pair of keys. + ts.Put(t1, t1) + ts.Put(t2, t2) + // This will be called during Persist(). + bs.onPutBatch = func() {} + + _, err := ts.Persist() + require.Error(t, err) + // PutBatch() failed in Persist, but we still should have proper state. + res, err := ts.Get(t1) + require.NoError(t, err) + require.Equal(t, t1, res) + res, err = ts.Get(t2) + require.NoError(t, err) + require.Equal(t, t2, res) +} + func TestCachedSeekSorting(t *testing.T) { var ( // Given this prefix... @@ -378,29 +403,31 @@ func TestCachedSeekSorting(t *testing.T) { {[]byte{1, 3, 2}, []byte("wop")}, {[]byte{1, 3, 4}, []byte("zaq")}, } - ps = NewMemoryStore() - ts = NewMemCachedStore(ps) ) - for _, v := range lowerKVs { - require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil)) + for _, newCached := range []func(Store) *MemCachedStore{NewMemCachedStore, NewPrivateMemCachedStore} { + ps := NewMemoryStore() + ts := newCached(ps) + for _, v := range lowerKVs { + require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil)) + } + for _, v := range deletedKVs { + require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil)) + ts.Delete(v.Key) + } + for _, v := range updatedKVs { + require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil)) + ts.Put(v.Key, v.Value) + } + var foundKVs []KeyValue + ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) bool { + foundKVs = append(foundKVs, KeyValue{Key: slice.Copy(k), Value: slice.Copy(v)}) + return true + }) + assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs)) + expected := append(lowerKVs, updatedKVs...) + sort.Slice(expected, func(i, j int) bool { + return bytes.Compare(expected[i].Key, expected[j].Key) < 0 + }) + require.Equal(t, expected, foundKVs) } - for _, v := range deletedKVs { - require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil)) - ts.Delete(v.Key) - } - for _, v := range updatedKVs { - require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil)) - ts.Put(v.Key, v.Value) - } - var foundKVs []KeyValue - ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) bool { - foundKVs = append(foundKVs, KeyValue{Key: slice.Copy(k), Value: slice.Copy(v)}) - return true - }) - assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs)) - expected := append(lowerKVs, updatedKVs...) - sort.Slice(expected, func(i, j int) bool { - return bytes.Compare(expected[i].Key, expected[j].Key) < 0 - }) - require.Equal(t, expected, foundKVs) } diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 62551928d..5462b937a 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -52,14 +52,18 @@ func put(m map[string][]byte, key string, value []byte) { // PutChangeSet implements the Store interface. Never returns an error. func (s *MemoryStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error { s.mut.Lock() + s.putChangeSet(puts, stores) + s.mut.Unlock() + return nil +} + +func (s *MemoryStore) putChangeSet(puts map[string][]byte, stores map[string][]byte) { for k := range puts { put(s.mem, k, puts[k]) } for k := range stores { put(s.stor, k, stores[k]) } - s.mut.Unlock() - return nil } // Seek implements the Store interface. @@ -84,19 +88,6 @@ func (s *MemoryStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { return nil } -// SeekAll is like seek but also iterates over deleted items. -func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) { - s.mut.RLock() - defer s.mut.RUnlock() - sk := string(key) - m := s.chooseMap(key) - for k, v := range m { - if strings.HasPrefix(k, sk) { - f([]byte(k), v) - } - } -} - // seek is an internal unlocked implementation of Seek. `start` denotes whether // seeking starting from the provided prefix should be performed. Backwards // seeking from some point is supported with corresponding SeekRange field set.