storage: allow to seek backwards

This commit is contained in:
Anna Shaleva 2021-12-28 16:01:44 +03:00
parent 6bc92abe19
commit 04a8e6666f
6 changed files with 203 additions and 57 deletions

View file

@ -113,7 +113,15 @@ func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
start := make([]byte, len(rng.Prefix)+len(rng.Start)) start := make([]byte, len(rng.Prefix)+len(rng.Start))
copy(start, rng.Prefix) copy(start, rng.Prefix)
copy(start[len(rng.Prefix):], rng.Start) copy(start[len(rng.Prefix):], rng.Start)
prefix := util.BytesPrefix(rng.Prefix) if rng.Backwards {
s.seekBackwards(rng.Prefix, start, f)
} else {
s.seek(rng.Prefix, start, f)
}
}
func (s *BoltDBStore) seek(key []byte, start []byte, f func(k, v []byte)) {
prefix := util.BytesPrefix(key)
prefix.Start = start prefix.Start = start
err := s.db.View(func(tx *bbolt.Tx) error { err := s.db.View(func(tx *bbolt.Tx) error {
c := tx.Bucket(Bucket).Cursor() c := tx.Bucket(Bucket).Cursor()
@ -127,6 +135,26 @@ func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
} }
} }
func (s *BoltDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte)) {
err := s.db.View(func(tx *bbolt.Tx) error {
c := tx.Bucket(Bucket).Cursor()
// Move cursor to the first kv pair which is followed by the pair matching the specified prefix.
if len(start) == 0 {
lastKey, _ := c.Last()
start = lastKey
}
rng := util.BytesPrefix(start) // in fact, we only need limit based on start slice to iterate backwards starting from this limit
c.Seek(rng.Limit)
for k, v := c.Prev(); k != nil && bytes.HasPrefix(k, key); k, v = c.Prev() {
f(k, v)
}
return nil
})
if err != nil {
panic(err)
}
}
// Batch implements the Batch interface and returns a boltdb // Batch implements the Batch interface and returns a boltdb
// compatible Batch. // compatible Batch.
func (s *BoltDBStore) Batch() Batch { func (s *BoltDBStore) Batch() Batch {

View file

@ -89,7 +89,15 @@ func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
start := make([]byte, len(rng.Prefix)+len(rng.Start)) start := make([]byte, len(rng.Prefix)+len(rng.Start))
copy(start, rng.Prefix) copy(start, rng.Prefix)
copy(start[len(rng.Prefix):], rng.Start) copy(start[len(rng.Prefix):], rng.Start)
prefix := util.BytesPrefix(rng.Prefix) if rng.Backwards {
s.seekBackwards(rng.Prefix, start, f)
} else {
s.seek(rng.Prefix, start, f)
}
}
func (s *LevelDBStore) seek(key []byte, start []byte, f func(k, v []byte)) {
prefix := util.BytesPrefix(key)
prefix.Start = start prefix.Start = start
iter := s.db.NewIterator(prefix, nil) iter := s.db.NewIterator(prefix, nil)
for iter.Next() { for iter.Next() {
@ -98,6 +106,17 @@ func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
iter.Release() iter.Release()
} }
func (s *LevelDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte)) {
iRange := util.BytesPrefix(start)
iRange.Start = key
iter := s.db.NewIterator(iRange, nil)
for ok := iter.Last(); ok; ok = iter.Prev() {
f(iter.Key(), iter.Value())
}
iter.Release()
}
// Batch implements the Batch interface and returns a leveldb // Batch implements the Batch interface and returns a leveldb
// compatible Batch. // compatible Batch.
func (s *LevelDBStore) Batch() Batch { func (s *LevelDBStore) Batch() Batch {

View file

@ -115,7 +115,8 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix
// seek is internal representations of Seek* capable of seeking for the given key // seek is internal representations of Seek* capable of seeking for the given key
// and supporting early stop using provided context. `cutPrefix` denotes whether provided // and supporting early stop using provided context. `cutPrefix` denotes whether provided
// key needs to be cut off the resulting keys. `rng` specifies prefix items must match // key needs to be cut off the resulting keys. `rng` specifies prefix items must match
// and point to start seeking from. // and point to start seeking from. Backwards seeking from some point is supported
// with corresponding `rng` field set.
func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte)) { func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte)) {
// Create memory store `mem` and `del` snapshot not to hold the lock. // Create memory store `mem` and `del` snapshot not to hold the lock.
var memRes []KeyValueExists var memRes []KeyValueExists
@ -126,6 +127,11 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
isKeyOK := func(key string) bool { isKeyOK := func(key string) bool {
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0) return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0)
} }
if rng.Backwards {
isKeyOK = func(key string) bool {
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0)
}
}
s.mut.RLock() s.mut.RLock()
for k, v := range s.MemoryStore.mem { for k, v := range s.MemoryStore.mem {
if isKeyOK(k) { if isKeyOK(k) {
@ -149,9 +155,14 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
} }
ps := s.ps ps := s.ps
s.mut.RUnlock() s.mut.RUnlock()
less := func(k1, k2 []byte) bool {
res := bytes.Compare(k1, k2)
return res != 0 && rng.Backwards == (res > 0)
}
// Sort memRes items for further comparison with ps items. // Sort memRes items for further comparison with ps items.
sort.Slice(memRes, func(i, j int) bool { sort.Slice(memRes, func(i, j int) bool {
return bytes.Compare(memRes[i].Key, memRes[j].Key) < 0 return less(memRes[i].Key, memRes[j].Key)
}) })
var ( var (
@ -181,7 +192,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
done = true done = true
break loop break loop
default: default:
var isMem = haveMem && (bytes.Compare(kvMem.Key, kvPs.Key) < 0) var isMem = haveMem && less(kvMem.Key, kvPs.Key)
if isMem { if isMem {
if kvMem.Exists { if kvMem.Exists {
if cutPrefix { if cutPrefix {

View file

@ -128,7 +128,8 @@ func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
} }
// seek is an internal unlocked implementation of Seek. `start` denotes whether // seek is an internal unlocked implementation of Seek. `start` denotes whether
// seeking starting from the provided prefix should be performed. // seeking starting from the provided prefix should be performed. Backwards
// seeking from some point is supported with corresponding SeekRange field set.
func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) { func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) {
sPrefix := string(rng.Prefix) sPrefix := string(rng.Prefix)
lPrefix := len(sPrefix) lPrefix := len(sPrefix)
@ -139,6 +140,16 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) {
isKeyOK := func(key string) bool { isKeyOK := func(key string) bool {
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0) return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0)
} }
if rng.Backwards {
isKeyOK = func(key string) bool {
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0)
}
}
less := func(k1, k2 []byte) bool {
res := bytes.Compare(k1, k2)
return res != 0 && rng.Backwards == (res > 0)
}
for k, v := range s.mem { for k, v := range s.mem {
if isKeyOK(k) { if isKeyOK(k) {
memList = append(memList, KeyValue{ memList = append(memList, KeyValue{
@ -148,7 +159,7 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) {
} }
} }
sort.Slice(memList, func(i, j int) bool { sort.Slice(memList, func(i, j int) bool {
return bytes.Compare(memList[i].Key, memList[j].Key) < 0 return less(memList[i].Key, memList[j].Key)
}) })
for _, kv := range memList { for _, kv := range memList {
f(kv.Key, kv.Value) f(kv.Key, kv.Value)

View file

@ -59,6 +59,10 @@ type SeekRange struct {
// Empty Prefix and empty Start can be combined, which means seeking // Empty Prefix and empty Start can be combined, which means seeking
// through all keys in the DB. // through all keys in the DB.
Start []byte Start []byte
// Backwards denotes whether Seek direction should be reversed, i.e.
// whether seeking should be performed in a descending way.
// Backwards can be safely combined with Prefix and Start.
Backwards bool
} }
// ErrKeyNotFound is an error returned by Store implementations // ErrKeyNotFound is an error returned by Store implementations

View file

@ -81,17 +81,27 @@ func testStoreSeek(t *testing.T, s Store) {
require.NoError(t, s.Put(v.Key, v.Value)) require.NoError(t, s.Put(v.Key, v.Value))
} }
check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue) { check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue, backwards bool) {
// Seek result expected to be sorted in an ascending way. // Seek result expected to be sorted in an ascending (for forwards seeking) or descending (for backwards seeking) way.
sort.Slice(goodkvs, func(i, j int) bool { cmpFunc := func(i, j int) bool {
return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) < 0 return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) < 0
}) }
if backwards {
cmpFunc = func(i, j int) bool {
return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) > 0
}
}
sort.Slice(goodkvs, cmpFunc)
actual := make([]KeyValue, 0, len(goodkvs)) rng := SeekRange{
s.Seek(SeekRange{
Prefix: goodprefix, Prefix: goodprefix,
Start: start, Start: start,
}, func(k, v []byte) { }
if backwards {
rng.Backwards = true
}
actual := make([]KeyValue, 0, len(goodkvs))
s.Seek(rng, func(k, v []byte) {
actual = append(actual, KeyValue{ actual = append(actual, KeyValue{
Key: slice.Copy(k), Key: slice.Copy(k),
Value: slice.Copy(v), Value: slice.Copy(v),
@ -101,59 +111,117 @@ func testStoreSeek(t *testing.T, s Store) {
} }
t.Run("non-empty prefix, empty start", func(t *testing.T) { t.Run("non-empty prefix, empty start", func(t *testing.T) {
t.Run("good", func(t *testing.T) { t.Run("forwards", func(t *testing.T) {
// Given this prefix... t.Run("good", func(t *testing.T) {
goodprefix := []byte("2") // Given this prefix...
// and empty start range... goodprefix := []byte("2")
start := []byte{} // and empty start range...
// these pairs should be found. start := []byte{}
goodkvs := []KeyValue{ // these pairs should be found.
kvs[2], // key = "20" goodkvs := []KeyValue{
kvs[3], // key = "21" kvs[2], // key = "20"
kvs[4], // key = "22" kvs[3], // key = "21"
} kvs[4], // key = "22"
check(t, goodprefix, start, goodkvs) }
check(t, goodprefix, start, goodkvs, false)
})
t.Run("no matching items", func(t *testing.T) {
goodprefix := []byte("0")
start := []byte{}
check(t, goodprefix, start, []KeyValue{}, false)
})
}) })
t.Run("no matching items", func(t *testing.T) {
goodprefix := []byte("0") t.Run("backwards", func(t *testing.T) {
start := []byte{} t.Run("good", func(t *testing.T) {
check(t, goodprefix, start, []KeyValue{}) goodprefix := []byte("2")
start := []byte{}
goodkvs := []KeyValue{
kvs[4], // key = "22"
kvs[3], // key = "21"
kvs[2], // key = "20"
}
check(t, goodprefix, start, goodkvs, true)
})
t.Run("no matching items", func(t *testing.T) {
goodprefix := []byte("0")
start := []byte{}
check(t, goodprefix, start, []KeyValue{}, true)
})
}) })
}) })
t.Run("non-empty prefix, non-empty start", func(t *testing.T) { t.Run("non-empty prefix, non-empty start", func(t *testing.T) {
t.Run("good", func(t *testing.T) { t.Run("forwards", func(t *testing.T) {
goodprefix := []byte("2") t.Run("good", func(t *testing.T) {
start := []byte("1") // start will be upended to goodprefix to start seek from goodprefix := []byte("2")
goodkvs := []KeyValue{ start := []byte("1") // start will be upended to goodprefix to start seek from
kvs[3], // key = "21" goodkvs := []KeyValue{
kvs[4], // key = "22" kvs[3], // key = "21"
} kvs[4], // key = "22"
check(t, goodprefix, start, goodkvs) }
check(t, goodprefix, start, goodkvs, false)
})
t.Run("no matching items", func(t *testing.T) {
goodprefix := []byte("2")
start := []byte("3") // start is more than all keys prefixed by '2'.
check(t, goodprefix, start, []KeyValue{}, false)
})
}) })
t.Run("no matching items", func(t *testing.T) { t.Run("backwards", func(t *testing.T) {
goodprefix := []byte("2") t.Run("good", func(t *testing.T) {
start := []byte("3") // start will be upended to goodprefix to start seek from goodprefix := []byte("2")
check(t, goodprefix, start, []KeyValue{}) start := []byte("1") // start will be upended to goodprefix to start seek from
goodkvs := []KeyValue{
kvs[3], // key = "21"
kvs[2], // key = "20"
}
check(t, goodprefix, start, goodkvs, true)
})
t.Run("no matching items", func(t *testing.T) {
goodprefix := []byte("2")
start := []byte(".") // start is less than all keys prefixed by '2'.
check(t, goodprefix, start, []KeyValue{}, true)
})
}) })
}) })
t.Run("empty prefix, non-empty start", func(t *testing.T) { t.Run("empty prefix, non-empty start", func(t *testing.T) {
t.Run("good", func(t *testing.T) { t.Run("forwards", func(t *testing.T) {
goodprefix := []byte{} t.Run("good", func(t *testing.T) {
start := []byte("21") goodprefix := []byte{}
goodkvs := []KeyValue{ start := []byte("21")
kvs[3], // key = "21" goodkvs := []KeyValue{
kvs[4], // key = "22" kvs[3], // key = "21"
kvs[5], // key = "30" kvs[4], // key = "22"
kvs[6], // key = "31" kvs[5], // key = "30"
} kvs[6], // key = "31"
check(t, goodprefix, start, goodkvs) }
check(t, goodprefix, start, goodkvs, false)
})
t.Run("no matching items", func(t *testing.T) {
goodprefix := []byte{}
start := []byte("32") // start is more than all keys.
check(t, goodprefix, start, []KeyValue{}, false)
})
}) })
t.Run("no matching items", func(t *testing.T) { t.Run("backwards", func(t *testing.T) {
goodprefix := []byte{} t.Run("good", func(t *testing.T) {
start := []byte("32") goodprefix := []byte{}
check(t, goodprefix, start, []KeyValue{}) start := []byte("21")
goodkvs := []KeyValue{
kvs[3], // key = "21"
kvs[2], // key = "20"
kvs[1], // key = "11"
kvs[0], // key = "10"
}
check(t, goodprefix, start, goodkvs, true)
})
t.Run("no matching items", func(t *testing.T) {
goodprefix := []byte{}
start := []byte("0") // start is less than all keys.
check(t, goodprefix, start, []KeyValue{}, true)
})
}) })
}) })
@ -162,7 +230,12 @@ func testStoreSeek(t *testing.T, s Store) {
start := []byte{} start := []byte{}
goodkvs := make([]KeyValue, len(kvs)) goodkvs := make([]KeyValue, len(kvs))
copy(goodkvs, kvs) copy(goodkvs, kvs)
check(t, goodprefix, start, goodkvs) t.Run("forwards", func(t *testing.T) {
check(t, goodprefix, start, goodkvs, false)
})
t.Run("backwards", func(t *testing.T) {
check(t, goodprefix, start, goodkvs, true)
})
}) })
require.NoError(t, s.Close()) require.NoError(t, s.Close())