[#948] metabase: Use seek for faster cursor listing

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-11-10 13:06:43 +03:00 committed by Alex Vanin
parent 8d471c7e36
commit 0f6d8f6eea

View file

@ -1,8 +1,8 @@
package meta package meta
import ( import (
"errors"
"fmt" "fmt"
"strings"
"github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/object"
core "github.com/nspcc-dev/neofs-node/pkg/core/object" core "github.com/nspcc-dev/neofs-node/pkg/core/object"
@ -51,8 +51,6 @@ const (
cursorPrefixSG = 's' cursorPrefixSG = 's'
) )
var errStopIterator = errors.New("stop")
// ListWithCursor lists physical objects available in metabase. Includes regular, // ListWithCursor lists physical objects available in metabase. Includes regular,
// tombstone and storage group objects. Does not include inhumed objects. Use // tombstone and storage group objects. Does not include inhumed objects. Use
// cursor value from response for consecutive requests. // cursor value from response for consecutive requests.
@ -100,18 +98,21 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor string) ([]*object.
result := make([]*object.Address, 0, count) result := make([]*object.Address, 0, count)
unique := make(map[string]struct{}) // do not parse the same containerID twice unique := make(map[string]struct{}) // do not parse the same containerID twice
_ = tx.ForEach(func(name []byte, _ *bbolt.Bucket) error { c := tx.Cursor()
name, _ := c.First()
if !threshold {
name, _ = c.Seek([]byte(a.ContainerID().String()))
}
loop:
for ; name != nil; name, _ = c.Next() {
containerID := parseContainerID(name, unique) containerID := parseContainerID(name, unique)
if containerID == nil { if containerID == nil {
return nil continue
} }
unique[containerID.String()] = struct{}{} unique[containerID.String()] = struct{}{}
if !threshold && !containerID.Equal(a.ContainerID()) {
return nil // ignore buckets until we find cursor bucket
}
prefix := containerID.String() + "/" prefix := containerID.String() + "/"
lookupBuckets := [...]struct { lookupBuckets := [...]struct {
@ -131,15 +132,14 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor string) ([]*object.
cursorPrefix = lb.prefix cursorPrefix = lb.prefix
result, cursor = selectNFromBucket(tx, lb.name, prefix, result, count, cursor, threshold) result, cursor = selectNFromBucket(tx, lb.name, prefix, result, count, cursor, threshold)
if len(result) >= count { if len(result) >= count {
return errStopIterator break loop
} }
// set threshold flag after first `selectNFromBucket` invocation // set threshold flag after first `selectNFromBucket` invocation
// first invocation must look for cursor object // first invocation must look for cursor object
threshold = true threshold = true
} }
return nil }
})
if len(result) == 0 { if len(result) == 0 {
return nil, "", core.ErrEndOfListing return nil, "", core.ErrEndOfListing
@ -165,38 +165,35 @@ func selectNFromBucket(tx *bbolt.Tx,
count := len(to) count := len(to)
_ = bkt.ForEach(func(k, v []byte) error { c := bkt.Cursor()
k, _ := c.First()
if !threshold {
seekKey := strings.Replace(cursor, prefix, "", 1)
c.Seek([]byte(seekKey))
k, _ = c.Next() // we are looking for objects _after_ the cursor
}
for ; k != nil; k, _ = c.Next() {
if count >= limit { if count >= limit {
return errStopIterator break
} }
key := prefix + string(k) key := prefix + string(k)
if !threshold {
if cursor == key {
// ignore cursor object and start adding next objects
threshold = true
}
return nil
}
threshold = true
cursor = key cursor = key
a := object.NewAddress() a := object.NewAddress()
if err := a.Parse(key); err != nil { if err := a.Parse(key); err != nil {
return err break
} }
if inGraveyard(tx, a) > 0 { if inGraveyard(tx, a) > 0 {
return nil continue
} }
to = append(to, a) to = append(to, a)
count++ count++
}
return nil
})
return to, cursor return to, cursor
} }