diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 4ed9ce4ea..bc3b90cbf 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -2,6 +2,7 @@ package dao import ( "bytes" + "context" "encoding/binary" "errors" "fmt" @@ -64,7 +65,7 @@ type DAO interface { PutStorageItem(id int32, key []byte, si state.StorageItem) error PutVersion(v string) error Seek(id int32, prefix []byte, f func(k, v []byte)) - SeekAsync(id int32, prefix []byte) chan storage.KeyValue + SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error @@ -337,19 +338,20 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S // Seek executes f for all items with a given prefix. // If key is to be used outside of f, they may not be copied. func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { - for r := range dao.SeekAsync(id, prefix) { + res := dao.SeekAsync(context.Background(), id, prefix) + for r := range res { f(r.Key, r.Value) } } // SeekAsync sends all storage items matching given prefix to a channel and returns // the channel. Resulting keys and values may not be copied. -func (dao *Simple) SeekAsync(id int32, prefix []byte) chan storage.KeyValue { +func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue { lookupKey := makeStorageItemKey(id, nil) if prefix != nil { lookupKey = append(lookupKey, prefix...) } - return dao.Store.SeekAsync(lookupKey, true) + return dao.Store.SeekAsync(ctx, lookupKey, true) } // makeStorageItemKey returns a key used to store StorageItem in the DB. diff --git a/pkg/core/interop/storage/find.go b/pkg/core/interop/storage/find.go index 35b6621f4..e09e569b3 100644 --- a/pkg/core/interop/storage/find.go +++ b/pkg/core/interop/storage/find.go @@ -1,6 +1,8 @@ package storage import ( + "context" + "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" @@ -23,6 +25,7 @@ const ( // Iterator is an iterator state representation. type Iterator struct { seekCh chan storage.KeyValue + cancel context.CancelFunc curr storage.KeyValue next bool opts int64 @@ -30,9 +33,10 @@ type Iterator struct { } // NewIterator creates a new Iterator with given options for a given channel of store.Seek results. -func NewIterator(seekCh chan storage.KeyValue, prefix []byte, opts int64) *Iterator { +func NewIterator(seekCh chan storage.KeyValue, cancel context.CancelFunc, prefix []byte, opts int64) *Iterator { return &Iterator{ seekCh: seekCh, + cancel: cancel, opts: opts, prefix: slice.Copy(prefix), } @@ -80,3 +84,9 @@ func (s *Iterator) Value() stackitem.Item { value, }) } + +// Close releases resources occupied by the Iterator. +// TODO: call this method on program unloading. +func (s *Iterator) Close() { + s.cancel() +} diff --git a/pkg/core/interop_system.go b/pkg/core/interop_system.go index 1b6f8b53b..a21044057 100644 --- a/pkg/core/interop_system.go +++ b/pkg/core/interop_system.go @@ -1,6 +1,7 @@ package core import ( + "context" "crypto/elliptic" "errors" "fmt" @@ -188,8 +189,9 @@ func storageFind(ic *interop.Context) error { } // Items in seekres should be sorted by key, but GetStorageItemsWithPrefix returns // sorted items, so no need to sort them one more time. - seekres := ic.DAO.SeekAsync(stc.ID, prefix) - item := istorage.NewIterator(seekres, prefix, opts) + ctx, cancel := context.WithCancel(context.Background()) + seekres := ic.DAO.SeekAsync(ctx, stc.ID, prefix) + item := istorage.NewIterator(seekres, cancel, prefix, opts) ic.VM.Estack().PushItem(stackitem.NewInterop(item)) return nil diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 7b7af2bce..6619876be 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -2,6 +2,7 @@ package storage import ( "bytes" + "context" "sort" "strings" "sync" @@ -90,7 +91,7 @@ func (s *MemCachedStore) GetBatch() *MemBatch { // Seek implements the Store interface. func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { - seekres := s.SeekAsync(key, false) + seekres := s.SeekAsync(context.Background(), key, false) for kv := range seekres { f(kv.Key, kv.Value) } @@ -99,7 +100,7 @@ func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { // SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and // value slices may not be copied and may be modified. SeekAsync can guarantee // that key-value items are sorted by key in ascending way. -func (s *MemCachedStore) SeekAsync(key []byte, cutPrefix bool) chan KeyValue { +func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bool) chan KeyValue { // Create memory store `mem` and `del` snapshot not to hold the lock. memRes := make([]KeyValueExists, 0) sk := string(key) @@ -137,8 +138,14 @@ func (s *MemCachedStore) SeekAsync(key []byte, cutPrefix bool) chan KeyValue { ) // Seek over memory store. go func() { + loop: for _, kv := range memRes { - data1 <- kv + select { + case <-ctx.Done(): + break loop + default: + data1 <- kv + } } close(data1) }() @@ -146,11 +153,20 @@ func (s *MemCachedStore) SeekAsync(key []byte, cutPrefix bool) chan KeyValue { // Seek over persistent store. go func() { s.mut.RLock() + var done bool s.ps.Seek(key, func(k, v []byte) { - // Must copy here, #1468. - data2 <- KeyValue{ - Key: slice.Copy(k), - Value: slice.Copy(v), + if done { + return + } + select { + case <-ctx.Done(): + done = true + default: + // Must copy here, #1468. + data2 <- KeyValue{ + Key: slice.Copy(k), + Value: slice.Copy(v), + } } }) s.mut.RUnlock()