diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index bc3b90cbf..b15a36e97 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -338,10 +338,13 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S // Seek executes f for all items with a given prefix. // If key is to be used outside of f, they may not be copied. func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { - res := dao.SeekAsync(context.Background(), id, prefix) - for r := range res { - f(r.Key, r.Value) + lookupKey := makeStorageItemKey(id, nil) + if prefix != nil { + lookupKey = append(lookupKey, prefix...) } + dao.Store.Seek(lookupKey, func(k, v []byte) { + f(k[len(lookupKey):], v) + }) } // SeekAsync sends all storage items matching given prefix to a channel and returns diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index ea137db8e..afe32481b 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -91,16 +91,28 @@ func (s *MemCachedStore) GetBatch() *MemBatch { // Seek implements the Store interface. func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { - seekres := s.SeekAsync(context.Background(), key, false) - for kv := range seekres { - f(kv.Key, kv.Value) - } + s.seek(context.Background(), key, false, f) } // SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and // value slices may not be copied and may be modified. SeekAsync can guarantee // that key-value items are sorted by key in ascending way. func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bool) chan KeyValue { + res := make(chan KeyValue) + go func() { + s.seek(ctx, key, cutPrefix, func(k, v []byte) { + res <- KeyValue{ + Key: k, + Value: v, + } + }) + close(res) + }() + + return res +} + +func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f func(k, v []byte)) { // Create memory store `mem` and `del` snapshot not to hold the lock. var memRes []KeyValueExists sk := string(key) @@ -132,93 +144,77 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bo return bytes.Compare(memRes[i].Key, memRes[j].Key) < 0 }) - var seekres = make(chan KeyValue) - - // Seek over persistent store. - go func() { - var ( - done bool - iMem int - kvMem KeyValueExists - haveMem bool - ) - if iMem < len(memRes) { - kvMem = memRes[iMem] - haveMem = true - iMem++ + var ( + done bool + iMem int + kvMem KeyValueExists + haveMem bool + ) + if iMem < len(memRes) { + kvMem = memRes[iMem] + haveMem = true + iMem++ + } + // Merge results of seek operations in ascending order. + ps.Seek(key, func(k, v []byte) { + if done { + return } - - // Merge results of seek operations in ascending order. - ps.Seek(key, func(k, v []byte) { - if done { - return - } - kvPs := KeyValue{ - Key: slice.Copy(k), - Value: slice.Copy(v), - } - loop: - for { - select { - case <-ctx.Done(): - done = true - break loop - default: - var isMem = haveMem && (bytes.Compare(kvMem.Key, kvPs.Key) < 0) - if isMem { - if kvMem.Exists { - if cutPrefix { - kvMem.Key = kvMem.Key[len(key):] - } - seekres <- KeyValue{ - Key: kvMem.Key, - Value: kvMem.Value, - } - } - if iMem < len(memRes) { - kvMem = memRes[iMem] - haveMem = true - iMem++ - } else { - haveMem = false - } - } else { - if !bytes.Equal(kvMem.Key, kvPs.Key) { - if cutPrefix { - kvPs.Key = kvPs.Key[len(key):] - } - seekres <- kvPs - } - break loop - } - } - } - - }) - if !done && haveMem { - loop: - for i := iMem - 1; i < len(memRes); i++ { - select { - case <-ctx.Done(): - break loop - default: - kvMem = memRes[i] + kvPs := KeyValue{ + Key: slice.Copy(k), + Value: slice.Copy(v), + } + loop: + for { + select { + case <-ctx.Done(): + done = true + break loop + default: + var isMem = haveMem && (bytes.Compare(kvMem.Key, kvPs.Key) < 0) + if isMem { if kvMem.Exists { if cutPrefix { kvMem.Key = kvMem.Key[len(key):] } - seekres <- KeyValue{ - Key: kvMem.Key, - Value: kvMem.Value, - } + f(kvMem.Key, kvMem.Value) } + if iMem < len(memRes) { + kvMem = memRes[iMem] + haveMem = true + iMem++ + } else { + haveMem = false + } + } else { + if !bytes.Equal(kvMem.Key, kvPs.Key) { + if cutPrefix { + kvPs.Key = kvPs.Key[len(key):] + } + f(kvPs.Key, kvPs.Value) + } + break loop } } } - close(seekres) - }() - - return seekres + }) + if !done && haveMem { + loop: + for i := iMem - 1; i < len(memRes); i++ { + select { + case <-ctx.Done(): + break loop + default: + kvMem = memRes[i] + if kvMem.Exists { + if cutPrefix { + kvMem.Key = kvMem.Key[len(key):] + } + f(kvMem.Key, kvMem.Value) + } + } + } + } } // Persist flushes all the MemoryStore contents into the (supposedly) persistent