core: avoid concurrent map r/w during SeekAsync

Close #2493.
This commit is contained in:
Anna Shaleva 2022-05-16 12:25:15 +03:00
parent 4b785d4ffb
commit d2dcdecca5

View file

@ -153,7 +153,8 @@ func (s *MemCachedStore) PutChangeSet(puts map[string][]byte, stores map[string]
// Seek implements the Store interface.
func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
s.seek(context.Background(), rng, false, f)
ps, memRes := s.prepareSeekMemSnapshot(rng)
performSeek(context.Background(), ps, memRes, rng, false, f)
}
// GetStorageChanges returns all current storage changes. It can only be done for private
@ -170,8 +171,9 @@ func (s *MemCachedStore) GetStorageChanges() map[string][]byte {
// that key-value items are sorted by key in ascending way.
func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix bool) chan KeyValue {
res := make(chan KeyValue)
ps, memRes := s.prepareSeekMemSnapshot(rng)
go func() {
s.seek(ctx, rng, cutPrefix, func(k, v []byte) bool {
performSeek(ctx, ps, memRes, rng, cutPrefix, func(k, v []byte) bool {
select {
case <-ctx.Done():
return false
@ -185,13 +187,10 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix
return res
}
// seek is internal representations of Seek* capable of seeking for the given key
// and supporting early stop using provided context. `cutPrefix` denotes whether provided
// key needs to be cut off the resulting keys. `rng` specifies prefix items must match
// and point to start seeking from. Backwards seeking from some point is supported
// with corresponding `rng` field set.
func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) {
// Create memory store `mem` and `del` snapshot not to hold the lock.
// prepareSeekMemSnapshot prepares memory store snapshot of `stor`/`mem` in order
// not to hold the lock over MemCachedStore throughout the whole Seek operation.
// The results of prepareSeekMemSnapshot can be safely used as performSeek arguments.
func (s *MemCachedStore) prepareSeekMemSnapshot(rng SeekRange) (Store, []KeyValueExists) {
var memRes []KeyValueExists
sPrefix := string(rng.Prefix)
lPrefix := len(sPrefix)
@ -220,6 +219,19 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
}
ps := s.ps
s.runlock()
return ps, memRes
}
// performSeek is internal representations of Seek* capable of seeking for the given key
// and supporting early stop using provided context. `ps` is a captured underlying
// persistent storage. `memRes` is a snapshot of suitable cached items prepared
// by prepareSeekMemSnapshot.
//
// `cutPrefix` denotes whether provided key needs to be cut off the resulting keys.
// `rng` specifies prefix items must match and point to start seeking from. Backwards
// seeking from some point is supported with corresponding `rng` field set.
func performSeek(ctx context.Context, ps Store, memRes []KeyValueExists, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) {
lPrefix := len(string(rng.Prefix))
less := func(k1, k2 []byte) bool {
res := bytes.Compare(k1, k2)
return res != 0 && rng.Backwards == (res > 0)