From 1a829a521f68f7e434ee3ed278fd2dae74d5b6e5 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Wed, 27 Oct 2021 16:14:51 +0300 Subject: [PATCH] [#948] metabase: Add ListWithCursor method ListWithCursor allows listing physically stored objects from metabase with small chunks. Cursor tracks last processed object, therefore new chunks are returned on each request. Signed-off-by: Alex Vanin --- pkg/local_object_storage/metabase/list.go | 208 ++++++++++++++++++ .../metabase/list_test.go | 173 +++++++++++++++ 2 files changed, 381 insertions(+) create mode 100644 pkg/local_object_storage/metabase/list.go create mode 100644 pkg/local_object_storage/metabase/list_test.go diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go new file mode 100644 index 000000000..709d55ab3 --- /dev/null +++ b/pkg/local_object_storage/metabase/list.go @@ -0,0 +1,208 @@ +package meta + +import ( + "errors" + "fmt" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "go.etcd.io/bbolt" +) + +// ListPrm contains parameters for ListWithCursor operation. +type ListPrm struct { + count int + cursor string +} + +// WithCount sets maximum amount of addresses that ListWithCursor can return. +func (l *ListPrm) WithCount(count uint32) *ListPrm { + l.count = int(count) + return l +} + +// WithCursor sets cursor for ListWithCursor operation. For initial request +// ignore this param or use empty string. For continues requests, use value +// from ListRes. +func (l *ListPrm) WithCursor(cursor string) *ListPrm { + l.cursor = cursor + return l +} + +// ListRes contains values returned from ListWithCursor operation. +type ListRes struct { + addrList []*object.Address + cursor string +} + +// AddressList returns addresses selected by ListWithCursor operation. +func (l ListRes) AddressList() []*object.Address { + return l.addrList +} + +// Cursor returns cursor for consecutive listing requests. +func (l ListRes) Cursor() string { + return l.cursor +} + +const ( + cursorPrefixPrimary = 'p' + cursorPrefixTombstone = 't' + cursorPrefixSG = 's' +) + +var ( + // ErrEndOfListing returns from ListWithCursor when metabase can't return + // any more objects after provided cursor. + // Use empty cursor to start listing again. + ErrEndOfListing = errors.New("end of metabase records") + + errStopIterator = errors.New("stop") +) + +// ListWithCursor lists physical objects available in metabase. Includes regular, +// tombstone and storage group objects. Does not include inhumed objects. Use +// cursor value from response for consecutive requests. +func ListWithCursor(db *DB, count uint32, cursor string) ([]*object.Address, string, error) { + r, err := db.ListWithCursor(new(ListPrm).WithCount(count).WithCursor(cursor)) + if err != nil { + return nil, "", err + } + + return r.AddressList(), r.Cursor(), nil +} + +// ListWithCursor lists physical objects available in metabase. Includes regular, +// tombstone and storage group objects. Does not include inhumed objects. Use +// cursor value from response for consecutive requests. +func (db *DB) ListWithCursor(prm *ListPrm) (res *ListRes, err error) { + err = db.boltDB.View(func(tx *bbolt.Tx) error { + res = new(ListRes) + res.addrList, res.cursor, err = db.listWithCursor(tx, prm.count, prm.cursor) + return err + }) + + return res, err +} + +func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor string) ([]*object.Address, string, error) { + threshold := len(cursor) == 0 // threshold is a flag to ignore cursor + a := object.NewAddress() + var cursorPrefix uint8 + + if !threshold { // if cursor is present, then decode it and check sanity + cursorPrefix = cursor[0] + switch cursorPrefix { + case cursorPrefixPrimary, cursorPrefixSG, cursorPrefixTombstone: + default: + return nil, "", fmt.Errorf("invalid cursor prefix %s", string(cursorPrefix)) + } + + cursor = cursor[1:] + if err := a.Parse(cursor); err != nil { + return nil, "", fmt.Errorf("invalid cursor address: %w", err) + } + } + + result := make([]*object.Address, 0, count) + unique := make(map[string]struct{}) // do not parse the same containerID twice + + _ = tx.ForEach(func(name []byte, _ *bbolt.Bucket) error { + containerID := parseContainerID(name, unique) + if containerID == nil { + return nil + } + + unique[containerID.String()] = struct{}{} + + if !threshold && !containerID.Equal(a.ContainerID()) { + return nil // ignore buckets until we find cursor bucket + } + + prefix := containerID.String() + "/" + + lookupBuckets := [...]struct { + name []byte + prefix uint8 + }{ + {primaryBucketName(containerID), cursorPrefixPrimary}, + {tombstoneBucketName(containerID), cursorPrefixTombstone}, + {storageGroupBucketName(containerID), cursorPrefixSG}, + } + + for _, lb := range lookupBuckets { + if !threshold && cursorPrefix != lb.prefix { + continue // start from the bucket, specified in the cursor prefix + } + + cursorPrefix = lb.prefix + result, cursor = selectNFromBucket(tx, lb.name, prefix, result, count, cursor, threshold) + if len(result) >= count { + return errStopIterator + } + + // set threshold flag after first `selectNFromBucket` invocation + // first invocation must look for cursor object + threshold = true + } + return nil + }) + + if len(result) == 0 { + return nil, "", ErrEndOfListing + } + + return result, string(cursorPrefix) + cursor, nil +} + +// selectNFromBucket similar to selectAllFromBucket but uses cursor to find +// object to start selecting from. Ignores inhumed objects. +func selectNFromBucket(tx *bbolt.Tx, + name []byte, // bucket name + prefix string, // string of CID, optimization + to []*object.Address, // listing result + limit int, // stop listing at `limit` items in result + cursor string, // start from cursor object + threshold bool, // ignore cursor and start immediately +) ([]*object.Address, string) { + bkt := tx.Bucket(name) + if bkt == nil { + return to, cursor + } + + count := len(to) + + _ = bkt.ForEach(func(k, v []byte) error { + if count >= limit { + return errStopIterator + } + + key := prefix + string(k) + + if !threshold { + if cursor == key { + // ignore cursor object and start adding next objects + threshold = true + } + return nil + } + + threshold = true + cursor = key + + a := object.NewAddress() + if err := a.Parse(key); err != nil { + return err + } + + if inGraveyard(tx, a) > 0 { + return nil + } + + to = append(to, a) + count++ + + return nil + }) + + return to, cursor +} diff --git a/pkg/local_object_storage/metabase/list_test.go b/pkg/local_object_storage/metabase/list_test.go new file mode 100644 index 000000000..88e30746a --- /dev/null +++ b/pkg/local_object_storage/metabase/list_test.go @@ -0,0 +1,173 @@ +package meta_test + +import ( + "errors" + "sort" + "testing" + + cidtest "github.com/nspcc-dev/neofs-api-go/pkg/container/id/test" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/stretchr/testify/require" +) + +func TestLisObjectsWithCursor(t *testing.T) { + db := newDB(t) + + const ( + containers = 5 + total = containers * 4 // regular + ts + sg + child + ) + + expected := make([]*objectSDK.Address, 0, total) + + // fill metabase with objects + for i := 0; i < containers; i++ { + containerID := cidtest.Generate() + + // add one regular object + obj := generateRawObjectWithCID(t, containerID) + obj.SetType(objectSDK.TypeRegular) + err := putBig(db, obj.Object()) + require.NoError(t, err) + expected = append(expected, obj.Object().Address()) + + // add one tombstone + obj = generateRawObjectWithCID(t, containerID) + obj.SetType(objectSDK.TypeTombstone) + err = putBig(db, obj.Object()) + require.NoError(t, err) + expected = append(expected, obj.Object().Address()) + + // add one storage group + obj = generateRawObjectWithCID(t, containerID) + obj.SetType(objectSDK.TypeStorageGroup) + err = putBig(db, obj.Object()) + require.NoError(t, err) + expected = append(expected, obj.Object().Address()) + + // add one inhumed (do not include into expected) + obj = generateRawObjectWithCID(t, containerID) + obj.SetType(objectSDK.TypeRegular) + err = putBig(db, obj.Object()) + require.NoError(t, err) + ts := generateRawObjectWithCID(t, containerID) + err = meta.Inhume(db, obj.Object().Address(), ts.Object().Address()) + require.NoError(t, err) + + // add one child object (do not include parent into expected) + splitID := objectSDK.NewSplitID() + parent := generateRawObjectWithCID(t, containerID) + addAttribute(parent, "foo", "bar") + child := generateRawObjectWithCID(t, containerID) + child.SetParent(parent.Object().SDK()) + child.SetParentID(parent.ID()) + child.SetSplitID(splitID) + err = putBig(db, child.Object()) + require.NoError(t, err) + expected = append(expected, child.Object().Address()) + } + + expected = sortAddresses(expected) + + t.Run("success with various count", func(t *testing.T) { + for countPerReq := 1; countPerReq <= total; countPerReq++ { + got := make([]*objectSDK.Address, 0, total) + + res, cursor, err := meta.ListWithCursor(db, uint32(countPerReq), "") + require.NoError(t, err, "count:%d", countPerReq) + got = append(got, res...) + + expectedIterations := total / countPerReq + if total%countPerReq == 0 { // remove initial list if aligned + expectedIterations-- + } + + for i := 0; i < expectedIterations; i++ { + res, cursor, err = meta.ListWithCursor(db, uint32(countPerReq), cursor) + require.NoError(t, err, "count:%d", countPerReq) + got = append(got, res...) + } + + _, _, err = meta.ListWithCursor(db, uint32(countPerReq), cursor) + require.ErrorIs(t, err, meta.ErrEndOfListing, "count:%d", countPerReq, cursor) + + got = sortAddresses(got) + require.Equal(t, expected, got, "count:%d", countPerReq) + } + }) + + t.Run("invalid cursor", func(t *testing.T) { + _, cursor, err := meta.ListWithCursor(db, total/2, "") + require.NoError(t, err) + + _, _, err = meta.ListWithCursor(db, total/2, "x"+cursor[1:]) + require.Error(t, err) + + _, _, err = meta.ListWithCursor(db, total/2, cursor[1:]) + require.Error(t, err) + }) + + t.Run("invalid count", func(t *testing.T) { + _, _, err := meta.ListWithCursor(db, 0, "") + require.ErrorIs(t, err, meta.ErrEndOfListing) + }) +} + +func TestAddObjectDuringListingWithCursor(t *testing.T) { + db := newDB(t) + + const total = 5 + + expected := make(map[string]int, total) + + // fill metabase with objects + for i := 0; i < total; i++ { + obj := generateRawObject(t) + err := putBig(db, obj.Object()) + require.NoError(t, err) + expected[obj.Object().Address().String()] = 0 + } + + // get half of the objects + got, cursor, err := meta.ListWithCursor(db, total/2, "") + require.NoError(t, err) + for _, obj := range got { + if _, ok := expected[obj.String()]; ok { + expected[obj.String()]++ + } + } + + // add new objects + for i := 0; i < total; i++ { + obj := generateRawObject(t) + err = putBig(db, obj.Object()) + require.NoError(t, err) + } + + // get remaining objects + for { + got, cursor, err = meta.ListWithCursor(db, total, cursor) + if errors.Is(err, meta.ErrEndOfListing) { + break + } + for _, obj := range got { + if _, ok := expected[obj.String()]; ok { + expected[obj.String()]++ + } + } + } + + // check if all expected objects were fetched after database update + for _, v := range expected { + require.Equal(t, 1, v) + } + +} + +func sortAddresses(addr []*objectSDK.Address) []*objectSDK.Address { + sort.Slice(addr, func(i, j int) bool { + return addr[i].String() < addr[j].String() + }) + return addr +}