[#948] engine: Define cursor for object listing as a type

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-11-11 17:27:11 +03:00 committed by Alex Vanin
parent 164cd10af8
commit aa9ce8a853
4 changed files with 71 additions and 122 deletions

View file

@ -1,19 +1,23 @@
package engine package engine
import ( import (
"fmt"
"sort" "sort"
"strconv"
"github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/object"
core "github.com/nspcc-dev/neofs-node/pkg/core/object" core "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
) )
// Cursor is a type for continuous object listing.
type Cursor struct {
shardID string
shardCursor *shard.Cursor
}
// ListWithCursorPrm contains parameters for ListWithCursor operation. // ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct { type ListWithCursorPrm struct {
count uint32 count uint32
cursor string cursor *Cursor
} }
// WithCount sets maximum amount of addresses that ListWithCursor can return. // WithCount sets maximum amount of addresses that ListWithCursor can return.
@ -23,9 +27,9 @@ func (p *ListWithCursorPrm) WithCount(count uint32) *ListWithCursorPrm {
} }
// WithCursor sets cursor for ListWithCursor operation. For initial request // WithCursor sets cursor for ListWithCursor operation. For initial request
// ignore this param or use empty string. For continues requests, use value // ignore this param or use nil value. For continues requests, use value
// from ListWithCursorRes. // from ListWithCursorRes.
func (p *ListWithCursorPrm) WithCursor(cursor string) *ListWithCursorPrm { func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) *ListWithCursorPrm {
p.cursor = cursor p.cursor = cursor
return p return p
} }
@ -33,7 +37,7 @@ func (p *ListWithCursorPrm) WithCursor(cursor string) *ListWithCursorPrm {
// ListWithCursorRes contains values returned from ListWithCursor operation. // ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct { type ListWithCursorRes struct {
addrList []*object.Address addrList []*object.Address
cursor string cursor *Cursor
} }
// AddressList returns addresses selected by ListWithCursor operation. // AddressList returns addresses selected by ListWithCursor operation.
@ -42,7 +46,7 @@ func (l ListWithCursorRes) AddressList() []*object.Address {
} }
// Cursor returns cursor for consecutive listing requests. // Cursor returns cursor for consecutive listing requests.
func (l ListWithCursorRes) Cursor() string { func (l ListWithCursorRes) Cursor() *Cursor {
return l.cursor return l.cursor
} }
@ -51,10 +55,7 @@ func (l ListWithCursorRes) Cursor() string {
// Does not include inhumed objects. Use cursor value from response // Does not include inhumed objects. Use cursor value from response
// for consecutive requests. // for consecutive requests.
func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, error) { func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, error) {
var ( result := make([]*object.Address, 0, prm.count)
err error
result = make([]*object.Address, 0, prm.count)
)
// 1. Get available shards and sort them. // 1. Get available shards and sort them.
e.mtx.RLock() e.mtx.RLock()
@ -72,14 +73,10 @@ func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorR
return shardIDs[i] < shardIDs[j] return shardIDs[i] < shardIDs[j]
}) })
// 2. Decode shard ID from cursor. // 2. Prepare cursor object.
cursor := prm.cursor cursor := prm.cursor
cursorShardID := shardIDs[0] if cursor == nil {
if len(cursor) > 0 { cursor = &Cursor{shardID: shardIDs[0]}
cursorShardID, cursor, err = decodeID(cursor)
if err != nil {
return nil, err
}
} }
// 3. Iterate over available shards. Skip unavailable shards. // 3. Iterate over available shards. Skip unavailable shards.
@ -88,7 +85,7 @@ func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorR
break break
} }
if shardIDs[i] < cursorShardID { if shardIDs[i] < cursor.shardID {
continue continue
} }
@ -101,8 +98,8 @@ func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorR
count := uint32(int(prm.count) - len(result)) count := uint32(int(prm.count) - len(result))
shardPrm := new(shard.ListWithCursorPrm).WithCount(count) shardPrm := new(shard.ListWithCursorPrm).WithCount(count)
if shardIDs[i] == cursorShardID { if shardIDs[i] == cursor.shardID {
shardPrm.WithCursor(cursor) shardPrm.WithCursor(cursor.shardCursor)
} }
res, err := shardInstance.ListWithCursor(shardPrm) res, err := shardInstance.ListWithCursor(shardPrm)
@ -111,8 +108,8 @@ func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorR
} }
result = append(result, res.AddressList()...) result = append(result, res.AddressList()...)
cursor = res.Cursor() cursor.shardCursor = res.Cursor()
cursorShardID = shardIDs[i] cursor.shardID = shardIDs[i]
} }
if len(result) == 0 { if len(result) == 0 {
@ -121,29 +118,6 @@ func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorR
return &ListWithCursorRes{ return &ListWithCursorRes{
addrList: result, addrList: result,
cursor: encodeID(cursorShardID, cursor), cursor: cursor,
}, nil }, nil
} }
func decodeID(cursor string) (shardID string, shardCursor string, err error) {
ln := len(cursor)
if ln < 2 {
return "", "", fmt.Errorf("invalid cursor %s", cursor)
}
idLen, err := strconv.Atoi(cursor[:2])
if err != nil {
return "", "", fmt.Errorf("invalid cursor %s", cursor)
}
if len(cursor) < 2+idLen {
return "", "", fmt.Errorf("invalid cursor %s", cursor)
}
return cursor[2 : 2+idLen], cursor[2+idLen:], nil
}
func encodeID(id, cursor string) string {
prefix := fmt.Sprintf("%02d", len(id))
return prefix + id + cursor
}

View file

@ -1,18 +1,21 @@
package meta package meta
import ( import (
"fmt"
"strings"
"github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/object"
core "github.com/nspcc-dev/neofs-node/pkg/core/object" core "github.com/nspcc-dev/neofs-node/pkg/core/object"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
) )
// Cursor is a type for continuous object listing.
type Cursor struct {
bucket uint8
address *object.Address
}
// ListPrm contains parameters for ListWithCursor operation. // ListPrm contains parameters for ListWithCursor operation.
type ListPrm struct { type ListPrm struct {
count int count int
cursor string cursor *Cursor
} }
// WithCount sets maximum amount of addresses that ListWithCursor can return. // WithCount sets maximum amount of addresses that ListWithCursor can return.
@ -22,9 +25,9 @@ func (l *ListPrm) WithCount(count uint32) *ListPrm {
} }
// WithCursor sets cursor for ListWithCursor operation. For initial request // WithCursor sets cursor for ListWithCursor operation. For initial request
// ignore this param or use empty string. For continues requests, use value // ignore this param or use nil value. For continues requests, use value
// from ListRes. // from ListRes.
func (l *ListPrm) WithCursor(cursor string) *ListPrm { func (l *ListPrm) WithCursor(cursor *Cursor) *ListPrm {
l.cursor = cursor l.cursor = cursor
return l return l
} }
@ -32,7 +35,7 @@ func (l *ListPrm) WithCursor(cursor string) *ListPrm {
// ListRes contains values returned from ListWithCursor operation. // ListRes contains values returned from ListWithCursor operation.
type ListRes struct { type ListRes struct {
addrList []*object.Address addrList []*object.Address
cursor string cursor *Cursor
} }
// AddressList returns addresses selected by ListWithCursor operation. // AddressList returns addresses selected by ListWithCursor operation.
@ -41,23 +44,23 @@ func (l ListRes) AddressList() []*object.Address {
} }
// Cursor returns cursor for consecutive listing requests. // Cursor returns cursor for consecutive listing requests.
func (l ListRes) Cursor() string { func (l ListRes) Cursor() *Cursor {
return l.cursor return l.cursor
} }
const ( const (
cursorPrefixPrimary = 'p' cursorBucketPrimary = iota
cursorPrefixTombstone = 't' cursorBucketTombstone
cursorPrefixSG = 's' cursorBucketSG
) )
// ListWithCursor lists physical objects available in metabase. Includes regular, // ListWithCursor lists physical objects available in metabase. Includes regular,
// tombstone and storage group objects. Does not include inhumed objects. Use // tombstone and storage group objects. Does not include inhumed objects. Use
// cursor value from response for consecutive requests. // cursor value from response for consecutive requests.
func ListWithCursor(db *DB, count uint32, cursor string) ([]*object.Address, string, error) { func ListWithCursor(db *DB, count uint32, cursor *Cursor) ([]*object.Address, *Cursor, error) {
r, err := db.ListWithCursor(new(ListPrm).WithCount(count).WithCursor(cursor)) r, err := db.ListWithCursor(new(ListPrm).WithCount(count).WithCursor(cursor))
if err != nil { if err != nil {
return nil, "", err return nil, nil, err
} }
return r.AddressList(), r.Cursor(), nil return r.AddressList(), r.Cursor(), nil
@ -76,25 +79,8 @@ func (db *DB) ListWithCursor(prm *ListPrm) (res *ListRes, err error) {
return res, err return res, err
} }
func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor string) ([]*object.Address, string, error) { func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor *Cursor) ([]*object.Address, *Cursor, error) {
threshold := len(cursor) == 0 // threshold is a flag to ignore cursor threshold := cursor == nil // threshold is a flag to ignore cursor
a := object.NewAddress()
var cursorPrefix uint8
if !threshold { // if cursor is present, then decode it and check sanity
cursorPrefix = cursor[0]
switch cursorPrefix {
case cursorPrefixPrimary, cursorPrefixSG, cursorPrefixTombstone:
default:
return nil, "", fmt.Errorf("invalid cursor prefix %s", string(cursorPrefix))
}
cursor = cursor[1:]
if err := a.Parse(cursor); err != nil {
return nil, "", fmt.Errorf("invalid cursor address: %w", err)
}
}
result := make([]*object.Address, 0, count) result := make([]*object.Address, 0, count)
unique := make(map[string]struct{}) // do not parse the same containerID twice unique := make(map[string]struct{}) // do not parse the same containerID twice
@ -102,7 +88,7 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor string) ([]*object.
name, _ := c.First() name, _ := c.First()
if !threshold { if !threshold {
name, _ = c.Seek([]byte(a.ContainerID().String())) name, _ = c.Seek([]byte(cursor.address.ContainerID().String()))
} }
loop: loop:
@ -117,20 +103,20 @@ loop:
lookupBuckets := [...]struct { lookupBuckets := [...]struct {
name []byte name []byte
prefix uint8 bucket uint8
}{ }{
{primaryBucketName(containerID), cursorPrefixPrimary}, {primaryBucketName(containerID), cursorBucketPrimary},
{tombstoneBucketName(containerID), cursorPrefixTombstone}, {tombstoneBucketName(containerID), cursorBucketTombstone},
{storageGroupBucketName(containerID), cursorPrefixSG}, {storageGroupBucketName(containerID), cursorBucketSG},
} }
for _, lb := range lookupBuckets { for _, lb := range lookupBuckets {
if !threshold && cursorPrefix != lb.prefix { if !threshold && cursor.bucket != lb.bucket {
continue // start from the bucket, specified in the cursor prefix continue // start from the bucket, specified in the cursor bucket
} }
cursorPrefix = lb.prefix
result, cursor = selectNFromBucket(tx, lb.name, prefix, result, count, cursor, threshold) result, cursor = selectNFromBucket(tx, lb.name, prefix, result, count, cursor, threshold)
cursor.bucket = lb.bucket
if len(result) >= count { if len(result) >= count {
break loop break loop
} }
@ -142,10 +128,10 @@ loop:
} }
if len(result) == 0 { if len(result) == 0 {
return nil, "", core.ErrEndOfListing return nil, nil, core.ErrEndOfListing
} }
return result, string(cursorPrefix) + cursor, nil return result, cursor, nil
} }
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find // selectNFromBucket similar to selectAllFromBucket but uses cursor to find
@ -155,22 +141,24 @@ func selectNFromBucket(tx *bbolt.Tx,
prefix string, // string of CID, optimization prefix string, // string of CID, optimization
to []*object.Address, // listing result to []*object.Address, // listing result
limit int, // stop listing at `limit` items in result limit int, // stop listing at `limit` items in result
cursor string, // start from cursor object cursor *Cursor, // start from cursor object
threshold bool, // ignore cursor and start immediately threshold bool, // ignore cursor and start immediately
) ([]*object.Address, string) { ) ([]*object.Address, *Cursor) {
bkt := tx.Bucket(name) bkt := tx.Bucket(name)
if bkt == nil { if bkt == nil {
return to, cursor return to, cursor
} }
count := len(to) if cursor == nil {
cursor = new(Cursor)
}
count := len(to)
c := bkt.Cursor() c := bkt.Cursor()
k, _ := c.First() k, _ := c.First()
if !threshold { if !threshold {
seekKey := strings.Replace(cursor, prefix, "", 1) c.Seek([]byte(cursor.address.ObjectID().String()))
c.Seek([]byte(seekKey))
k, _ = c.Next() // we are looking for objects _after_ the cursor k, _ = c.Next() // we are looking for objects _after_ the cursor
} }
@ -178,19 +166,14 @@ func selectNFromBucket(tx *bbolt.Tx,
if count >= limit { if count >= limit {
break break
} }
key := prefix + string(k)
cursor = key
a := object.NewAddress() a := object.NewAddress()
if err := a.Parse(key); err != nil { if err := a.Parse(prefix + string(k)); err != nil {
break break
} }
cursor.address = a
if inGraveyard(tx, a) > 0 { if inGraveyard(tx, a) > 0 {
continue continue
} }
to = append(to, a) to = append(to, a)
count++ count++
} }

View file

@ -75,7 +75,7 @@ func TestLisObjectsWithCursor(t *testing.T) {
for countPerReq := 1; countPerReq <= total; countPerReq++ { for countPerReq := 1; countPerReq <= total; countPerReq++ {
got := make([]*objectSDK.Address, 0, total) got := make([]*objectSDK.Address, 0, total)
res, cursor, err := meta.ListWithCursor(db, uint32(countPerReq), "") res, cursor, err := meta.ListWithCursor(db, uint32(countPerReq), nil)
require.NoError(t, err, "count:%d", countPerReq) require.NoError(t, err, "count:%d", countPerReq)
got = append(got, res...) got = append(got, res...)
@ -98,19 +98,8 @@ func TestLisObjectsWithCursor(t *testing.T) {
} }
}) })
t.Run("invalid cursor", func(t *testing.T) {
_, cursor, err := meta.ListWithCursor(db, total/2, "")
require.NoError(t, err)
_, _, err = meta.ListWithCursor(db, total/2, "x"+cursor[1:])
require.Error(t, err)
_, _, err = meta.ListWithCursor(db, total/2, cursor[1:])
require.Error(t, err)
})
t.Run("invalid count", func(t *testing.T) { t.Run("invalid count", func(t *testing.T) {
_, _, err := meta.ListWithCursor(db, 0, "") _, _, err := meta.ListWithCursor(db, 0, nil)
require.ErrorIs(t, err, core.ErrEndOfListing) require.ErrorIs(t, err, core.ErrEndOfListing)
}) })
} }
@ -131,7 +120,7 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
} }
// get half of the objects // get half of the objects
got, cursor, err := meta.ListWithCursor(db, total/2, "") got, cursor, err := meta.ListWithCursor(db, total/2, nil)
require.NoError(t, err) require.NoError(t, err)
for _, obj := range got { for _, obj := range got {
if _, ok := expected[obj.String()]; ok { if _, ok := expected[obj.String()]; ok {

View file

@ -9,6 +9,9 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// Cursor is a type for continuous object listing.
type Cursor = meta.Cursor
type ListContainersPrm struct{} type ListContainersPrm struct{}
type ListContainersRes struct { type ListContainersRes struct {
@ -22,13 +25,13 @@ func (r *ListContainersRes) Containers() []*cid.ID {
// ListWithCursorPrm contains parameters for ListWithCursor operation. // ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct { type ListWithCursorPrm struct {
count uint32 count uint32
cursor string cursor *Cursor
} }
// ListWithCursorRes contains values returned from ListWithCursor operation. // ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct { type ListWithCursorRes struct {
addrList []*object.Address addrList []*object.Address
cursor string cursor *Cursor
} }
// WithCount sets maximum amount of addresses that ListWithCursor can return. // WithCount sets maximum amount of addresses that ListWithCursor can return.
@ -38,9 +41,9 @@ func (p *ListWithCursorPrm) WithCount(count uint32) *ListWithCursorPrm {
} }
// WithCursor sets cursor for ListWithCursor operation. For initial request, // WithCursor sets cursor for ListWithCursor operation. For initial request,
// ignore this param or use empty string. For continues requests, use value // ignore this param or use nil value. For continues requests, use value
// from ListWithCursorRes. // from ListWithCursorRes.
func (p *ListWithCursorPrm) WithCursor(cursor string) *ListWithCursorPrm { func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) *ListWithCursorPrm {
p.cursor = cursor p.cursor = cursor
return p return p
} }
@ -51,7 +54,7 @@ func (r ListWithCursorRes) AddressList() []*object.Address {
} }
// Cursor returns cursor for consecutive listing requests. // Cursor returns cursor for consecutive listing requests.
func (r ListWithCursorRes) Cursor() string { func (r ListWithCursorRes) Cursor() *Cursor {
return r.cursor return r.cursor
} }
@ -121,11 +124,11 @@ func (s *Shard) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, erro
// ListWithCursor lists physical objects available in shard starting from // ListWithCursor lists physical objects available in shard starting from
// cursor. Includes regular, tombstone and storage group objects. Does not // cursor. Includes regular, tombstone and storage group objects. Does not
// include inhumed objects. Use cursor value from response for consecutive requests. // include inhumed objects. Use cursor value from response for consecutive requests.
func ListWithCursor(s *Shard, count uint32, cursor string) ([]*object.Address, string, error) { func ListWithCursor(s *Shard, count uint32, cursor *Cursor) ([]*object.Address, *Cursor, error) {
prm := new(ListWithCursorPrm).WithCount(count).WithCursor(cursor) prm := new(ListWithCursorPrm).WithCount(count).WithCursor(cursor)
res, err := s.ListWithCursor(prm) res, err := s.ListWithCursor(prm)
if err != nil { if err != nil {
return nil, "", err return nil, nil, err
} }
return res.AddressList(), res.Cursor(), nil return res.AddressList(), res.Cursor(), nil