mirror of
https://github.com/nspcc-dev/neo-go.git
synced 2024-12-23 13:41:37 +00:00
storage: allow to seek starting from some point
This commit is contained in:
parent
7d5b20d8dd
commit
6bc92abe19
17 changed files with 196 additions and 91 deletions
|
@ -486,7 +486,7 @@ func (bc *Blockchain) removeOldStorageItems() {
|
|||
|
||||
b := bc.dao.Store.Batch()
|
||||
prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
|
||||
bc.dao.Store.Seek([]byte{byte(prefix)}, func(k, _ []byte) {
|
||||
bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) {
|
||||
// #1468, but don't need to copy here, because it is done by Store.
|
||||
b.Delete(k)
|
||||
})
|
||||
|
|
|
@ -1769,7 +1769,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
|
|||
if bcSpout.dao.Version.StoragePrefix == tempPrefix {
|
||||
tempPrefix = storage.STStorage
|
||||
}
|
||||
bcSpout.dao.Store.Seek(bcSpout.dao.Version.StoragePrefix.Bytes(), func(k, v []byte) {
|
||||
bcSpout.dao.Store.Seek(storage.SeekRange{Prefix: bcSpout.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) {
|
||||
key := slice.Copy(k)
|
||||
key[0] = byte(tempPrefix)
|
||||
value := slice.Copy(v)
|
||||
|
|
|
@ -62,8 +62,8 @@ type DAO interface {
|
|||
PutStateSyncCurrentBlockHeight(h uint32) error
|
||||
PutStorageItem(id int32, key []byte, si state.StorageItem) error
|
||||
PutVersion(v Version) error
|
||||
Seek(id int32, prefix []byte, f func(k, v []byte))
|
||||
SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue
|
||||
Seek(id int32, rng storage.SeekRange, f func(k, v []byte))
|
||||
SeekAsync(ctx context.Context, id int32, rng storage.SeekRange) chan storage.KeyValue
|
||||
StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error
|
||||
StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
|
||||
StoreAsTransaction(tx *transaction.Transaction, index uint32, aer *state.AppExecResult, buf *io.BufBinWriter) error
|
||||
|
@ -300,30 +300,26 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S
|
|||
Item: state.StorageItem(v),
|
||||
})
|
||||
}
|
||||
dao.Seek(id, prefix, saveToArr)
|
||||
dao.Seek(id, storage.SeekRange{Prefix: prefix}, saveToArr)
|
||||
return siArr, nil
|
||||
}
|
||||
|
||||
// Seek executes f for all items with a given prefix.
|
||||
// If key is to be used outside of f, they may not be copied.
|
||||
func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) {
|
||||
lookupKey := makeStorageItemKey(dao.Version.StoragePrefix, id, nil)
|
||||
if prefix != nil {
|
||||
lookupKey = append(lookupKey, prefix...)
|
||||
}
|
||||
dao.Store.Seek(lookupKey, func(k, v []byte) {
|
||||
f(k[len(lookupKey):], v)
|
||||
// Seek executes f for all storage items matching a given `rng` (matching given prefix and
|
||||
// starting from the point specified). If key or value is to be used outside of f, they
|
||||
// may not be copied.
|
||||
func (dao *Simple) Seek(id int32, rng storage.SeekRange, f func(k, v []byte)) {
|
||||
rng.Prefix = makeStorageItemKey(dao.Version.StoragePrefix, id, rng.Prefix)
|
||||
dao.Store.Seek(rng, func(k, v []byte) {
|
||||
f(k[len(rng.Prefix):], v)
|
||||
})
|
||||
}
|
||||
|
||||
// SeekAsync sends all storage items matching given prefix to a channel and returns
|
||||
// the channel. Resulting keys and values may not be copied.
|
||||
func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue {
|
||||
lookupKey := makeStorageItemKey(dao.Version.StoragePrefix, id, nil)
|
||||
if prefix != nil {
|
||||
lookupKey = append(lookupKey, prefix...)
|
||||
}
|
||||
return dao.Store.SeekAsync(ctx, lookupKey, true)
|
||||
// SeekAsync sends all storage items matching a given `rng` (matching given prefix and
|
||||
// starting from the point specified) to a channel and returns the channel.
|
||||
// Resulting keys and values may not be copied.
|
||||
func (dao *Simple) SeekAsync(ctx context.Context, id int32, rng storage.SeekRange) chan storage.KeyValue {
|
||||
rng.Prefix = makeStorageItemKey(dao.Version.StoragePrefix, id, rng.Prefix)
|
||||
return dao.Store.SeekAsync(ctx, rng, true)
|
||||
}
|
||||
|
||||
// makeStorageItemKey returns a key used to store StorageItem in the DB.
|
||||
|
@ -479,7 +475,9 @@ func (dao *Simple) GetStateSyncCurrentBlockHeight() (uint32, error) {
|
|||
// the given underlying store.
|
||||
func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
|
||||
hashMap := make(map[uint32][]util.Uint256)
|
||||
dao.Store.Seek(storage.IXHeaderHashList.Bytes(), func(k, v []byte) {
|
||||
dao.Store.Seek(storage.SeekRange{
|
||||
Prefix: storage.IXHeaderHashList.Bytes(),
|
||||
}, func(k, v []byte) {
|
||||
storedCount := binary.LittleEndian.Uint32(k[1:])
|
||||
hashes, err := read2000Uint256Hashes(v)
|
||||
if err != nil {
|
||||
|
|
|
@ -190,7 +190,7 @@ func storageFind(ic *interop.Context) error {
|
|||
// Items in seekres should be sorted by key, but GetStorageItemsWithPrefix returns
|
||||
// sorted items, so no need to sort them one more time.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
seekres := ic.DAO.SeekAsync(ctx, stc.ID, prefix)
|
||||
seekres := ic.DAO.SeekAsync(ctx, stc.ID, storage.SeekRange{Prefix: prefix})
|
||||
item := istorage.NewIterator(seekres, prefix, opts)
|
||||
ic.VM.Estack().PushItem(stackitem.NewInterop(item))
|
||||
ic.RegisterCancelFunc(cancel)
|
||||
|
|
|
@ -503,7 +503,7 @@ func (m *Management) InitializeCache(d dao.DAO) error {
|
|||
defer m.mtx.Unlock()
|
||||
|
||||
var initErr error
|
||||
d.Seek(m.ID, []byte{prefixContract}, func(_, v []byte) {
|
||||
d.Seek(m.ID, storage.SeekRange{Prefix: []byte{prefixContract}}, func(_, v []byte) {
|
||||
var cs = new(state.Contract)
|
||||
initErr = stackitem.DeserializeConvertible(v, cs)
|
||||
if initErr != nil {
|
||||
|
|
|
@ -386,7 +386,7 @@ func (n *NEO) PostPersist(ic *interop.Context) error {
|
|||
func (n *NEO) getGASPerVote(d dao.DAO, key []byte, index ...uint32) []big.Int {
|
||||
var max = make([]uint32, len(index))
|
||||
var reward = make([]big.Int, len(index))
|
||||
d.Seek(n.ID, key, func(k, v []byte) {
|
||||
d.Seek(n.ID, storage.SeekRange{Prefix: key}, func(k, v []byte) {
|
||||
if len(k) == 4 {
|
||||
num := binary.BigEndian.Uint32(k)
|
||||
for i, ind := range index {
|
||||
|
@ -591,7 +591,7 @@ func (n *NEO) dropCandidateIfZero(d dao.DAO, pub *keys.PublicKey, c *candidate)
|
|||
|
||||
var toRemove []string
|
||||
voterKey := makeVoterKey(pub.Bytes())
|
||||
d.Seek(n.ID, voterKey, func(k, v []byte) {
|
||||
d.Seek(n.ID, storage.SeekRange{Prefix: voterKey}, func(k, v []byte) {
|
||||
toRemove = append(toRemove, string(k))
|
||||
})
|
||||
for i := range toRemove {
|
||||
|
|
|
@ -127,7 +127,7 @@ func (s *Module) CleanStorage() error {
|
|||
return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load())
|
||||
}
|
||||
b := s.Store.Batch()
|
||||
s.Store.Seek([]byte{byte(storage.DataMPT)}, func(k, _ []byte) {
|
||||
s.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.DataMPT)}}, func(k, _ []byte) {
|
||||
// #1468, but don't need to copy here, because it is done by Store.
|
||||
b.Delete(k)
|
||||
})
|
||||
|
|
|
@ -32,7 +32,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
|||
nodes = make(map[util.Uint256][]byte)
|
||||
expectedItems []storage.KeyValue
|
||||
)
|
||||
expectedStorage.Seek(storage.DataMPT.Bytes(), func(k, v []byte) {
|
||||
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) {
|
||||
key := slice.Copy(k)
|
||||
value := slice.Copy(v)
|
||||
expectedItems = append(expectedItems, storage.KeyValue{
|
||||
|
@ -95,7 +95,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
|||
|
||||
// Compare resulting storage items and refcounts.
|
||||
var actualItems []storage.KeyValue
|
||||
expectedStorage.Seek(storage.DataMPT.Bytes(), func(k, v []byte) {
|
||||
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) {
|
||||
key := slice.Copy(k)
|
||||
value := slice.Copy(v)
|
||||
actualItems = append(actualItems, storage.KeyValue{
|
||||
|
|
|
@ -424,7 +424,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
|||
// compare storage states
|
||||
fetchStorage := func(bc *Blockchain) []storage.KeyValue {
|
||||
var kv []storage.KeyValue
|
||||
bc.dao.Store.Seek(bc.dao.Version.StoragePrefix.Bytes(), func(k, v []byte) {
|
||||
bc.dao.Store.Seek(storage.SeekRange{Prefix: bc.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) {
|
||||
key := slice.Copy(k)
|
||||
value := slice.Copy(v)
|
||||
if key[0] == byte(storage.STTempStorage) {
|
||||
|
@ -444,7 +444,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
|||
// no temp items should be left
|
||||
require.Eventually(t, func() bool {
|
||||
var haveItems bool
|
||||
bcBolt.dao.Store.Seek(storage.STStorage.Bytes(), func(_, _ []byte) {
|
||||
bcBolt.dao.Store.Seek(storage.SeekRange{Prefix: storage.STStorage.Bytes()}, func(_, _ []byte) {
|
||||
haveItems = true
|
||||
})
|
||||
return !haveItems
|
||||
|
|
|
@ -109,11 +109,15 @@ func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool)
|
|||
}
|
||||
|
||||
// Seek implements the Store interface.
|
||||
func (s *BoltDBStore) Seek(key []byte, f func(k, v []byte)) {
|
||||
func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
||||
copy(start, rng.Prefix)
|
||||
copy(start[len(rng.Prefix):], rng.Start)
|
||||
prefix := util.BytesPrefix(rng.Prefix)
|
||||
prefix.Start = start
|
||||
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||
c := tx.Bucket(Bucket).Cursor()
|
||||
prefix := util.BytesPrefix(key)
|
||||
for k, v := c.Seek(prefix.Start); k != nil && bytes.Compare(k, prefix.Limit) <= 0; k, v = c.Next() {
|
||||
for k, v := c.Seek(prefix.Start); k != nil && (len(prefix.Limit) == 0 || bytes.Compare(k, prefix.Limit) <= 0); k, v = c.Next() {
|
||||
f(k, v)
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -85,8 +85,13 @@ func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool
|
|||
}
|
||||
|
||||
// Seek implements the Store interface.
|
||||
func (s *LevelDBStore) Seek(key []byte, f func(k, v []byte)) {
|
||||
iter := s.db.NewIterator(util.BytesPrefix(key), nil)
|
||||
func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
||||
copy(start, rng.Prefix)
|
||||
copy(start[len(rng.Prefix):], rng.Start)
|
||||
prefix := util.BytesPrefix(rng.Prefix)
|
||||
prefix.Start = start
|
||||
iter := s.db.NewIterator(prefix, nil)
|
||||
for iter.Next() {
|
||||
f(iter.Key(), iter.Value())
|
||||
}
|
||||
|
|
|
@ -90,17 +90,17 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
|
|||
}
|
||||
|
||||
// Seek implements the Store interface.
|
||||
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
|
||||
s.seek(context.Background(), key, false, f)
|
||||
func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||
s.seek(context.Background(), rng, false, f)
|
||||
}
|
||||
|
||||
// SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and
|
||||
// value slices may not be copied and may be modified. SeekAsync can guarantee
|
||||
// that key-value items are sorted by key in ascending way.
|
||||
func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bool) chan KeyValue {
|
||||
func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix bool) chan KeyValue {
|
||||
res := make(chan KeyValue)
|
||||
go func() {
|
||||
s.seek(ctx, key, cutPrefix, func(k, v []byte) {
|
||||
s.seek(ctx, rng, cutPrefix, func(k, v []byte) {
|
||||
res <- KeyValue{
|
||||
Key: k,
|
||||
Value: v,
|
||||
|
@ -112,13 +112,23 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bo
|
|||
return res
|
||||
}
|
||||
|
||||
func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f func(k, v []byte)) {
|
||||
// seek is internal representations of Seek* capable of seeking for the given key
|
||||
// and supporting early stop using provided context. `cutPrefix` denotes whether provided
|
||||
// key needs to be cut off the resulting keys. `rng` specifies prefix items must match
|
||||
// and point to start seeking from.
|
||||
func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte)) {
|
||||
// Create memory store `mem` and `del` snapshot not to hold the lock.
|
||||
var memRes []KeyValueExists
|
||||
sk := string(key)
|
||||
sPrefix := string(rng.Prefix)
|
||||
lPrefix := len(sPrefix)
|
||||
sStart := string(rng.Start)
|
||||
lStart := len(sStart)
|
||||
isKeyOK := func(key string) bool {
|
||||
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0)
|
||||
}
|
||||
s.mut.RLock()
|
||||
for k, v := range s.MemoryStore.mem {
|
||||
if strings.HasPrefix(k, sk) {
|
||||
if isKeyOK(k) {
|
||||
memRes = append(memRes, KeyValueExists{
|
||||
KeyValue: KeyValue{
|
||||
Key: []byte(k),
|
||||
|
@ -129,7 +139,7 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
|||
}
|
||||
}
|
||||
for k := range s.MemoryStore.del {
|
||||
if strings.HasPrefix(k, sk) {
|
||||
if isKeyOK(k) {
|
||||
memRes = append(memRes, KeyValueExists{
|
||||
KeyValue: KeyValue{
|
||||
Key: []byte(k),
|
||||
|
@ -156,7 +166,7 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
|||
iMem++
|
||||
}
|
||||
// Merge results of seek operations in ascending order.
|
||||
ps.Seek(key, func(k, v []byte) {
|
||||
mergeFunc := func(k, v []byte) {
|
||||
if done {
|
||||
return
|
||||
}
|
||||
|
@ -175,7 +185,7 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
|||
if isMem {
|
||||
if kvMem.Exists {
|
||||
if cutPrefix {
|
||||
kvMem.Key = kvMem.Key[len(key):]
|
||||
kvMem.Key = kvMem.Key[lPrefix:]
|
||||
}
|
||||
f(kvMem.Key, kvMem.Value)
|
||||
}
|
||||
|
@ -189,7 +199,7 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
|||
} else {
|
||||
if !bytes.Equal(kvMem.Key, kvPs.Key) {
|
||||
if cutPrefix {
|
||||
kvPs.Key = kvPs.Key[len(key):]
|
||||
kvPs.Key = kvPs.Key[lPrefix:]
|
||||
}
|
||||
f(kvPs.Key, kvPs.Value)
|
||||
}
|
||||
|
@ -197,7 +207,9 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
|||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
ps.Seek(rng, mergeFunc)
|
||||
|
||||
if !done && haveMem {
|
||||
loop:
|
||||
for i := iMem - 1; i < len(memRes); i++ {
|
||||
|
@ -208,7 +220,7 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
|||
kvMem = memRes[i]
|
||||
if kvMem.Exists {
|
||||
if cutPrefix {
|
||||
kvMem.Key = kvMem.Key[len(key):]
|
||||
kvMem.Key = kvMem.Key[lPrefix:]
|
||||
}
|
||||
f(kvMem.Key, kvMem.Value)
|
||||
}
|
||||
|
|
|
@ -167,7 +167,7 @@ func TestCachedSeek(t *testing.T) {
|
|||
require.NoError(t, ts.Put(v.Key, v.Value))
|
||||
}
|
||||
foundKVs := make(map[string][]byte)
|
||||
ts.Seek(goodPrefix, func(k, v []byte) {
|
||||
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) {
|
||||
foundKVs[string(k)] = v
|
||||
})
|
||||
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
||||
|
@ -232,7 +232,7 @@ func benchmarkCachedSeek(t *testing.B, ps Store, psElementsCount, tsElementsCoun
|
|||
t.ReportAllocs()
|
||||
t.ResetTimer()
|
||||
for n := 0; n < t.N; n++ {
|
||||
ts.Seek(searchPrefix, func(k, v []byte) {})
|
||||
ts.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) {})
|
||||
}
|
||||
t.StopTimer()
|
||||
}
|
||||
|
@ -290,7 +290,7 @@ func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string]bool) error {
|
|||
b.onPutBatch()
|
||||
return ErrKeyNotFound
|
||||
}
|
||||
func (b *BadStore) Seek(k []byte, f func(k, v []byte)) {
|
||||
func (b *BadStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||
}
|
||||
func (b *BadStore) Close() error {
|
||||
return nil
|
||||
|
@ -365,7 +365,7 @@ func TestCachedSeekSorting(t *testing.T) {
|
|||
require.NoError(t, ts.Put(v.Key, v.Value))
|
||||
}
|
||||
var foundKVs []KeyValue
|
||||
ts.Seek(goodPrefix, func(k, v []byte) {
|
||||
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) {
|
||||
foundKVs = append(foundKVs, KeyValue{Key: slice.Copy(k), Value: slice.Copy(v)})
|
||||
})
|
||||
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
||||
|
|
|
@ -104,9 +104,9 @@ func (s *MemoryStore) PutChangeSet(puts map[string][]byte, dels map[string]bool)
|
|||
}
|
||||
|
||||
// Seek implements the Store interface.
|
||||
func (s *MemoryStore) Seek(key []byte, f func(k, v []byte)) {
|
||||
func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||
s.mut.RLock()
|
||||
s.seek(key, f)
|
||||
s.seek(rng, f)
|
||||
s.mut.RUnlock()
|
||||
}
|
||||
|
||||
|
@ -127,12 +127,20 @@ func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
|
|||
}
|
||||
}
|
||||
|
||||
// seek is an internal unlocked implementation of Seek.
|
||||
func (s *MemoryStore) seek(key []byte, f func(k, v []byte)) {
|
||||
sk := string(key)
|
||||
// seek is an internal unlocked implementation of Seek. `start` denotes whether
|
||||
// seeking starting from the provided prefix should be performed.
|
||||
func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) {
|
||||
sPrefix := string(rng.Prefix)
|
||||
lPrefix := len(sPrefix)
|
||||
sStart := string(rng.Start)
|
||||
lStart := len(sStart)
|
||||
var memList []KeyValue
|
||||
|
||||
isKeyOK := func(key string) bool {
|
||||
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0)
|
||||
}
|
||||
for k, v := range s.mem {
|
||||
if strings.HasPrefix(k, sk) {
|
||||
if isKeyOK(k) {
|
||||
memList = append(memList, KeyValue{
|
||||
Key: []byte(k),
|
||||
Value: v,
|
||||
|
|
|
@ -28,7 +28,7 @@ func BenchmarkMemorySeek(t *testing.B) {
|
|||
t.ReportAllocs()
|
||||
t.ResetTimer()
|
||||
for n := 0; n < t.N; n++ {
|
||||
ms.Seek(searchPrefix, func(k, v []byte) {})
|
||||
ms.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) {})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -45,6 +45,22 @@ const (
|
|||
MaxStorageValueLen = 65535
|
||||
)
|
||||
|
||||
// SeekRange represents options for Store.Seek operation.
|
||||
type SeekRange struct {
|
||||
// Prefix denotes the Seek's lookup key.
|
||||
// Empty Prefix means seeking through all keys in the DB starting from
|
||||
// the Start if specified.
|
||||
Prefix []byte
|
||||
// Start denotes value upended to the Prefix to start Seek from.
|
||||
// Seeking starting from some key includes this key to the result;
|
||||
// if no matching key was found then next suitable key is picked up.
|
||||
// Start may be empty. Empty Start means seeking through all keys in
|
||||
// the DB with matching Prefix.
|
||||
// Empty Prefix and empty Start can be combined, which means seeking
|
||||
// through all keys in the DB.
|
||||
Start []byte
|
||||
}
|
||||
|
||||
// ErrKeyNotFound is an error returned by Store implementations
|
||||
// when a certain key is not found.
|
||||
var ErrKeyNotFound = errors.New("key not found")
|
||||
|
@ -63,7 +79,7 @@ type (
|
|||
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
|
||||
// Key and value slices should not be modified. Seek can guarantee that key-value items are sorted by
|
||||
// key in ascending way.
|
||||
Seek(k []byte, f func(k, v []byte))
|
||||
Seek(rng SeekRange, f func(k, v []byte))
|
||||
Close() error
|
||||
}
|
||||
|
||||
|
|
|
@ -67,42 +67,104 @@ func testStorePutBatch(t *testing.T, s Store) {
|
|||
}
|
||||
|
||||
func testStoreSeek(t *testing.T, s Store) {
|
||||
var (
|
||||
// Given this prefix...
|
||||
goodprefix = []byte{'f'}
|
||||
// these pairs should be found...
|
||||
goodkvs = []KeyValue{
|
||||
{[]byte("foo"), []byte("bar")},
|
||||
{[]byte("faa"), []byte("bra")},
|
||||
{[]byte("foox"), []byte("barx")},
|
||||
}
|
||||
// and these should be not.
|
||||
badkvs = []KeyValue{
|
||||
{[]byte("doo"), []byte("pow")},
|
||||
{[]byte("mew"), []byte("qaz")},
|
||||
}
|
||||
)
|
||||
|
||||
for _, v := range goodkvs {
|
||||
require.NoError(t, s.Put(v.Key, v.Value))
|
||||
// Use the same set of kvs to test Seek with different prefix/start values.
|
||||
kvs := []KeyValue{
|
||||
{[]byte("10"), []byte("bar")},
|
||||
{[]byte("11"), []byte("bara")},
|
||||
{[]byte("20"), []byte("barb")},
|
||||
{[]byte("21"), []byte("barc")},
|
||||
{[]byte("22"), []byte("bard")},
|
||||
{[]byte("30"), []byte("bare")},
|
||||
{[]byte("31"), []byte("barf")},
|
||||
}
|
||||
for _, v := range badkvs {
|
||||
for _, v := range kvs {
|
||||
require.NoError(t, s.Put(v.Key, v.Value))
|
||||
}
|
||||
|
||||
// Seek result expected to be sorted in an ascending way.
|
||||
sort.Slice(goodkvs, func(i, j int) bool {
|
||||
return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) < 0
|
||||
})
|
||||
check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue) {
|
||||
// Seek result expected to be sorted in an ascending way.
|
||||
sort.Slice(goodkvs, func(i, j int) bool {
|
||||
return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) < 0
|
||||
})
|
||||
|
||||
actual := make([]KeyValue, 0, len(goodkvs))
|
||||
s.Seek(goodprefix, func(k, v []byte) {
|
||||
actual = append(actual, KeyValue{
|
||||
Key: slice.Copy(k),
|
||||
Value: slice.Copy(v),
|
||||
actual := make([]KeyValue, 0, len(goodkvs))
|
||||
s.Seek(SeekRange{
|
||||
Prefix: goodprefix,
|
||||
Start: start,
|
||||
}, func(k, v []byte) {
|
||||
actual = append(actual, KeyValue{
|
||||
Key: slice.Copy(k),
|
||||
Value: slice.Copy(v),
|
||||
})
|
||||
})
|
||||
assert.Equal(t, goodkvs, actual)
|
||||
}
|
||||
|
||||
t.Run("non-empty prefix, empty start", func(t *testing.T) {
|
||||
t.Run("good", func(t *testing.T) {
|
||||
// Given this prefix...
|
||||
goodprefix := []byte("2")
|
||||
// and empty start range...
|
||||
start := []byte{}
|
||||
// these pairs should be found.
|
||||
goodkvs := []KeyValue{
|
||||
kvs[2], // key = "20"
|
||||
kvs[3], // key = "21"
|
||||
kvs[4], // key = "22"
|
||||
}
|
||||
check(t, goodprefix, start, goodkvs)
|
||||
})
|
||||
t.Run("no matching items", func(t *testing.T) {
|
||||
goodprefix := []byte("0")
|
||||
start := []byte{}
|
||||
check(t, goodprefix, start, []KeyValue{})
|
||||
})
|
||||
})
|
||||
assert.Equal(t, goodkvs, actual)
|
||||
|
||||
t.Run("non-empty prefix, non-empty start", func(t *testing.T) {
|
||||
t.Run("good", func(t *testing.T) {
|
||||
goodprefix := []byte("2")
|
||||
start := []byte("1") // start will be upended to goodprefix to start seek from
|
||||
goodkvs := []KeyValue{
|
||||
kvs[3], // key = "21"
|
||||
kvs[4], // key = "22"
|
||||
}
|
||||
check(t, goodprefix, start, goodkvs)
|
||||
})
|
||||
t.Run("no matching items", func(t *testing.T) {
|
||||
goodprefix := []byte("2")
|
||||
start := []byte("3") // start will be upended to goodprefix to start seek from
|
||||
check(t, goodprefix, start, []KeyValue{})
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("empty prefix, non-empty start", func(t *testing.T) {
|
||||
t.Run("good", func(t *testing.T) {
|
||||
goodprefix := []byte{}
|
||||
start := []byte("21")
|
||||
goodkvs := []KeyValue{
|
||||
kvs[3], // key = "21"
|
||||
kvs[4], // key = "22"
|
||||
kvs[5], // key = "30"
|
||||
kvs[6], // key = "31"
|
||||
}
|
||||
check(t, goodprefix, start, goodkvs)
|
||||
})
|
||||
t.Run("no matching items", func(t *testing.T) {
|
||||
goodprefix := []byte{}
|
||||
start := []byte("32")
|
||||
check(t, goodprefix, start, []KeyValue{})
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("empty prefix, empty start", func(t *testing.T) {
|
||||
goodprefix := []byte{}
|
||||
start := []byte{}
|
||||
goodkvs := make([]KeyValue, len(kvs))
|
||||
copy(goodkvs, kvs)
|
||||
check(t, goodprefix, start, goodkvs)
|
||||
})
|
||||
|
||||
require.NoError(t, s.Close())
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue