core: split (*MemCachedStore) Seek and SeekAsync methods

Use SeekAsync for System.Storage.Find and Seek for the rest of cases.
This commit is contained in:
Anna Shaleva 2021-10-19 18:03:47 +03:00
parent dcda7bec63
commit 3450371910
2 changed files with 84 additions and 85 deletions

View file

@ -338,10 +338,13 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S
// Seek executes f for all items with a given prefix. // Seek executes f for all items with a given prefix.
// If key is to be used outside of f, they may not be copied. // If key is to be used outside of f, they may not be copied.
func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) {
res := dao.SeekAsync(context.Background(), id, prefix) lookupKey := makeStorageItemKey(id, nil)
for r := range res { if prefix != nil {
f(r.Key, r.Value) lookupKey = append(lookupKey, prefix...)
} }
dao.Store.Seek(lookupKey, func(k, v []byte) {
f(k[len(lookupKey):], v)
})
} }
// SeekAsync sends all storage items matching given prefix to a channel and returns // SeekAsync sends all storage items matching given prefix to a channel and returns

View file

@ -91,16 +91,28 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
// Seek implements the Store interface. // Seek implements the Store interface.
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
seekres := s.SeekAsync(context.Background(), key, false) s.seek(context.Background(), key, false, f)
for kv := range seekres {
f(kv.Key, kv.Value)
}
} }
// SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and // SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and
// value slices may not be copied and may be modified. SeekAsync can guarantee // value slices may not be copied and may be modified. SeekAsync can guarantee
// that key-value items are sorted by key in ascending way. // that key-value items are sorted by key in ascending way.
func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bool) chan KeyValue { func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bool) chan KeyValue {
res := make(chan KeyValue)
go func() {
s.seek(ctx, key, cutPrefix, func(k, v []byte) {
res <- KeyValue{
Key: k,
Value: v,
}
})
close(res)
}()
return res
}
func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f func(k, v []byte)) {
// Create memory store `mem` and `del` snapshot not to hold the lock. // Create memory store `mem` and `del` snapshot not to hold the lock.
var memRes []KeyValueExists var memRes []KeyValueExists
sk := string(key) sk := string(key)
@ -132,93 +144,77 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bo
return bytes.Compare(memRes[i].Key, memRes[j].Key) < 0 return bytes.Compare(memRes[i].Key, memRes[j].Key) < 0
}) })
var seekres = make(chan KeyValue) var (
done bool
// Seek over persistent store. iMem int
go func() { kvMem KeyValueExists
var ( haveMem bool
done bool )
iMem int if iMem < len(memRes) {
kvMem KeyValueExists kvMem = memRes[iMem]
haveMem bool haveMem = true
) iMem++
if iMem < len(memRes) { }
kvMem = memRes[iMem] // Merge results of seek operations in ascending order.
haveMem = true ps.Seek(key, func(k, v []byte) {
iMem++ if done {
return
} }
kvPs := KeyValue{
// Merge results of seek operations in ascending order. Key: slice.Copy(k),
ps.Seek(key, func(k, v []byte) { Value: slice.Copy(v),
if done { }
return loop:
} for {
kvPs := KeyValue{ select {
Key: slice.Copy(k), case <-ctx.Done():
Value: slice.Copy(v), done = true
} break loop
loop: default:
for { var isMem = haveMem && (bytes.Compare(kvMem.Key, kvPs.Key) < 0)
select { if isMem {
case <-ctx.Done():
done = true
break loop
default:
var isMem = haveMem && (bytes.Compare(kvMem.Key, kvPs.Key) < 0)
if isMem {
if kvMem.Exists {
if cutPrefix {
kvMem.Key = kvMem.Key[len(key):]
}
seekres <- KeyValue{
Key: kvMem.Key,
Value: kvMem.Value,
}
}
if iMem < len(memRes) {
kvMem = memRes[iMem]
haveMem = true
iMem++
} else {
haveMem = false
}
} else {
if !bytes.Equal(kvMem.Key, kvPs.Key) {
if cutPrefix {
kvPs.Key = kvPs.Key[len(key):]
}
seekres <- kvPs
}
break loop
}
}
}
})
if !done && haveMem {
loop:
for i := iMem - 1; i < len(memRes); i++ {
select {
case <-ctx.Done():
break loop
default:
kvMem = memRes[i]
if kvMem.Exists { if kvMem.Exists {
if cutPrefix { if cutPrefix {
kvMem.Key = kvMem.Key[len(key):] kvMem.Key = kvMem.Key[len(key):]
} }
seekres <- KeyValue{ f(kvMem.Key, kvMem.Value)
Key: kvMem.Key,
Value: kvMem.Value,
}
} }
if iMem < len(memRes) {
kvMem = memRes[iMem]
haveMem = true
iMem++
} else {
haveMem = false
}
} else {
if !bytes.Equal(kvMem.Key, kvPs.Key) {
if cutPrefix {
kvPs.Key = kvPs.Key[len(key):]
}
f(kvPs.Key, kvPs.Value)
}
break loop
} }
} }
} }
close(seekres) })
}() if !done && haveMem {
loop:
return seekres for i := iMem - 1; i < len(memRes); i++ {
select {
case <-ctx.Done():
break loop
default:
kvMem = memRes[i]
if kvMem.Exists {
if cutPrefix {
kvMem.Key = kvMem.Key[len(key):]
}
f(kvMem.Key, kvMem.Value)
}
}
}
}
} }
// Persist flushes all the MemoryStore contents into the (supposedly) persistent // Persist flushes all the MemoryStore contents into the (supposedly) persistent