diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index ef1f1e583..5f3c168f6 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -102,7 +102,7 @@ func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { // that key-value items are sorted by key in ascending way. func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bool) chan KeyValue { // Create memory store `mem` and `del` snapshot not to hold the lock. - memRes := make([]KeyValueExists, 0) + var memRes []KeyValueExists sk := string(key) s.mut.RLock() for k, v := range s.MemoryStore.mem { @@ -110,7 +110,7 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bo memRes = append(memRes, KeyValueExists{ KeyValue: KeyValue{ Key: []byte(k), - Value: slice.Copy(v), + Value: v, }, Exists: true, }) @@ -133,23 +133,9 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bo }) var ( - data1 = make(chan KeyValueExists) data2 = make(chan KeyValue) seekres = make(chan KeyValue) ) - // Seek over memory store. - go func() { - loop: - for _, kv := range memRes { - select { - case <-ctx.Done(): - break loop - default: - data1 <- kv - } - } - close(data1) - }() // Seek over persistent store. go func() { @@ -174,7 +160,16 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bo // Merge results of seek operations in ascending order. go func() { - kvMem, haveMem := <-data1 + var ( + kvMem KeyValueExists + haveMem bool + iMem int + ) + if iMem < len(memRes) { + kvMem = memRes[iMem] + haveMem = true + iMem++ + } kvPs, havePs := <-data2 for { if !haveMem && !havePs { @@ -191,7 +186,13 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bo Value: kvMem.Value, } } - kvMem, haveMem = <-data1 + if iMem < len(memRes) { + kvMem = memRes[iMem] + haveMem = true + iMem++ + } else { + haveMem = false + } } else { if !bytes.Equal(kvMem.Key, kvPs.Key) { if cutPrefix {