From 89ee2e77204d9129987d192775e78bcbd891a2a7 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Mon, 4 Oct 2021 17:01:42 +0300 Subject: [PATCH] core: refactor storage.Find and storage.Iterator to work with channel Add SeekAsync methods in order to fetch matching storage items on demand. Refactor storage.Find and storage.Iterator wrt these changes. --- pkg/core/dao/dao.go | 13 +++++++-- pkg/core/interop/storage/find.go | 45 ++++++++++++++++------------- pkg/core/interop_system.go | 22 ++------------ pkg/core/storage/memcached_store.go | 20 +++++++++++-- 4 files changed, 55 insertions(+), 45 deletions(-) diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index bf4587ee0..4ed9ce4ea 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -64,6 +64,7 @@ type DAO interface { PutStorageItem(id int32, key []byte, si state.StorageItem) error PutVersion(v string) error Seek(id int32, prefix []byte, f func(k, v []byte)) + SeekAsync(id int32, prefix []byte) chan storage.KeyValue StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error @@ -336,13 +337,19 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S // Seek executes f for all items with a given prefix. // If key is to be used outside of f, they may not be copied. func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { + for r := range dao.SeekAsync(id, prefix) { + f(r.Key, r.Value) + } +} + +// SeekAsync sends all storage items matching given prefix to a channel and returns +// the channel. Resulting keys and values may not be copied. +func (dao *Simple) SeekAsync(id int32, prefix []byte) chan storage.KeyValue { lookupKey := makeStorageItemKey(id, nil) if prefix != nil { lookupKey = append(lookupKey, prefix...) } - dao.Store.Seek(lookupKey, func(k, v []byte) { - f(k[len(lookupKey):], v) - }) + return dao.Store.SeekAsync(lookupKey, true) } // makeStorageItemKey returns a key used to store StorageItem in the DB. diff --git a/pkg/core/interop/storage/find.go b/pkg/core/interop/storage/find.go index 045c9290f..35b6621f4 100644 --- a/pkg/core/interop/storage/find.go +++ b/pkg/core/interop/storage/find.go @@ -1,6 +1,10 @@ package storage -import "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" +import ( + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/util/slice" + "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" +) // Storage iterator options. const ( @@ -18,44 +22,45 @@ const ( // Iterator is an iterator state representation. type Iterator struct { - m []stackitem.MapElement - opts int64 - index int - prefixSize int + seekCh chan storage.KeyValue + curr storage.KeyValue + next bool + opts int64 + prefix []byte } -// NewIterator creates a new Iterator with given options for a given map. -func NewIterator(m *stackitem.Map, prefix int, opts int64) *Iterator { +// NewIterator creates a new Iterator with given options for a given channel of store.Seek results. +func NewIterator(seekCh chan storage.KeyValue, prefix []byte, opts int64) *Iterator { return &Iterator{ - m: m.Value().([]stackitem.MapElement), - opts: opts, - index: -1, - prefixSize: prefix, + seekCh: seekCh, + opts: opts, + prefix: slice.Copy(prefix), } } // Next advances the iterator and returns true if Value can be called at the // current position. func (s *Iterator) Next() bool { - if s.index < len(s.m) { - s.index++ - } - return s.index < len(s.m) + s.curr, s.next = <-s.seekCh + return s.next } // Value returns current iterators value (exact type depends on options this // iterator was created with). func (s *Iterator) Value() stackitem.Item { - key := s.m[s.index].Key.Value().([]byte) - if s.opts&FindRemovePrefix != 0 { - key = key[s.prefixSize:] + if !s.next { + panic("iterator index out of range") + } + key := s.curr.Key + if s.opts&FindRemovePrefix == 0 { + key = append(s.prefix, key...) } if s.opts&FindKeysOnly != 0 { return stackitem.NewByteArray(key) } - value := s.m[s.index].Value + value := stackitem.Item(stackitem.NewByteArray(s.curr.Value)) if s.opts&FindDeserialize != 0 { - bs := s.m[s.index].Value.Value().([]byte) + bs := s.curr.Value var err error value, err = stackitem.Deserialize(bs) if err != nil { diff --git a/pkg/core/interop_system.go b/pkg/core/interop_system.go index f3d9f1a87..1b6f8b53b 100644 --- a/pkg/core/interop_system.go +++ b/pkg/core/interop_system.go @@ -186,26 +186,10 @@ func storageFind(ic *interop.Context) error { if opts&istorage.FindDeserialize == 0 && (opts&istorage.FindPick0 != 0 || opts&istorage.FindPick1 != 0) { return fmt.Errorf("%w: PickN is specified without Deserialize", errFindInvalidOptions) } - siArr, err := ic.DAO.GetStorageItemsWithPrefix(stc.ID, prefix) - if err != nil { - return err - } - - arr := make([]stackitem.MapElement, 0, len(siArr)) - for _, kv := range siArr { - keycopy := make([]byte, len(kv.Key)+len(prefix)) - copy(keycopy, prefix) - copy(keycopy[len(prefix):], kv.Key) - arr = append(arr, stackitem.MapElement{ - Key: stackitem.NewByteArray(keycopy), - Value: stackitem.NewByteArray(kv.Item), - }) - } - // Items in arr should be sorted by key, but GetStorageItemsWithPrefix returns + // Items in seekres should be sorted by key, but GetStorageItemsWithPrefix returns // sorted items, so no need to sort them one more time. - - filteredMap := stackitem.NewMapWithValue(arr) - item := istorage.NewIterator(filteredMap, len(prefix), opts) + seekres := ic.DAO.SeekAsync(stc.ID, prefix) + item := istorage.NewIterator(seekres, prefix, opts) ic.VM.Estack().PushItem(stackitem.NewInterop(item)) return nil diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 98e7a6631..7b7af2bce 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -90,6 +90,16 @@ func (s *MemCachedStore) GetBatch() *MemBatch { // Seek implements the Store interface. func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { + seekres := s.SeekAsync(key, false) + for kv := range seekres { + f(kv.Key, kv.Value) + } +} + +// SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and +// value slices may not be copied and may be modified. SeekAsync can guarantee +// that key-value items are sorted by key in ascending way. +func (s *MemCachedStore) SeekAsync(key []byte, cutPrefix bool) chan KeyValue { // Create memory store `mem` and `del` snapshot not to hold the lock. memRes := make([]KeyValueExists, 0) sk := string(key) @@ -158,6 +168,9 @@ func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { var isMem = haveMem && (!havePs || (bytes.Compare(kvMem.Key, kvPs.Key) < 0)) if isMem { if kvMem.Exists { + if cutPrefix { + kvMem.Key = kvMem.Key[len(key):] + } seekres <- KeyValue{ Key: kvMem.Key, Value: kvMem.Value, @@ -166,6 +179,9 @@ func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { kvMem, haveMem = <-data1 } else { if !bytes.Equal(kvMem.Key, kvPs.Key) { + if cutPrefix { + kvPs.Key = kvPs.Key[len(key):] + } seekres <- kvPs } kvPs, havePs = <-data2 @@ -174,9 +190,7 @@ func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { close(seekres) }() - for r := range seekres { - f(r.Key, r.Value) - } + return seekres } // Persist flushes all the MemoryStore contents into the (supposedly) persistent