From 7ba88e98e215c2ceff5a46f72bfff4c69ac06444 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 22 Sep 2021 18:58:48 +0300 Subject: [PATCH] core: optimize (*MemCachedStore).Seek operation Real persistent storage guarantees that result of Seek is sorted by keys. The idea of optimisation is to merge two sorted seek results into one (memStore+persistentStore), so that (*MemCachedStore).Seek will return sorted list. The only thing that remains is to sort items got from (*MemoryStore).Seek. --- pkg/core/storage/memcached_store.go | 103 ++++++++++++++++++++--- pkg/core/storage/memcached_store_test.go | 52 ++++++++++++ pkg/core/storage/store.go | 3 +- 3 files changed, 145 insertions(+), 13 deletions(-) diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index f2ebac2ba..98e7a6631 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -1,6 +1,13 @@ package storage -import "sync" +import ( + "bytes" + "sort" + "strings" + "sync" + + "github.com/nspcc-dev/neo-go/pkg/util/slice" +) // MemCachedStore is a wrapper around persistent store that caches all changes // being made for them to be later flushed in one batch. @@ -83,21 +90,93 @@ func (s *MemCachedStore) GetBatch() *MemBatch { // Seek implements the Store interface. func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { + // Create memory store `mem` and `del` snapshot not to hold the lock. + memRes := make([]KeyValueExists, 0) + sk := string(key) s.mut.RLock() - defer s.mut.RUnlock() - s.MemoryStore.seek(key, f) - s.ps.Seek(key, func(k, v []byte) { - elem := string(k) - // If it's in mem, we already called f() for it in MemoryStore.Seek(). - _, present := s.mem[elem] - if !present { - // If it's in del, we shouldn't be calling f() anyway. - _, present = s.del[elem] + for k, v := range s.MemoryStore.mem { + if strings.HasPrefix(k, sk) { + memRes = append(memRes, KeyValueExists{ + KeyValue: KeyValue{ + Key: []byte(k), + Value: slice.Copy(v), + }, + Exists: true, + }) } - if !present { - f(k, v) + } + for k := range s.MemoryStore.del { + if strings.HasPrefix(k, sk) { + memRes = append(memRes, KeyValueExists{ + KeyValue: KeyValue{ + Key: []byte(k), + }, + }) } + } + s.mut.RUnlock() + // Sort memRes items for further comparison with ps items. + sort.Slice(memRes, func(i, j int) bool { + return bytes.Compare(memRes[i].Key, memRes[j].Key) < 0 }) + + var ( + data1 = make(chan KeyValueExists) + data2 = make(chan KeyValue) + seekres = make(chan KeyValue) + ) + // Seek over memory store. + go func() { + for _, kv := range memRes { + data1 <- kv + } + close(data1) + }() + + // Seek over persistent store. + go func() { + s.mut.RLock() + s.ps.Seek(key, func(k, v []byte) { + // Must copy here, #1468. + data2 <- KeyValue{ + Key: slice.Copy(k), + Value: slice.Copy(v), + } + }) + s.mut.RUnlock() + close(data2) + }() + + // Merge results of seek operations in ascending order. + go func() { + kvMem, haveMem := <-data1 + kvPs, havePs := <-data2 + for { + if !haveMem && !havePs { + break + } + var isMem = haveMem && (!havePs || (bytes.Compare(kvMem.Key, kvPs.Key) < 0)) + if isMem { + if kvMem.Exists { + seekres <- KeyValue{ + Key: kvMem.Key, + Value: kvMem.Value, + } + } + kvMem, haveMem = <-data1 + } else { + if !bytes.Equal(kvMem.Key, kvPs.Key) { + seekres <- kvPs + } + kvPs, havePs = <-data2 + } + } + close(seekres) + }() + + for r := range seekres { + f(r.Key, r.Value) + } } // Persist flushes all the MemoryStore contents into the (supposedly) persistent diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index 5373bd3c1..216c6b8e7 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -1,10 +1,13 @@ package storage import ( + "bytes" "fmt" + "sort" "testing" "github.com/nspcc-dev/neo-go/internal/random" + "github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -319,3 +322,52 @@ func TestMemCachedPersistFailing(t *testing.T) { require.NoError(t, err) require.Equal(t, b1, res) } + +func TestCachedSeekSorting(t *testing.T) { + var ( + // Given this prefix... + goodPrefix = []byte{1} + // these pairs should be found... + lowerKVs = []kvSeen{ + {[]byte{1, 2, 3}, []byte("bra"), false}, + {[]byte{1, 2, 5}, []byte("bar"), false}, + {[]byte{1, 3, 3}, []byte("bra"), false}, + {[]byte{1, 3, 5}, []byte("bra"), false}, + } + // and these should be not. + deletedKVs = []kvSeen{ + {[]byte{1, 7, 3}, []byte("pow"), false}, + {[]byte{1, 7, 4}, []byte("qaz"), false}, + } + // and these should be not. + updatedKVs = []kvSeen{ + {[]byte{1, 2, 4}, []byte("zaq"), false}, + {[]byte{1, 2, 6}, []byte("zaq"), false}, + {[]byte{1, 3, 2}, []byte("wop"), false}, + {[]byte{1, 3, 4}, []byte("zaq"), false}, + } + ps = NewMemoryStore() + ts = NewMemCachedStore(ps) + ) + for _, v := range lowerKVs { + require.NoError(t, ps.Put(v.key, v.val)) + } + for _, v := range deletedKVs { + require.NoError(t, ps.Put(v.key, v.val)) + require.NoError(t, ts.Delete(v.key)) + } + for _, v := range updatedKVs { + require.NoError(t, ps.Put(v.key, []byte("stub"))) + require.NoError(t, ts.Put(v.key, v.val)) + } + var foundKVs []kvSeen + ts.Seek(goodPrefix, func(k, v []byte) { + foundKVs = append(foundKVs, kvSeen{key: slice.Copy(k), val: slice.Copy(v)}) + }) + assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs)) + expected := append(lowerKVs, updatedKVs...) + sort.Slice(expected, func(i, j int) bool { + return bytes.Compare(expected[i].key, expected[j].key) < 0 + }) + require.Equal(t, expected, foundKVs) +} diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index bd62f6001..f87f94e87 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -55,7 +55,8 @@ type ( // PutChangeSet allows to push prepared changeset to the Store. PutChangeSet(puts map[string][]byte, dels map[string]bool) error // Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f. - // Key and value slices should not be modified. + // Key and value slices should not be modified. Seek can guarantee that key-value items are sorted by + // key in ascending way. Seek(k []byte, f func(k, v []byte)) Close() error }