Merge pull request #2495 from nspcc-dev/fix-2493

core: avoid concurrent map r/w during SeekAsync
This commit is contained in:
Roman Khimov 2022-05-16 13:24:52 +03:00 committed by GitHub
commit 81fa751000
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -153,7 +153,8 @@ func (s *MemCachedStore) PutChangeSet(puts map[string][]byte, stores map[string]
// Seek implements the Store interface. // Seek implements the Store interface.
func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte) bool) { func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
s.seek(context.Background(), rng, false, f) ps, memRes := s.prepareSeekMemSnapshot(rng)
performSeek(context.Background(), ps, memRes, rng, false, f)
} }
// GetStorageChanges returns all current storage changes. It can only be done for private // GetStorageChanges returns all current storage changes. It can only be done for private
@ -170,8 +171,9 @@ func (s *MemCachedStore) GetStorageChanges() map[string][]byte {
// that key-value items are sorted by key in ascending way. // that key-value items are sorted by key in ascending way.
func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix bool) chan KeyValue { func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix bool) chan KeyValue {
res := make(chan KeyValue) res := make(chan KeyValue)
ps, memRes := s.prepareSeekMemSnapshot(rng)
go func() { go func() {
s.seek(ctx, rng, cutPrefix, func(k, v []byte) bool { performSeek(ctx, ps, memRes, rng, cutPrefix, func(k, v []byte) bool {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return false return false
@ -185,13 +187,10 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix
return res return res
} }
// seek is internal representations of Seek* capable of seeking for the given key // prepareSeekMemSnapshot prepares memory store snapshot of `stor`/`mem` in order
// and supporting early stop using provided context. `cutPrefix` denotes whether provided // not to hold the lock over MemCachedStore throughout the whole Seek operation.
// key needs to be cut off the resulting keys. `rng` specifies prefix items must match // The results of prepareSeekMemSnapshot can be safely used as performSeek arguments.
// and point to start seeking from. Backwards seeking from some point is supported func (s *MemCachedStore) prepareSeekMemSnapshot(rng SeekRange) (Store, []KeyValueExists) {
// with corresponding `rng` field set.
func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) {
// Create memory store `mem` and `del` snapshot not to hold the lock.
var memRes []KeyValueExists var memRes []KeyValueExists
sPrefix := string(rng.Prefix) sPrefix := string(rng.Prefix)
lPrefix := len(sPrefix) lPrefix := len(sPrefix)
@ -220,6 +219,19 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
} }
ps := s.ps ps := s.ps
s.runlock() s.runlock()
return ps, memRes
}
// performSeek is internal representations of Seek* capable of seeking for the given key
// and supporting early stop using provided context. `ps` is a captured underlying
// persistent storage. `memRes` is a snapshot of suitable cached items prepared
// by prepareSeekMemSnapshot.
//
// `cutPrefix` denotes whether provided key needs to be cut off the resulting keys.
// `rng` specifies prefix items must match and point to start seeking from. Backwards
// seeking from some point is supported with corresponding `rng` field set.
func performSeek(ctx context.Context, ps Store, memRes []KeyValueExists, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) {
lPrefix := len(string(rng.Prefix))
less := func(k1, k2 []byte) bool { less := func(k1, k2 []byte) bool {
res := bytes.Compare(k1, k2) res := bytes.Compare(k1, k2)
return res != 0 && rng.Backwards == (res > 0) return res != 0 && rng.Backwards == (res > 0)