From dcda7bec63d555ad26ba8b9bf73a9610e9006fb8 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 19 Oct 2021 15:10:12 +0300 Subject: [PATCH] core: squash PS-seeking and merging routines in (*MemCachedStore).Seek We don't need a separate routine to merge seek results. --- pkg/core/storage/memcached_store.go | 116 +++++++++++++++------------- 1 file changed, 64 insertions(+), 52 deletions(-) diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 5f3c168f6..ea137db8e 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -132,75 +132,87 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bo return bytes.Compare(memRes[i].Key, memRes[j].Key) < 0 }) - var ( - data2 = make(chan KeyValue) - seekres = make(chan KeyValue) - ) + var seekres = make(chan KeyValue) // Seek over persistent store. - go func() { - var done bool - ps.Seek(key, func(k, v []byte) { - if done { - return - } - select { - case <-ctx.Done(): - done = true - default: - // Must copy here, #1468. - data2 <- KeyValue{ - Key: slice.Copy(k), - Value: slice.Copy(v), - } - } - }) - close(data2) - }() - - // Merge results of seek operations in ascending order. go func() { var ( + done bool + iMem int kvMem KeyValueExists haveMem bool - iMem int ) if iMem < len(memRes) { kvMem = memRes[iMem] haveMem = true iMem++ } - kvPs, havePs := <-data2 - for { - if !haveMem && !havePs { - break + + // Merge results of seek operations in ascending order. + ps.Seek(key, func(k, v []byte) { + if done { + return } - var isMem = haveMem && (!havePs || (bytes.Compare(kvMem.Key, kvPs.Key) < 0)) - if isMem { - if kvMem.Exists { - if cutPrefix { - kvMem.Key = kvMem.Key[len(key):] - } - seekres <- KeyValue{ - Key: kvMem.Key, - Value: kvMem.Value, + kvPs := KeyValue{ + Key: slice.Copy(k), + Value: slice.Copy(v), + } + loop: + for { + select { + case <-ctx.Done(): + done = true + break loop + default: + var isMem = haveMem && (bytes.Compare(kvMem.Key, kvPs.Key) < 0) + if isMem { + if kvMem.Exists { + if cutPrefix { + kvMem.Key = kvMem.Key[len(key):] + } + seekres <- KeyValue{ + Key: kvMem.Key, + Value: kvMem.Value, + } + } + if iMem < len(memRes) { + kvMem = memRes[iMem] + haveMem = true + iMem++ + } else { + haveMem = false + } + } else { + if !bytes.Equal(kvMem.Key, kvPs.Key) { + if cutPrefix { + kvPs.Key = kvPs.Key[len(key):] + } + seekres <- kvPs + } + break loop } } - if iMem < len(memRes) { - kvMem = memRes[iMem] - haveMem = true - iMem++ - } else { - haveMem = false - } - } else { - if !bytes.Equal(kvMem.Key, kvPs.Key) { - if cutPrefix { - kvPs.Key = kvPs.Key[len(key):] + } + + }) + if !done && haveMem { + loop: + for i := iMem - 1; i < len(memRes); i++ { + select { + case <-ctx.Done(): + break loop + default: + kvMem = memRes[i] + if kvMem.Exists { + if cutPrefix { + kvMem.Key = kvMem.Key[len(key):] + } + seekres <- KeyValue{ + Key: kvMem.Key, + Value: kvMem.Value, + } } - seekres <- kvPs } - kvPs, havePs = <-data2 } } close(seekres)