core: refactor storage.Find and storage.Iterator to work with channel

Add SeekAsync methods in order to fetch matching storage items
on demand. Refactor storage.Find and storage.Iterator wrt these changes.
This commit is contained in:
Anna Shaleva 2021-10-04 17:01:42 +03:00
parent f2ac07a3c0
commit 89ee2e7720
4 changed files with 55 additions and 45 deletions

View file

@ -64,6 +64,7 @@ type DAO interface {
PutStorageItem(id int32, key []byte, si state.StorageItem) error PutStorageItem(id int32, key []byte, si state.StorageItem) error
PutVersion(v string) error PutVersion(v string) error
Seek(id int32, prefix []byte, f func(k, v []byte)) Seek(id int32, prefix []byte, f func(k, v []byte))
SeekAsync(id int32, prefix []byte) chan storage.KeyValue
StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error
StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error
@ -336,13 +337,19 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S
// Seek executes f for all items with a given prefix. // Seek executes f for all items with a given prefix.
// If key is to be used outside of f, they may not be copied. // If key is to be used outside of f, they may not be copied.
func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) {
for r := range dao.SeekAsync(id, prefix) {
f(r.Key, r.Value)
}
}
// SeekAsync sends all storage items matching given prefix to a channel and returns
// the channel. Resulting keys and values may not be copied.
func (dao *Simple) SeekAsync(id int32, prefix []byte) chan storage.KeyValue {
lookupKey := makeStorageItemKey(id, nil) lookupKey := makeStorageItemKey(id, nil)
if prefix != nil { if prefix != nil {
lookupKey = append(lookupKey, prefix...) lookupKey = append(lookupKey, prefix...)
} }
dao.Store.Seek(lookupKey, func(k, v []byte) { return dao.Store.SeekAsync(lookupKey, true)
f(k[len(lookupKey):], v)
})
} }
// makeStorageItemKey returns a key used to store StorageItem in the DB. // makeStorageItemKey returns a key used to store StorageItem in the DB.

View file

@ -1,6 +1,10 @@
package storage package storage
import "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" import (
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
)
// Storage iterator options. // Storage iterator options.
const ( const (
@ -18,44 +22,45 @@ const (
// Iterator is an iterator state representation. // Iterator is an iterator state representation.
type Iterator struct { type Iterator struct {
m []stackitem.MapElement seekCh chan storage.KeyValue
curr storage.KeyValue
next bool
opts int64 opts int64
index int prefix []byte
prefixSize int
} }
// NewIterator creates a new Iterator with given options for a given map. // NewIterator creates a new Iterator with given options for a given channel of store.Seek results.
func NewIterator(m *stackitem.Map, prefix int, opts int64) *Iterator { func NewIterator(seekCh chan storage.KeyValue, prefix []byte, opts int64) *Iterator {
return &Iterator{ return &Iterator{
m: m.Value().([]stackitem.MapElement), seekCh: seekCh,
opts: opts, opts: opts,
index: -1, prefix: slice.Copy(prefix),
prefixSize: prefix,
} }
} }
// Next advances the iterator and returns true if Value can be called at the // Next advances the iterator and returns true if Value can be called at the
// current position. // current position.
func (s *Iterator) Next() bool { func (s *Iterator) Next() bool {
if s.index < len(s.m) { s.curr, s.next = <-s.seekCh
s.index++ return s.next
}
return s.index < len(s.m)
} }
// Value returns current iterators value (exact type depends on options this // Value returns current iterators value (exact type depends on options this
// iterator was created with). // iterator was created with).
func (s *Iterator) Value() stackitem.Item { func (s *Iterator) Value() stackitem.Item {
key := s.m[s.index].Key.Value().([]byte) if !s.next {
if s.opts&FindRemovePrefix != 0 { panic("iterator index out of range")
key = key[s.prefixSize:] }
key := s.curr.Key
if s.opts&FindRemovePrefix == 0 {
key = append(s.prefix, key...)
} }
if s.opts&FindKeysOnly != 0 { if s.opts&FindKeysOnly != 0 {
return stackitem.NewByteArray(key) return stackitem.NewByteArray(key)
} }
value := s.m[s.index].Value value := stackitem.Item(stackitem.NewByteArray(s.curr.Value))
if s.opts&FindDeserialize != 0 { if s.opts&FindDeserialize != 0 {
bs := s.m[s.index].Value.Value().([]byte) bs := s.curr.Value
var err error var err error
value, err = stackitem.Deserialize(bs) value, err = stackitem.Deserialize(bs)
if err != nil { if err != nil {

View file

@ -186,26 +186,10 @@ func storageFind(ic *interop.Context) error {
if opts&istorage.FindDeserialize == 0 && (opts&istorage.FindPick0 != 0 || opts&istorage.FindPick1 != 0) { if opts&istorage.FindDeserialize == 0 && (opts&istorage.FindPick0 != 0 || opts&istorage.FindPick1 != 0) {
return fmt.Errorf("%w: PickN is specified without Deserialize", errFindInvalidOptions) return fmt.Errorf("%w: PickN is specified without Deserialize", errFindInvalidOptions)
} }
siArr, err := ic.DAO.GetStorageItemsWithPrefix(stc.ID, prefix) // Items in seekres should be sorted by key, but GetStorageItemsWithPrefix returns
if err != nil {
return err
}
arr := make([]stackitem.MapElement, 0, len(siArr))
for _, kv := range siArr {
keycopy := make([]byte, len(kv.Key)+len(prefix))
copy(keycopy, prefix)
copy(keycopy[len(prefix):], kv.Key)
arr = append(arr, stackitem.MapElement{
Key: stackitem.NewByteArray(keycopy),
Value: stackitem.NewByteArray(kv.Item),
})
}
// Items in arr should be sorted by key, but GetStorageItemsWithPrefix returns
// sorted items, so no need to sort them one more time. // sorted items, so no need to sort them one more time.
seekres := ic.DAO.SeekAsync(stc.ID, prefix)
filteredMap := stackitem.NewMapWithValue(arr) item := istorage.NewIterator(seekres, prefix, opts)
item := istorage.NewIterator(filteredMap, len(prefix), opts)
ic.VM.Estack().PushItem(stackitem.NewInterop(item)) ic.VM.Estack().PushItem(stackitem.NewInterop(item))
return nil return nil

View file

@ -90,6 +90,16 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
// Seek implements the Store interface. // Seek implements the Store interface.
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
seekres := s.SeekAsync(key, false)
for kv := range seekres {
f(kv.Key, kv.Value)
}
}
// SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and
// value slices may not be copied and may be modified. SeekAsync can guarantee
// that key-value items are sorted by key in ascending way.
func (s *MemCachedStore) SeekAsync(key []byte, cutPrefix bool) chan KeyValue {
// Create memory store `mem` and `del` snapshot not to hold the lock. // Create memory store `mem` and `del` snapshot not to hold the lock.
memRes := make([]KeyValueExists, 0) memRes := make([]KeyValueExists, 0)
sk := string(key) sk := string(key)
@ -158,6 +168,9 @@ func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
var isMem = haveMem && (!havePs || (bytes.Compare(kvMem.Key, kvPs.Key) < 0)) var isMem = haveMem && (!havePs || (bytes.Compare(kvMem.Key, kvPs.Key) < 0))
if isMem { if isMem {
if kvMem.Exists { if kvMem.Exists {
if cutPrefix {
kvMem.Key = kvMem.Key[len(key):]
}
seekres <- KeyValue{ seekres <- KeyValue{
Key: kvMem.Key, Key: kvMem.Key,
Value: kvMem.Value, Value: kvMem.Value,
@ -166,6 +179,9 @@ func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
kvMem, haveMem = <-data1 kvMem, haveMem = <-data1
} else { } else {
if !bytes.Equal(kvMem.Key, kvPs.Key) { if !bytes.Equal(kvMem.Key, kvPs.Key) {
if cutPrefix {
kvPs.Key = kvPs.Key[len(key):]
}
seekres <- kvPs seekres <- kvPs
} }
kvPs, havePs = <-data2 kvPs, havePs = <-data2
@ -174,9 +190,7 @@ func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
close(seekres) close(seekres)
}() }()
for r := range seekres { return seekres
f(r.Key, r.Value)
}
} }
// Persist flushes all the MemoryStore contents into the (supposedly) persistent // Persist flushes all the MemoryStore contents into the (supposedly) persistent