From d2dcdecca595c0df01526ad79c5a6fa1146cb293 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Mon, 16 May 2022 12:25:15 +0300 Subject: [PATCH] core: avoid concurrent map r/w during SeekAsync Close #2493. --- pkg/core/storage/memcached_store.go | 30 ++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 10461545e..75e36dda5 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -153,7 +153,8 @@ func (s *MemCachedStore) PutChangeSet(puts map[string][]byte, stores map[string] // Seek implements the Store interface. func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte) bool) { - s.seek(context.Background(), rng, false, f) + ps, memRes := s.prepareSeekMemSnapshot(rng) + performSeek(context.Background(), ps, memRes, rng, false, f) } // GetStorageChanges returns all current storage changes. It can only be done for private @@ -170,8 +171,9 @@ func (s *MemCachedStore) GetStorageChanges() map[string][]byte { // that key-value items are sorted by key in ascending way. func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix bool) chan KeyValue { res := make(chan KeyValue) + ps, memRes := s.prepareSeekMemSnapshot(rng) go func() { - s.seek(ctx, rng, cutPrefix, func(k, v []byte) bool { + performSeek(ctx, ps, memRes, rng, cutPrefix, func(k, v []byte) bool { select { case <-ctx.Done(): return false @@ -185,13 +187,10 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix return res } -// seek is internal representations of Seek* capable of seeking for the given key -// and supporting early stop using provided context. `cutPrefix` denotes whether provided -// key needs to be cut off the resulting keys. `rng` specifies prefix items must match -// and point to start seeking from. Backwards seeking from some point is supported -// with corresponding `rng` field set. -func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) { - // Create memory store `mem` and `del` snapshot not to hold the lock. +// prepareSeekMemSnapshot prepares memory store snapshot of `stor`/`mem` in order +// not to hold the lock over MemCachedStore throughout the whole Seek operation. +// The results of prepareSeekMemSnapshot can be safely used as performSeek arguments. +func (s *MemCachedStore) prepareSeekMemSnapshot(rng SeekRange) (Store, []KeyValueExists) { var memRes []KeyValueExists sPrefix := string(rng.Prefix) lPrefix := len(sPrefix) @@ -220,6 +219,19 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool } ps := s.ps s.runlock() + return ps, memRes +} + +// performSeek is internal representations of Seek* capable of seeking for the given key +// and supporting early stop using provided context. `ps` is a captured underlying +// persistent storage. `memRes` is a snapshot of suitable cached items prepared +// by prepareSeekMemSnapshot. +// +// `cutPrefix` denotes whether provided key needs to be cut off the resulting keys. +// `rng` specifies prefix items must match and point to start seeking from. Backwards +// seeking from some point is supported with corresponding `rng` field set. +func performSeek(ctx context.Context, ps Store, memRes []KeyValueExists, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) { + lPrefix := len(string(rng.Prefix)) less := func(k1, k2 []byte) bool { res := bytes.Compare(k1, k2) return res != 0 && rng.Backwards == (res > 0)