[#948] metabase: Simplify cursor structure

Use bbolt bucket keys to seek offset in root bucket
and container buckets.

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-11-17 14:01:41 +03:00 committed by Alex Vanin
parent 468caa83d9
commit c80c83b0b8

View file

@ -1,15 +1,18 @@
package meta
import (
"strings"
core "github.com/nspcc-dev/neofs-node/pkg/core/object"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
"go.etcd.io/bbolt"
)
// Cursor is a type for continuous object listing.
type Cursor struct {
bucket uint8
address *object.Address
bucketName []byte
inBucketOffset []byte
}
// ListPrm contains parameters for ListWithCursor operation.
@ -48,12 +51,6 @@ func (l ListRes) Cursor() *Cursor {
return l.cursor
}
const (
cursorBucketPrimary = iota
cursorBucketTombstone
cursorBucketSG
)
// ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes regular, tombstone and storage group objects. Does not
// include inhumed objects. Use cursor value from response for consecutive requests.
@ -88,55 +85,50 @@ func (db *DB) ListWithCursor(prm *ListPrm) (res *ListRes, err error) {
func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor *Cursor) ([]*object.Address, *Cursor, error) {
threshold := cursor == nil // threshold is a flag to ignore cursor
result := make([]*object.Address, 0, count)
unique := make(map[string]struct{}) // do not parse the same containerID twice
var bucketName []byte
c := tx.Cursor()
name, _ := c.First()
if !threshold {
name, _ = c.Seek([]byte(cursor.address.ContainerID().String()))
name, _ = c.Seek(cursor.bucketName)
}
loop:
for ; name != nil; name, _ = c.Next() {
containerID := parseContainerID(name, unique)
containerID, postfix := parseContainerIDWithPostfix(name)
if containerID == nil {
continue
}
unique[containerID.String()] = struct{}{}
switch postfix {
case "", storageGroupPostfix, tombstonePostfix:
default:
continue
}
prefix := containerID.String() + "/"
lookupBuckets := [...]struct {
name []byte
bucket uint8
}{
{primaryBucketName(containerID), cursorBucketPrimary},
{tombstoneBucketName(containerID), cursorBucketTombstone},
{storageGroupBucketName(containerID), cursorBucketSG},
result, cursor = selectNFromBucket(tx, name, prefix, result, count, cursor, threshold)
bucketName = name
if len(result) >= count {
break loop
}
for _, lb := range lookupBuckets {
if !threshold && cursor.bucket != lb.bucket {
continue // start from the bucket, specified in the cursor bucket
}
result, cursor = selectNFromBucket(tx, lb.name, prefix, result, count, cursor, threshold)
cursor.bucket = lb.bucket
if len(result) >= count {
break loop
}
// set threshold flag after first `selectNFromBucket` invocation
// first invocation must look for cursor object
threshold = true
}
// set threshold flag after first `selectNFromBucket` invocation
// first invocation must look for cursor object
threshold = true
}
if len(result) == 0 {
return nil, nil, core.ErrEndOfListing
}
// new slice is much faster but less memory efficient
// we need to copy, because bucketName exists during bbolt tx
cursor.bucketName = make([]byte, len(bucketName))
copy(cursor.bucketName, bucketName)
return result, cursor, nil
}
@ -163,8 +155,10 @@ func selectNFromBucket(tx *bbolt.Tx,
c := bkt.Cursor()
k, _ := c.First()
offset := cursor.inBucketOffset
if !threshold {
c.Seek([]byte(cursor.address.ObjectID().String()))
c.Seek(offset)
k, _ = c.Next() // we are looking for objects _after_ the cursor
}
@ -176,7 +170,7 @@ func selectNFromBucket(tx *bbolt.Tx,
if err := a.Parse(prefix + string(k)); err != nil {
break
}
cursor.address = a
offset = k
if inGraveyard(tx, a) > 0 {
continue
}
@ -184,5 +178,30 @@ func selectNFromBucket(tx *bbolt.Tx,
count++
}
// new slice is much faster but less memory efficient
// we need to copy, because offset exists during bbolt tx
cursor.inBucketOffset = make([]byte, len(offset))
copy(cursor.inBucketOffset, offset)
return to, cursor
}
func parseContainerIDWithPostfix(name []byte) (*cid.ID, string) {
var (
containerID = cid.New()
containerIDStr = string(name)
postfix string
)
ind := strings.Index(string(name), invalidBase58String)
if ind > 0 {
postfix = containerIDStr[ind:]
containerIDStr = containerIDStr[:ind]
}
if err := containerID.Parse(containerIDStr); err != nil {
return nil, ""
}
return containerID, postfix
}