storage: add "private" mode to MemCachedStore

Most of the time we don't need locking on the higher-level stores and we drop
them after Persist, so that's what private MemCachedStore is for.

It doesn't improve things in any noticeable way, some ~1% can be observed in
neo-bench under various loads and even less than that in chain processing. But
it seems to be a bit better anyway (less allocations, less locks).
This commit is contained in:
Roman Khimov 2022-02-16 19:13:06 +03:00
parent aefb26255a
commit 9bfb3357f2
7 changed files with 163 additions and 58 deletions

View file

@ -519,7 +519,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
fallthrough
case newStorageItemsAdded:
cache := bc.dao.GetWrapped()
cache := bc.dao.GetPrivate()
prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) bool {
// #1468, but don't need to copy here, because it is done by Store.
@ -903,7 +903,7 @@ func (bc *Blockchain) AddHeaders(headers ...*block.Header) error {
func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error {
var (
start = time.Now()
batch = bc.dao.GetWrapped()
batch = bc.dao.GetPrivate()
err error
)
@ -997,8 +997,8 @@ func (bc *Blockchain) GetStateSyncModule() *statesync.Module {
// This is the only way to change Blockchain state.
func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error {
var (
cache = bc.dao.GetWrapped()
aerCache = bc.dao.GetWrapped()
cache = bc.dao.GetPrivate()
aerCache = bc.dao.GetPrivate()
appExecResults = make([]*state.AppExecResult, 0, 2+len(block.Transactions))
aerchan = make(chan *state.AppExecResult, len(block.Transactions)/8) // Tested 8 and 4 with no practical difference, but feel free to test more and tune.
aerdone = make(chan error)
@ -2153,7 +2153,7 @@ func (bc *Blockchain) GetEnrollments() ([]state.Validator, error) {
// GetTestVM returns an interop context with VM set up for a test run.
func (bc *Blockchain) GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) *interop.Context {
d := bc.dao.GetWrapped()
d := bc.dao.GetPrivate()
systemInterop := bc.newInteropContext(t, d, b, tx)
vm := systemInterop.SpawnVM()
vm.SetPriceGetter(systemInterop.GetPrice)

View file

@ -36,6 +36,10 @@ type Simple struct {
// NewSimple creates new simple dao using provided backend store.
func NewSimple(backend storage.Store, stateRootInHeader bool, p2pSigExtensions bool) *Simple {
st := storage.NewMemCachedStore(backend)
return newSimple(st, stateRootInHeader, p2pSigExtensions)
}
func newSimple(st *storage.MemCachedStore, stateRootInHeader bool, p2pSigExtensions bool) *Simple {
return &Simple{
Version: Version{
StoragePrefix: storage.STStorage,
@ -59,6 +63,15 @@ func (dao *Simple) GetWrapped() *Simple {
return d
}
// GetPrivate returns new DAO instance with another layer of private
// MemCachedStore around the current DAO Store.
func (dao *Simple) GetPrivate() *Simple {
st := storage.NewPrivateMemCachedStore(dao.Store)
d := newSimple(st, dao.Version.StateRootInHeader, dao.Version.P2PSigExtensions)
d.Version = dao.Version
return d
}
// GetAndDecode performs get operation and decoding with serializable structures.
func (dao *Simple) GetAndDecode(entity io.Serializable, key []byte) error {
entityBytes, err := dao.Store.Get(key)
@ -730,7 +743,7 @@ func (dao *Simple) PersistSync() (int, error) {
// GetMPTBatch storage changes to be applied to MPT.
func (dao *Simple) GetMPTBatch() mpt.Batch {
var b mpt.Batch
dao.Store.MemoryStore.SeekAll([]byte{byte(dao.Version.StoragePrefix)}, func(k, v []byte) {
dao.Store.SeekAll([]byte{byte(dao.Version.StoragePrefix)}, func(k, v []byte) {
b.Add(k[1:], v)
})
return b

View file

@ -71,7 +71,7 @@ func NewContext(trigger trigger.Type, bc Ledger, d *dao.Simple,
getContract func(*dao.Simple, util.Uint160) (*state.Contract, error), natives []Contract,
block *block.Block, tx *transaction.Transaction, log *zap.Logger) *Context {
baseExecFee := int64(DefaultBaseExecFee)
dao := d.GetWrapped()
dao := d.GetPrivate()
if bc != nil && (block == nil || block.Index != 0) {
baseExecFee = bc.GetBaseExecFee()

View file

@ -329,7 +329,7 @@ func (s *Module) AddBlock(block *block.Block) error {
return errors.New("invalid block: MerkleRoot mismatch")
}
}
cache := s.dao.GetWrapped()
cache := s.dao.GetPrivate()
writeBuf := io.NewBufBinWriter()
if err := cache.StoreAsBlock(block, nil, nil, writeBuf); err != nil {
return err

View file

@ -15,6 +15,7 @@ import (
type MemCachedStore struct {
MemoryStore
private bool
// plock protects Persist from double entrance.
plock sync.Mutex
// Persistent Store.
@ -51,10 +52,48 @@ func NewMemCachedStore(lower Store) *MemCachedStore {
}
}
// NewPrivateMemCachedStore creates a new private (unlocked) MemCachedStore object.
// Private cached stores are closed after Persist.
func NewPrivateMemCachedStore(lower Store) *MemCachedStore {
return &MemCachedStore{
MemoryStore: *NewMemoryStore(),
private: true,
ps: lower,
}
}
// lock write-locks non-private store.
func (s *MemCachedStore) lock() {
if !s.private {
s.mut.Lock()
}
}
// unlock unlocks non-private store.
func (s *MemCachedStore) unlock() {
if !s.private {
s.mut.Unlock()
}
}
// rlock read-locks non-private store.
func (s *MemCachedStore) rlock() {
if !s.private {
s.mut.RLock()
}
}
// runlock drops read lock for non-private stores.
func (s *MemCachedStore) runlock() {
if !s.private {
s.mut.RUnlock()
}
}
// Get implements the Store interface.
func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
s.mut.RLock()
defer s.mut.RUnlock()
s.rlock()
defer s.runlock()
m := s.chooseMap(key)
if val, ok := m[string(key)]; ok {
if val == nil {
@ -69,24 +108,23 @@ func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
func (s *MemCachedStore) Put(key, value []byte) {
newKey := string(key)
vcopy := slice.Copy(value)
s.mut.Lock()
s.lock()
put(s.chooseMap(key), newKey, vcopy)
s.mut.Unlock()
s.unlock()
}
// Delete drops KV pair from the store. Never returns an error.
func (s *MemCachedStore) Delete(key []byte) {
newKey := string(key)
s.mut.Lock()
s.lock()
put(s.chooseMap(key), newKey, nil)
s.mut.Unlock()
s.unlock()
}
// GetBatch returns currently accumulated changeset.
func (s *MemCachedStore) GetBatch() *MemBatch {
s.mut.RLock()
defer s.mut.RUnlock()
s.rlock()
defer s.runlock()
var b MemBatch
b.Put = make([]KeyValueExists, 0, len(s.mem)+len(s.stor))
@ -105,11 +143,34 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
return &b
}
// PutChangeSet implements the Store interface. Never returns an error.
func (s *MemCachedStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error {
s.lock()
s.MemoryStore.putChangeSet(puts, stores)
s.unlock()
return nil
}
// Seek implements the Store interface.
func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
s.seek(context.Background(), rng, false, f)
}
// SeekAll is like seek but also iterates over deleted items.
func (s *MemCachedStore) SeekAll(key []byte, f func(k, v []byte)) {
if !s.private {
s.mut.RLock()
defer s.mut.RUnlock()
}
sk := string(key)
m := s.chooseMap(key)
for k, v := range m {
if strings.HasPrefix(k, sk) {
f([]byte(k), v)
}
}
}
// SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and
// value slices may not be copied and may be modified. SeekAsync can guarantee
// that key-value items are sorted by key in ascending way.
@ -150,7 +211,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0)
}
}
s.mut.RLock()
s.rlock()
m := s.MemoryStore.chooseMap(rng.Prefix)
for k, v := range m {
if isKeyOK(k) {
@ -164,8 +225,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
}
}
ps := s.ps
s.mut.RUnlock()
s.runlock()
less := func(k1, k2 []byte) bool {
res := bytes.Compare(k1, k2)
return res != 0 && rng.Backwards == (res > 0)
@ -276,6 +336,20 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) {
var err error
var keys int
if s.private {
keys = len(s.mem) + len(s.stor)
if keys == 0 {
return 0, nil
}
err = s.ps.PutChangeSet(s.mem, s.stor)
if err != nil {
return 0, err
}
s.mem = nil
s.stor = nil
return keys, nil
}
s.plock.Lock()
defer s.plock.Unlock()
s.mut.Lock()

View file

@ -355,6 +355,31 @@ func TestMemCachedPersistFailing(t *testing.T) {
require.Equal(t, b1, res)
}
func TestPrivateMemCachedPersistFailing(t *testing.T) {
var (
bs BadStore
t1 = []byte("t1")
t2 = []byte("t2")
)
// cached Store
ts := NewPrivateMemCachedStore(&bs)
// Set a pair of keys.
ts.Put(t1, t1)
ts.Put(t2, t2)
// This will be called during Persist().
bs.onPutBatch = func() {}
_, err := ts.Persist()
require.Error(t, err)
// PutBatch() failed in Persist, but we still should have proper state.
res, err := ts.Get(t1)
require.NoError(t, err)
require.Equal(t, t1, res)
res, err = ts.Get(t2)
require.NoError(t, err)
require.Equal(t, t2, res)
}
func TestCachedSeekSorting(t *testing.T) {
var (
// Given this prefix...
@ -378,9 +403,10 @@ func TestCachedSeekSorting(t *testing.T) {
{[]byte{1, 3, 2}, []byte("wop")},
{[]byte{1, 3, 4}, []byte("zaq")},
}
ps = NewMemoryStore()
ts = NewMemCachedStore(ps)
)
for _, newCached := range []func(Store) *MemCachedStore{NewMemCachedStore, NewPrivateMemCachedStore} {
ps := NewMemoryStore()
ts := newCached(ps)
for _, v := range lowerKVs {
require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil))
}
@ -404,3 +430,4 @@ func TestCachedSeekSorting(t *testing.T) {
})
require.Equal(t, expected, foundKVs)
}
}

View file

@ -52,14 +52,18 @@ func put(m map[string][]byte, key string, value []byte) {
// PutChangeSet implements the Store interface. Never returns an error.
func (s *MemoryStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error {
s.mut.Lock()
s.putChangeSet(puts, stores)
s.mut.Unlock()
return nil
}
func (s *MemoryStore) putChangeSet(puts map[string][]byte, stores map[string][]byte) {
for k := range puts {
put(s.mem, k, puts[k])
}
for k := range stores {
put(s.stor, k, stores[k])
}
s.mut.Unlock()
return nil
}
// Seek implements the Store interface.
@ -84,19 +88,6 @@ func (s *MemoryStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error {
return nil
}
// SeekAll is like seek but also iterates over deleted items.
func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
s.mut.RLock()
defer s.mut.RUnlock()
sk := string(key)
m := s.chooseMap(key)
for k, v := range m {
if strings.HasPrefix(k, sk) {
f([]byte(k), v)
}
}
}
// seek is an internal unlocked implementation of Seek. `start` denotes whether
// seeking starting from the provided prefix should be performed. Backwards
// seeking from some point is supported with corresponding SeekRange field set.