core: add ability to free storage.Iterator resources

This commit is contained in:
Anna Shaleva 2021-10-06 15:54:44 +03:00
parent 89ee2e7720
commit 0a4f45c9b0
4 changed files with 44 additions and 14 deletions

View file

@ -2,6 +2,7 @@ package dao
import ( import (
"bytes" "bytes"
"context"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
@ -64,7 +65,7 @@ type DAO interface {
PutStorageItem(id int32, key []byte, si state.StorageItem) error PutStorageItem(id int32, key []byte, si state.StorageItem) error
PutVersion(v string) error PutVersion(v string) error
Seek(id int32, prefix []byte, f func(k, v []byte)) Seek(id int32, prefix []byte, f func(k, v []byte))
SeekAsync(id int32, prefix []byte) chan storage.KeyValue SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue
StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error
StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error
@ -337,19 +338,20 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S
// Seek executes f for all items with a given prefix. // Seek executes f for all items with a given prefix.
// If key is to be used outside of f, they may not be copied. // If key is to be used outside of f, they may not be copied.
func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) {
for r := range dao.SeekAsync(id, prefix) { res := dao.SeekAsync(context.Background(), id, prefix)
for r := range res {
f(r.Key, r.Value) f(r.Key, r.Value)
} }
} }
// SeekAsync sends all storage items matching given prefix to a channel and returns // SeekAsync sends all storage items matching given prefix to a channel and returns
// the channel. Resulting keys and values may not be copied. // the channel. Resulting keys and values may not be copied.
func (dao *Simple) SeekAsync(id int32, prefix []byte) chan storage.KeyValue { func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue {
lookupKey := makeStorageItemKey(id, nil) lookupKey := makeStorageItemKey(id, nil)
if prefix != nil { if prefix != nil {
lookupKey = append(lookupKey, prefix...) lookupKey = append(lookupKey, prefix...)
} }
return dao.Store.SeekAsync(lookupKey, true) return dao.Store.SeekAsync(ctx, lookupKey, true)
} }
// makeStorageItemKey returns a key used to store StorageItem in the DB. // makeStorageItemKey returns a key used to store StorageItem in the DB.

View file

@ -1,6 +1,8 @@
package storage package storage
import ( import (
"context"
"github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/util/slice"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
@ -23,6 +25,7 @@ const (
// Iterator is an iterator state representation. // Iterator is an iterator state representation.
type Iterator struct { type Iterator struct {
seekCh chan storage.KeyValue seekCh chan storage.KeyValue
cancel context.CancelFunc
curr storage.KeyValue curr storage.KeyValue
next bool next bool
opts int64 opts int64
@ -30,9 +33,10 @@ type Iterator struct {
} }
// NewIterator creates a new Iterator with given options for a given channel of store.Seek results. // NewIterator creates a new Iterator with given options for a given channel of store.Seek results.
func NewIterator(seekCh chan storage.KeyValue, prefix []byte, opts int64) *Iterator { func NewIterator(seekCh chan storage.KeyValue, cancel context.CancelFunc, prefix []byte, opts int64) *Iterator {
return &Iterator{ return &Iterator{
seekCh: seekCh, seekCh: seekCh,
cancel: cancel,
opts: opts, opts: opts,
prefix: slice.Copy(prefix), prefix: slice.Copy(prefix),
} }
@ -80,3 +84,9 @@ func (s *Iterator) Value() stackitem.Item {
value, value,
}) })
} }
// Close releases resources occupied by the Iterator.
// TODO: call this method on program unloading.
func (s *Iterator) Close() {
s.cancel()
}

View file

@ -1,6 +1,7 @@
package core package core
import ( import (
"context"
"crypto/elliptic" "crypto/elliptic"
"errors" "errors"
"fmt" "fmt"
@ -188,8 +189,9 @@ func storageFind(ic *interop.Context) error {
} }
// Items in seekres should be sorted by key, but GetStorageItemsWithPrefix returns // Items in seekres should be sorted by key, but GetStorageItemsWithPrefix returns
// sorted items, so no need to sort them one more time. // sorted items, so no need to sort them one more time.
seekres := ic.DAO.SeekAsync(stc.ID, prefix) ctx, cancel := context.WithCancel(context.Background())
item := istorage.NewIterator(seekres, prefix, opts) seekres := ic.DAO.SeekAsync(ctx, stc.ID, prefix)
item := istorage.NewIterator(seekres, cancel, prefix, opts)
ic.VM.Estack().PushItem(stackitem.NewInterop(item)) ic.VM.Estack().PushItem(stackitem.NewInterop(item))
return nil return nil

View file

@ -2,6 +2,7 @@ package storage
import ( import (
"bytes" "bytes"
"context"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -90,7 +91,7 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
// Seek implements the Store interface. // Seek implements the Store interface.
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
seekres := s.SeekAsync(key, false) seekres := s.SeekAsync(context.Background(), key, false)
for kv := range seekres { for kv := range seekres {
f(kv.Key, kv.Value) f(kv.Key, kv.Value)
} }
@ -99,7 +100,7 @@ func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
// SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and // SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and
// value slices may not be copied and may be modified. SeekAsync can guarantee // value slices may not be copied and may be modified. SeekAsync can guarantee
// that key-value items are sorted by key in ascending way. // that key-value items are sorted by key in ascending way.
func (s *MemCachedStore) SeekAsync(key []byte, cutPrefix bool) chan KeyValue { func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bool) chan KeyValue {
// Create memory store `mem` and `del` snapshot not to hold the lock. // Create memory store `mem` and `del` snapshot not to hold the lock.
memRes := make([]KeyValueExists, 0) memRes := make([]KeyValueExists, 0)
sk := string(key) sk := string(key)
@ -137,21 +138,36 @@ func (s *MemCachedStore) SeekAsync(key []byte, cutPrefix bool) chan KeyValue {
) )
// Seek over memory store. // Seek over memory store.
go func() { go func() {
loop:
for _, kv := range memRes { for _, kv := range memRes {
select {
case <-ctx.Done():
break loop
default:
data1 <- kv data1 <- kv
} }
}
close(data1) close(data1)
}() }()
// Seek over persistent store. // Seek over persistent store.
go func() { go func() {
s.mut.RLock() s.mut.RLock()
var done bool
s.ps.Seek(key, func(k, v []byte) { s.ps.Seek(key, func(k, v []byte) {
if done {
return
}
select {
case <-ctx.Done():
done = true
default:
// Must copy here, #1468. // Must copy here, #1468.
data2 <- KeyValue{ data2 <- KeyValue{
Key: slice.Copy(k), Key: slice.Copy(k),
Value: slice.Copy(v), Value: slice.Copy(v),
} }
}
}) })
s.mut.RUnlock() s.mut.RUnlock()
close(data2) close(data2)