[#948] metabase: Add ListWithCursor method
ListWithCursor allows listing physically stored objects from metabase with small chunks. Cursor tracks last processed object, therefore new chunks are returned on each request. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
60e3ea978f
commit
1a829a521f
2 changed files with 381 additions and 0 deletions
208
pkg/local_object_storage/metabase/list.go
Normal file
208
pkg/local_object_storage/metabase/list.go
Normal file
|
@ -0,0 +1,208 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// ListPrm contains parameters for ListWithCursor operation.
|
||||
type ListPrm struct {
|
||||
count int
|
||||
cursor string
|
||||
}
|
||||
|
||||
// WithCount sets maximum amount of addresses that ListWithCursor can return.
|
||||
func (l *ListPrm) WithCount(count uint32) *ListPrm {
|
||||
l.count = int(count)
|
||||
return l
|
||||
}
|
||||
|
||||
// WithCursor sets cursor for ListWithCursor operation. For initial request
|
||||
// ignore this param or use empty string. For continues requests, use value
|
||||
// from ListRes.
|
||||
func (l *ListPrm) WithCursor(cursor string) *ListPrm {
|
||||
l.cursor = cursor
|
||||
return l
|
||||
}
|
||||
|
||||
// ListRes contains values returned from ListWithCursor operation.
|
||||
type ListRes struct {
|
||||
addrList []*object.Address
|
||||
cursor string
|
||||
}
|
||||
|
||||
// AddressList returns addresses selected by ListWithCursor operation.
|
||||
func (l ListRes) AddressList() []*object.Address {
|
||||
return l.addrList
|
||||
}
|
||||
|
||||
// Cursor returns cursor for consecutive listing requests.
|
||||
func (l ListRes) Cursor() string {
|
||||
return l.cursor
|
||||
}
|
||||
|
||||
const (
|
||||
cursorPrefixPrimary = 'p'
|
||||
cursorPrefixTombstone = 't'
|
||||
cursorPrefixSG = 's'
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrEndOfListing returns from ListWithCursor when metabase can't return
|
||||
// any more objects after provided cursor.
|
||||
// Use empty cursor to start listing again.
|
||||
ErrEndOfListing = errors.New("end of metabase records")
|
||||
|
||||
errStopIterator = errors.New("stop")
|
||||
)
|
||||
|
||||
// ListWithCursor lists physical objects available in metabase. Includes regular,
|
||||
// tombstone and storage group objects. Does not include inhumed objects. Use
|
||||
// cursor value from response for consecutive requests.
|
||||
func ListWithCursor(db *DB, count uint32, cursor string) ([]*object.Address, string, error) {
|
||||
r, err := db.ListWithCursor(new(ListPrm).WithCount(count).WithCursor(cursor))
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
return r.AddressList(), r.Cursor(), nil
|
||||
}
|
||||
|
||||
// ListWithCursor lists physical objects available in metabase. Includes regular,
|
||||
// tombstone and storage group objects. Does not include inhumed objects. Use
|
||||
// cursor value from response for consecutive requests.
|
||||
func (db *DB) ListWithCursor(prm *ListPrm) (res *ListRes, err error) {
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res = new(ListRes)
|
||||
res.addrList, res.cursor, err = db.listWithCursor(tx, prm.count, prm.cursor)
|
||||
return err
|
||||
})
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor string) ([]*object.Address, string, error) {
|
||||
threshold := len(cursor) == 0 // threshold is a flag to ignore cursor
|
||||
a := object.NewAddress()
|
||||
var cursorPrefix uint8
|
||||
|
||||
if !threshold { // if cursor is present, then decode it and check sanity
|
||||
cursorPrefix = cursor[0]
|
||||
switch cursorPrefix {
|
||||
case cursorPrefixPrimary, cursorPrefixSG, cursorPrefixTombstone:
|
||||
default:
|
||||
return nil, "", fmt.Errorf("invalid cursor prefix %s", string(cursorPrefix))
|
||||
}
|
||||
|
||||
cursor = cursor[1:]
|
||||
if err := a.Parse(cursor); err != nil {
|
||||
return nil, "", fmt.Errorf("invalid cursor address: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
result := make([]*object.Address, 0, count)
|
||||
unique := make(map[string]struct{}) // do not parse the same containerID twice
|
||||
|
||||
_ = tx.ForEach(func(name []byte, _ *bbolt.Bucket) error {
|
||||
containerID := parseContainerID(name, unique)
|
||||
if containerID == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
unique[containerID.String()] = struct{}{}
|
||||
|
||||
if !threshold && !containerID.Equal(a.ContainerID()) {
|
||||
return nil // ignore buckets until we find cursor bucket
|
||||
}
|
||||
|
||||
prefix := containerID.String() + "/"
|
||||
|
||||
lookupBuckets := [...]struct {
|
||||
name []byte
|
||||
prefix uint8
|
||||
}{
|
||||
{primaryBucketName(containerID), cursorPrefixPrimary},
|
||||
{tombstoneBucketName(containerID), cursorPrefixTombstone},
|
||||
{storageGroupBucketName(containerID), cursorPrefixSG},
|
||||
}
|
||||
|
||||
for _, lb := range lookupBuckets {
|
||||
if !threshold && cursorPrefix != lb.prefix {
|
||||
continue // start from the bucket, specified in the cursor prefix
|
||||
}
|
||||
|
||||
cursorPrefix = lb.prefix
|
||||
result, cursor = selectNFromBucket(tx, lb.name, prefix, result, count, cursor, threshold)
|
||||
if len(result) >= count {
|
||||
return errStopIterator
|
||||
}
|
||||
|
||||
// set threshold flag after first `selectNFromBucket` invocation
|
||||
// first invocation must look for cursor object
|
||||
threshold = true
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if len(result) == 0 {
|
||||
return nil, "", ErrEndOfListing
|
||||
}
|
||||
|
||||
return result, string(cursorPrefix) + cursor, nil
|
||||
}
|
||||
|
||||
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
|
||||
// object to start selecting from. Ignores inhumed objects.
|
||||
func selectNFromBucket(tx *bbolt.Tx,
|
||||
name []byte, // bucket name
|
||||
prefix string, // string of CID, optimization
|
||||
to []*object.Address, // listing result
|
||||
limit int, // stop listing at `limit` items in result
|
||||
cursor string, // start from cursor object
|
||||
threshold bool, // ignore cursor and start immediately
|
||||
) ([]*object.Address, string) {
|
||||
bkt := tx.Bucket(name)
|
||||
if bkt == nil {
|
||||
return to, cursor
|
||||
}
|
||||
|
||||
count := len(to)
|
||||
|
||||
_ = bkt.ForEach(func(k, v []byte) error {
|
||||
if count >= limit {
|
||||
return errStopIterator
|
||||
}
|
||||
|
||||
key := prefix + string(k)
|
||||
|
||||
if !threshold {
|
||||
if cursor == key {
|
||||
// ignore cursor object and start adding next objects
|
||||
threshold = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
threshold = true
|
||||
cursor = key
|
||||
|
||||
a := object.NewAddress()
|
||||
if err := a.Parse(key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if inGraveyard(tx, a) > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
to = append(to, a)
|
||||
count++
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return to, cursor
|
||||
}
|
173
pkg/local_object_storage/metabase/list_test.go
Normal file
173
pkg/local_object_storage/metabase/list_test.go
Normal file
|
@ -0,0 +1,173 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
cidtest "github.com/nspcc-dev/neofs-api-go/pkg/container/id/test"
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLisObjectsWithCursor(t *testing.T) {
|
||||
db := newDB(t)
|
||||
|
||||
const (
|
||||
containers = 5
|
||||
total = containers * 4 // regular + ts + sg + child
|
||||
)
|
||||
|
||||
expected := make([]*objectSDK.Address, 0, total)
|
||||
|
||||
// fill metabase with objects
|
||||
for i := 0; i < containers; i++ {
|
||||
containerID := cidtest.Generate()
|
||||
|
||||
// add one regular object
|
||||
obj := generateRawObjectWithCID(t, containerID)
|
||||
obj.SetType(objectSDK.TypeRegular)
|
||||
err := putBig(db, obj.Object())
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, obj.Object().Address())
|
||||
|
||||
// add one tombstone
|
||||
obj = generateRawObjectWithCID(t, containerID)
|
||||
obj.SetType(objectSDK.TypeTombstone)
|
||||
err = putBig(db, obj.Object())
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, obj.Object().Address())
|
||||
|
||||
// add one storage group
|
||||
obj = generateRawObjectWithCID(t, containerID)
|
||||
obj.SetType(objectSDK.TypeStorageGroup)
|
||||
err = putBig(db, obj.Object())
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, obj.Object().Address())
|
||||
|
||||
// add one inhumed (do not include into expected)
|
||||
obj = generateRawObjectWithCID(t, containerID)
|
||||
obj.SetType(objectSDK.TypeRegular)
|
||||
err = putBig(db, obj.Object())
|
||||
require.NoError(t, err)
|
||||
ts := generateRawObjectWithCID(t, containerID)
|
||||
err = meta.Inhume(db, obj.Object().Address(), ts.Object().Address())
|
||||
require.NoError(t, err)
|
||||
|
||||
// add one child object (do not include parent into expected)
|
||||
splitID := objectSDK.NewSplitID()
|
||||
parent := generateRawObjectWithCID(t, containerID)
|
||||
addAttribute(parent, "foo", "bar")
|
||||
child := generateRawObjectWithCID(t, containerID)
|
||||
child.SetParent(parent.Object().SDK())
|
||||
child.SetParentID(parent.ID())
|
||||
child.SetSplitID(splitID)
|
||||
err = putBig(db, child.Object())
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, child.Object().Address())
|
||||
}
|
||||
|
||||
expected = sortAddresses(expected)
|
||||
|
||||
t.Run("success with various count", func(t *testing.T) {
|
||||
for countPerReq := 1; countPerReq <= total; countPerReq++ {
|
||||
got := make([]*objectSDK.Address, 0, total)
|
||||
|
||||
res, cursor, err := meta.ListWithCursor(db, uint32(countPerReq), "")
|
||||
require.NoError(t, err, "count:%d", countPerReq)
|
||||
got = append(got, res...)
|
||||
|
||||
expectedIterations := total / countPerReq
|
||||
if total%countPerReq == 0 { // remove initial list if aligned
|
||||
expectedIterations--
|
||||
}
|
||||
|
||||
for i := 0; i < expectedIterations; i++ {
|
||||
res, cursor, err = meta.ListWithCursor(db, uint32(countPerReq), cursor)
|
||||
require.NoError(t, err, "count:%d", countPerReq)
|
||||
got = append(got, res...)
|
||||
}
|
||||
|
||||
_, _, err = meta.ListWithCursor(db, uint32(countPerReq), cursor)
|
||||
require.ErrorIs(t, err, meta.ErrEndOfListing, "count:%d", countPerReq, cursor)
|
||||
|
||||
got = sortAddresses(got)
|
||||
require.Equal(t, expected, got, "count:%d", countPerReq)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("invalid cursor", func(t *testing.T) {
|
||||
_, cursor, err := meta.ListWithCursor(db, total/2, "")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, err = meta.ListWithCursor(db, total/2, "x"+cursor[1:])
|
||||
require.Error(t, err)
|
||||
|
||||
_, _, err = meta.ListWithCursor(db, total/2, cursor[1:])
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("invalid count", func(t *testing.T) {
|
||||
_, _, err := meta.ListWithCursor(db, 0, "")
|
||||
require.ErrorIs(t, err, meta.ErrEndOfListing)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAddObjectDuringListingWithCursor(t *testing.T) {
|
||||
db := newDB(t)
|
||||
|
||||
const total = 5
|
||||
|
||||
expected := make(map[string]int, total)
|
||||
|
||||
// fill metabase with objects
|
||||
for i := 0; i < total; i++ {
|
||||
obj := generateRawObject(t)
|
||||
err := putBig(db, obj.Object())
|
||||
require.NoError(t, err)
|
||||
expected[obj.Object().Address().String()] = 0
|
||||
}
|
||||
|
||||
// get half of the objects
|
||||
got, cursor, err := meta.ListWithCursor(db, total/2, "")
|
||||
require.NoError(t, err)
|
||||
for _, obj := range got {
|
||||
if _, ok := expected[obj.String()]; ok {
|
||||
expected[obj.String()]++
|
||||
}
|
||||
}
|
||||
|
||||
// add new objects
|
||||
for i := 0; i < total; i++ {
|
||||
obj := generateRawObject(t)
|
||||
err = putBig(db, obj.Object())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// get remaining objects
|
||||
for {
|
||||
got, cursor, err = meta.ListWithCursor(db, total, cursor)
|
||||
if errors.Is(err, meta.ErrEndOfListing) {
|
||||
break
|
||||
}
|
||||
for _, obj := range got {
|
||||
if _, ok := expected[obj.String()]; ok {
|
||||
expected[obj.String()]++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check if all expected objects were fetched after database update
|
||||
for _, v := range expected {
|
||||
require.Equal(t, 1, v)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func sortAddresses(addr []*objectSDK.Address) []*objectSDK.Address {
|
||||
sort.Slice(addr, func(i, j int) bool {
|
||||
return addr[i].String() < addr[j].String()
|
||||
})
|
||||
return addr
|
||||
}
|
Loading…
Reference in a new issue