From 04a8e6666ff50722a07e172ef0b2ff2706ea05dc Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 28 Dec 2021 16:01:44 +0300 Subject: [PATCH] storage: allow to seek backwards --- pkg/core/storage/boltdb_store.go | 30 ++++- pkg/core/storage/leveldb_store.go | 21 ++- pkg/core/storage/memcached_store.go | 17 ++- pkg/core/storage/memory_store.go | 15 ++- pkg/core/storage/store.go | 4 + pkg/core/storage/storeandbatch_test.go | 173 ++++++++++++++++++------- 6 files changed, 203 insertions(+), 57 deletions(-) diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index 540bc84fd..c1ac2ed5e 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -113,7 +113,15 @@ func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte)) { start := make([]byte, len(rng.Prefix)+len(rng.Start)) copy(start, rng.Prefix) copy(start[len(rng.Prefix):], rng.Start) - prefix := util.BytesPrefix(rng.Prefix) + if rng.Backwards { + s.seekBackwards(rng.Prefix, start, f) + } else { + s.seek(rng.Prefix, start, f) + } +} + +func (s *BoltDBStore) seek(key []byte, start []byte, f func(k, v []byte)) { + prefix := util.BytesPrefix(key) prefix.Start = start err := s.db.View(func(tx *bbolt.Tx) error { c := tx.Bucket(Bucket).Cursor() @@ -127,6 +135,26 @@ func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte)) { } } +func (s *BoltDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte)) { + err := s.db.View(func(tx *bbolt.Tx) error { + c := tx.Bucket(Bucket).Cursor() + // Move cursor to the first kv pair which is followed by the pair matching the specified prefix. + if len(start) == 0 { + lastKey, _ := c.Last() + start = lastKey + } + rng := util.BytesPrefix(start) // in fact, we only need limit based on start slice to iterate backwards starting from this limit + c.Seek(rng.Limit) + for k, v := c.Prev(); k != nil && bytes.HasPrefix(k, key); k, v = c.Prev() { + f(k, v) + } + return nil + }) + if err != nil { + panic(err) + } +} + // Batch implements the Batch interface and returns a boltdb // compatible Batch. func (s *BoltDBStore) Batch() Batch { diff --git a/pkg/core/storage/leveldb_store.go b/pkg/core/storage/leveldb_store.go index 902041d50..87ab09923 100644 --- a/pkg/core/storage/leveldb_store.go +++ b/pkg/core/storage/leveldb_store.go @@ -89,7 +89,15 @@ func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte)) { start := make([]byte, len(rng.Prefix)+len(rng.Start)) copy(start, rng.Prefix) copy(start[len(rng.Prefix):], rng.Start) - prefix := util.BytesPrefix(rng.Prefix) + if rng.Backwards { + s.seekBackwards(rng.Prefix, start, f) + } else { + s.seek(rng.Prefix, start, f) + } +} + +func (s *LevelDBStore) seek(key []byte, start []byte, f func(k, v []byte)) { + prefix := util.BytesPrefix(key) prefix.Start = start iter := s.db.NewIterator(prefix, nil) for iter.Next() { @@ -98,6 +106,17 @@ func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte)) { iter.Release() } +func (s *LevelDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte)) { + iRange := util.BytesPrefix(start) + iRange.Start = key + + iter := s.db.NewIterator(iRange, nil) + for ok := iter.Last(); ok; ok = iter.Prev() { + f(iter.Key(), iter.Value()) + } + iter.Release() +} + // Batch implements the Batch interface and returns a leveldb // compatible Batch. func (s *LevelDBStore) Batch() Batch { diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index e9f2bd76d..7289e5f3c 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -115,7 +115,8 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix // seek is internal representations of Seek* capable of seeking for the given key // and supporting early stop using provided context. `cutPrefix` denotes whether provided // key needs to be cut off the resulting keys. `rng` specifies prefix items must match -// and point to start seeking from. +// and point to start seeking from. Backwards seeking from some point is supported +// with corresponding `rng` field set. func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte)) { // Create memory store `mem` and `del` snapshot not to hold the lock. var memRes []KeyValueExists @@ -126,6 +127,11 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool isKeyOK := func(key string) bool { return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0) } + if rng.Backwards { + isKeyOK = func(key string) bool { + return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0) + } + } s.mut.RLock() for k, v := range s.MemoryStore.mem { if isKeyOK(k) { @@ -149,9 +155,14 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool } ps := s.ps s.mut.RUnlock() + + less := func(k1, k2 []byte) bool { + res := bytes.Compare(k1, k2) + return res != 0 && rng.Backwards == (res > 0) + } // Sort memRes items for further comparison with ps items. sort.Slice(memRes, func(i, j int) bool { - return bytes.Compare(memRes[i].Key, memRes[j].Key) < 0 + return less(memRes[i].Key, memRes[j].Key) }) var ( @@ -181,7 +192,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool done = true break loop default: - var isMem = haveMem && (bytes.Compare(kvMem.Key, kvPs.Key) < 0) + var isMem = haveMem && less(kvMem.Key, kvPs.Key) if isMem { if kvMem.Exists { if cutPrefix { diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 2f0319b5d..edcb6eb30 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -128,7 +128,8 @@ func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) { } // seek is an internal unlocked implementation of Seek. `start` denotes whether -// seeking starting from the provided prefix should be performed. +// seeking starting from the provided prefix should be performed. Backwards +// seeking from some point is supported with corresponding SeekRange field set. func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) { sPrefix := string(rng.Prefix) lPrefix := len(sPrefix) @@ -139,6 +140,16 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) { isKeyOK := func(key string) bool { return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0) } + if rng.Backwards { + isKeyOK = func(key string) bool { + return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0) + } + } + less := func(k1, k2 []byte) bool { + res := bytes.Compare(k1, k2) + return res != 0 && rng.Backwards == (res > 0) + } + for k, v := range s.mem { if isKeyOK(k) { memList = append(memList, KeyValue{ @@ -148,7 +159,7 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) { } } sort.Slice(memList, func(i, j int) bool { - return bytes.Compare(memList[i].Key, memList[j].Key) < 0 + return less(memList[i].Key, memList[j].Key) }) for _, kv := range memList { f(kv.Key, kv.Value) diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 6b85903bf..65acfadbb 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -59,6 +59,10 @@ type SeekRange struct { // Empty Prefix and empty Start can be combined, which means seeking // through all keys in the DB. Start []byte + // Backwards denotes whether Seek direction should be reversed, i.e. + // whether seeking should be performed in a descending way. + // Backwards can be safely combined with Prefix and Start. + Backwards bool } // ErrKeyNotFound is an error returned by Store implementations diff --git a/pkg/core/storage/storeandbatch_test.go b/pkg/core/storage/storeandbatch_test.go index 9423db635..273408ea7 100644 --- a/pkg/core/storage/storeandbatch_test.go +++ b/pkg/core/storage/storeandbatch_test.go @@ -81,17 +81,27 @@ func testStoreSeek(t *testing.T, s Store) { require.NoError(t, s.Put(v.Key, v.Value)) } - check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue) { - // Seek result expected to be sorted in an ascending way. - sort.Slice(goodkvs, func(i, j int) bool { + check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue, backwards bool) { + // Seek result expected to be sorted in an ascending (for forwards seeking) or descending (for backwards seeking) way. + cmpFunc := func(i, j int) bool { return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) < 0 - }) + } + if backwards { + cmpFunc = func(i, j int) bool { + return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) > 0 + } + } + sort.Slice(goodkvs, cmpFunc) - actual := make([]KeyValue, 0, len(goodkvs)) - s.Seek(SeekRange{ + rng := SeekRange{ Prefix: goodprefix, Start: start, - }, func(k, v []byte) { + } + if backwards { + rng.Backwards = true + } + actual := make([]KeyValue, 0, len(goodkvs)) + s.Seek(rng, func(k, v []byte) { actual = append(actual, KeyValue{ Key: slice.Copy(k), Value: slice.Copy(v), @@ -101,59 +111,117 @@ func testStoreSeek(t *testing.T, s Store) { } t.Run("non-empty prefix, empty start", func(t *testing.T) { - t.Run("good", func(t *testing.T) { - // Given this prefix... - goodprefix := []byte("2") - // and empty start range... - start := []byte{} - // these pairs should be found. - goodkvs := []KeyValue{ - kvs[2], // key = "20" - kvs[3], // key = "21" - kvs[4], // key = "22" - } - check(t, goodprefix, start, goodkvs) + t.Run("forwards", func(t *testing.T) { + t.Run("good", func(t *testing.T) { + // Given this prefix... + goodprefix := []byte("2") + // and empty start range... + start := []byte{} + // these pairs should be found. + goodkvs := []KeyValue{ + kvs[2], // key = "20" + kvs[3], // key = "21" + kvs[4], // key = "22" + } + check(t, goodprefix, start, goodkvs, false) + }) + t.Run("no matching items", func(t *testing.T) { + goodprefix := []byte("0") + start := []byte{} + check(t, goodprefix, start, []KeyValue{}, false) + }) }) - t.Run("no matching items", func(t *testing.T) { - goodprefix := []byte("0") - start := []byte{} - check(t, goodprefix, start, []KeyValue{}) + + t.Run("backwards", func(t *testing.T) { + t.Run("good", func(t *testing.T) { + goodprefix := []byte("2") + start := []byte{} + goodkvs := []KeyValue{ + kvs[4], // key = "22" + kvs[3], // key = "21" + kvs[2], // key = "20" + } + check(t, goodprefix, start, goodkvs, true) + }) + t.Run("no matching items", func(t *testing.T) { + goodprefix := []byte("0") + start := []byte{} + check(t, goodprefix, start, []KeyValue{}, true) + }) }) }) t.Run("non-empty prefix, non-empty start", func(t *testing.T) { - t.Run("good", func(t *testing.T) { - goodprefix := []byte("2") - start := []byte("1") // start will be upended to goodprefix to start seek from - goodkvs := []KeyValue{ - kvs[3], // key = "21" - kvs[4], // key = "22" - } - check(t, goodprefix, start, goodkvs) + t.Run("forwards", func(t *testing.T) { + t.Run("good", func(t *testing.T) { + goodprefix := []byte("2") + start := []byte("1") // start will be upended to goodprefix to start seek from + goodkvs := []KeyValue{ + kvs[3], // key = "21" + kvs[4], // key = "22" + } + check(t, goodprefix, start, goodkvs, false) + }) + t.Run("no matching items", func(t *testing.T) { + goodprefix := []byte("2") + start := []byte("3") // start is more than all keys prefixed by '2'. + check(t, goodprefix, start, []KeyValue{}, false) + }) }) - t.Run("no matching items", func(t *testing.T) { - goodprefix := []byte("2") - start := []byte("3") // start will be upended to goodprefix to start seek from - check(t, goodprefix, start, []KeyValue{}) + t.Run("backwards", func(t *testing.T) { + t.Run("good", func(t *testing.T) { + goodprefix := []byte("2") + start := []byte("1") // start will be upended to goodprefix to start seek from + goodkvs := []KeyValue{ + kvs[3], // key = "21" + kvs[2], // key = "20" + } + check(t, goodprefix, start, goodkvs, true) + }) + t.Run("no matching items", func(t *testing.T) { + goodprefix := []byte("2") + start := []byte(".") // start is less than all keys prefixed by '2'. + check(t, goodprefix, start, []KeyValue{}, true) + }) }) }) t.Run("empty prefix, non-empty start", func(t *testing.T) { - t.Run("good", func(t *testing.T) { - goodprefix := []byte{} - start := []byte("21") - goodkvs := []KeyValue{ - kvs[3], // key = "21" - kvs[4], // key = "22" - kvs[5], // key = "30" - kvs[6], // key = "31" - } - check(t, goodprefix, start, goodkvs) + t.Run("forwards", func(t *testing.T) { + t.Run("good", func(t *testing.T) { + goodprefix := []byte{} + start := []byte("21") + goodkvs := []KeyValue{ + kvs[3], // key = "21" + kvs[4], // key = "22" + kvs[5], // key = "30" + kvs[6], // key = "31" + } + check(t, goodprefix, start, goodkvs, false) + }) + t.Run("no matching items", func(t *testing.T) { + goodprefix := []byte{} + start := []byte("32") // start is more than all keys. + check(t, goodprefix, start, []KeyValue{}, false) + }) }) - t.Run("no matching items", func(t *testing.T) { - goodprefix := []byte{} - start := []byte("32") - check(t, goodprefix, start, []KeyValue{}) + t.Run("backwards", func(t *testing.T) { + t.Run("good", func(t *testing.T) { + goodprefix := []byte{} + start := []byte("21") + goodkvs := []KeyValue{ + kvs[3], // key = "21" + kvs[2], // key = "20" + kvs[1], // key = "11" + kvs[0], // key = "10" + } + check(t, goodprefix, start, goodkvs, true) + }) + t.Run("no matching items", func(t *testing.T) { + goodprefix := []byte{} + start := []byte("0") // start is less than all keys. + check(t, goodprefix, start, []KeyValue{}, true) + }) }) }) @@ -162,7 +230,12 @@ func testStoreSeek(t *testing.T, s Store) { start := []byte{} goodkvs := make([]KeyValue, len(kvs)) copy(goodkvs, kvs) - check(t, goodprefix, start, goodkvs) + t.Run("forwards", func(t *testing.T) { + check(t, goodprefix, start, goodkvs, false) + }) + t.Run("backwards", func(t *testing.T) { + check(t, goodprefix, start, goodkvs, true) + }) }) require.NoError(t, s.Close())