From aa9ce8a8530834b9ba79c67e31da0bf6a4d69d3b Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Thu, 11 Nov 2021 17:27:11 +0300 Subject: [PATCH] [#948] engine: Define cursor for object listing as a type Signed-off-by: Alex Vanin --- pkg/local_object_storage/engine/list.go | 68 +++++--------- pkg/local_object_storage/metabase/list.go | 91 ++++++++----------- .../metabase/list_test.go | 17 +--- pkg/local_object_storage/shard/list.go | 17 ++-- 4 files changed, 71 insertions(+), 122 deletions(-) diff --git a/pkg/local_object_storage/engine/list.go b/pkg/local_object_storage/engine/list.go index 8d887c248..28b199a95 100644 --- a/pkg/local_object_storage/engine/list.go +++ b/pkg/local_object_storage/engine/list.go @@ -1,19 +1,23 @@ package engine import ( - "fmt" "sort" - "strconv" "github.com/nspcc-dev/neofs-api-go/pkg/object" core "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" ) +// Cursor is a type for continuous object listing. +type Cursor struct { + shardID string + shardCursor *shard.Cursor +} + // ListWithCursorPrm contains parameters for ListWithCursor operation. type ListWithCursorPrm struct { count uint32 - cursor string + cursor *Cursor } // WithCount sets maximum amount of addresses that ListWithCursor can return. @@ -23,9 +27,9 @@ func (p *ListWithCursorPrm) WithCount(count uint32) *ListWithCursorPrm { } // WithCursor sets cursor for ListWithCursor operation. For initial request -// ignore this param or use empty string. For continues requests, use value +// ignore this param or use nil value. For continues requests, use value // from ListWithCursorRes. -func (p *ListWithCursorPrm) WithCursor(cursor string) *ListWithCursorPrm { +func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) *ListWithCursorPrm { p.cursor = cursor return p } @@ -33,7 +37,7 @@ func (p *ListWithCursorPrm) WithCursor(cursor string) *ListWithCursorPrm { // ListWithCursorRes contains values returned from ListWithCursor operation. type ListWithCursorRes struct { addrList []*object.Address - cursor string + cursor *Cursor } // AddressList returns addresses selected by ListWithCursor operation. @@ -42,7 +46,7 @@ func (l ListWithCursorRes) AddressList() []*object.Address { } // Cursor returns cursor for consecutive listing requests. -func (l ListWithCursorRes) Cursor() string { +func (l ListWithCursorRes) Cursor() *Cursor { return l.cursor } @@ -51,10 +55,7 @@ func (l ListWithCursorRes) Cursor() string { // Does not include inhumed objects. Use cursor value from response // for consecutive requests. func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, error) { - var ( - err error - result = make([]*object.Address, 0, prm.count) - ) + result := make([]*object.Address, 0, prm.count) // 1. Get available shards and sort them. e.mtx.RLock() @@ -72,14 +73,10 @@ func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorR return shardIDs[i] < shardIDs[j] }) - // 2. Decode shard ID from cursor. + // 2. Prepare cursor object. cursor := prm.cursor - cursorShardID := shardIDs[0] - if len(cursor) > 0 { - cursorShardID, cursor, err = decodeID(cursor) - if err != nil { - return nil, err - } + if cursor == nil { + cursor = &Cursor{shardID: shardIDs[0]} } // 3. Iterate over available shards. Skip unavailable shards. @@ -88,7 +85,7 @@ func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorR break } - if shardIDs[i] < cursorShardID { + if shardIDs[i] < cursor.shardID { continue } @@ -101,8 +98,8 @@ func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorR count := uint32(int(prm.count) - len(result)) shardPrm := new(shard.ListWithCursorPrm).WithCount(count) - if shardIDs[i] == cursorShardID { - shardPrm.WithCursor(cursor) + if shardIDs[i] == cursor.shardID { + shardPrm.WithCursor(cursor.shardCursor) } res, err := shardInstance.ListWithCursor(shardPrm) @@ -111,8 +108,8 @@ func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorR } result = append(result, res.AddressList()...) - cursor = res.Cursor() - cursorShardID = shardIDs[i] + cursor.shardCursor = res.Cursor() + cursor.shardID = shardIDs[i] } if len(result) == 0 { @@ -121,29 +118,6 @@ func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorR return &ListWithCursorRes{ addrList: result, - cursor: encodeID(cursorShardID, cursor), + cursor: cursor, }, nil } - -func decodeID(cursor string) (shardID string, shardCursor string, err error) { - ln := len(cursor) - if ln < 2 { - return "", "", fmt.Errorf("invalid cursor %s", cursor) - } - - idLen, err := strconv.Atoi(cursor[:2]) - if err != nil { - return "", "", fmt.Errorf("invalid cursor %s", cursor) - } - - if len(cursor) < 2+idLen { - return "", "", fmt.Errorf("invalid cursor %s", cursor) - } - - return cursor[2 : 2+idLen], cursor[2+idLen:], nil -} - -func encodeID(id, cursor string) string { - prefix := fmt.Sprintf("%02d", len(id)) - return prefix + id + cursor -} diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index c76bd9558..2732b36b8 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -1,18 +1,21 @@ package meta import ( - "fmt" - "strings" - "github.com/nspcc-dev/neofs-api-go/pkg/object" core "github.com/nspcc-dev/neofs-node/pkg/core/object" "go.etcd.io/bbolt" ) +// Cursor is a type for continuous object listing. +type Cursor struct { + bucket uint8 + address *object.Address +} + // ListPrm contains parameters for ListWithCursor operation. type ListPrm struct { count int - cursor string + cursor *Cursor } // WithCount sets maximum amount of addresses that ListWithCursor can return. @@ -22,9 +25,9 @@ func (l *ListPrm) WithCount(count uint32) *ListPrm { } // WithCursor sets cursor for ListWithCursor operation. For initial request -// ignore this param or use empty string. For continues requests, use value +// ignore this param or use nil value. For continues requests, use value // from ListRes. -func (l *ListPrm) WithCursor(cursor string) *ListPrm { +func (l *ListPrm) WithCursor(cursor *Cursor) *ListPrm { l.cursor = cursor return l } @@ -32,7 +35,7 @@ func (l *ListPrm) WithCursor(cursor string) *ListPrm { // ListRes contains values returned from ListWithCursor operation. type ListRes struct { addrList []*object.Address - cursor string + cursor *Cursor } // AddressList returns addresses selected by ListWithCursor operation. @@ -41,23 +44,23 @@ func (l ListRes) AddressList() []*object.Address { } // Cursor returns cursor for consecutive listing requests. -func (l ListRes) Cursor() string { +func (l ListRes) Cursor() *Cursor { return l.cursor } const ( - cursorPrefixPrimary = 'p' - cursorPrefixTombstone = 't' - cursorPrefixSG = 's' + cursorBucketPrimary = iota + cursorBucketTombstone + cursorBucketSG ) // ListWithCursor lists physical objects available in metabase. Includes regular, // tombstone and storage group objects. Does not include inhumed objects. Use // cursor value from response for consecutive requests. -func ListWithCursor(db *DB, count uint32, cursor string) ([]*object.Address, string, error) { +func ListWithCursor(db *DB, count uint32, cursor *Cursor) ([]*object.Address, *Cursor, error) { r, err := db.ListWithCursor(new(ListPrm).WithCount(count).WithCursor(cursor)) if err != nil { - return nil, "", err + return nil, nil, err } return r.AddressList(), r.Cursor(), nil @@ -76,25 +79,8 @@ func (db *DB) ListWithCursor(prm *ListPrm) (res *ListRes, err error) { return res, err } -func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor string) ([]*object.Address, string, error) { - threshold := len(cursor) == 0 // threshold is a flag to ignore cursor - a := object.NewAddress() - var cursorPrefix uint8 - - if !threshold { // if cursor is present, then decode it and check sanity - cursorPrefix = cursor[0] - switch cursorPrefix { - case cursorPrefixPrimary, cursorPrefixSG, cursorPrefixTombstone: - default: - return nil, "", fmt.Errorf("invalid cursor prefix %s", string(cursorPrefix)) - } - - cursor = cursor[1:] - if err := a.Parse(cursor); err != nil { - return nil, "", fmt.Errorf("invalid cursor address: %w", err) - } - } - +func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor *Cursor) ([]*object.Address, *Cursor, error) { + threshold := cursor == nil // threshold is a flag to ignore cursor result := make([]*object.Address, 0, count) unique := make(map[string]struct{}) // do not parse the same containerID twice @@ -102,7 +88,7 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor string) ([]*object. name, _ := c.First() if !threshold { - name, _ = c.Seek([]byte(a.ContainerID().String())) + name, _ = c.Seek([]byte(cursor.address.ContainerID().String())) } loop: @@ -117,20 +103,20 @@ loop: lookupBuckets := [...]struct { name []byte - prefix uint8 + bucket uint8 }{ - {primaryBucketName(containerID), cursorPrefixPrimary}, - {tombstoneBucketName(containerID), cursorPrefixTombstone}, - {storageGroupBucketName(containerID), cursorPrefixSG}, + {primaryBucketName(containerID), cursorBucketPrimary}, + {tombstoneBucketName(containerID), cursorBucketTombstone}, + {storageGroupBucketName(containerID), cursorBucketSG}, } for _, lb := range lookupBuckets { - if !threshold && cursorPrefix != lb.prefix { - continue // start from the bucket, specified in the cursor prefix + if !threshold && cursor.bucket != lb.bucket { + continue // start from the bucket, specified in the cursor bucket } - cursorPrefix = lb.prefix result, cursor = selectNFromBucket(tx, lb.name, prefix, result, count, cursor, threshold) + cursor.bucket = lb.bucket if len(result) >= count { break loop } @@ -142,10 +128,10 @@ loop: } if len(result) == 0 { - return nil, "", core.ErrEndOfListing + return nil, nil, core.ErrEndOfListing } - return result, string(cursorPrefix) + cursor, nil + return result, cursor, nil } // selectNFromBucket similar to selectAllFromBucket but uses cursor to find @@ -155,22 +141,24 @@ func selectNFromBucket(tx *bbolt.Tx, prefix string, // string of CID, optimization to []*object.Address, // listing result limit int, // stop listing at `limit` items in result - cursor string, // start from cursor object + cursor *Cursor, // start from cursor object threshold bool, // ignore cursor and start immediately -) ([]*object.Address, string) { +) ([]*object.Address, *Cursor) { bkt := tx.Bucket(name) if bkt == nil { return to, cursor } - count := len(to) + if cursor == nil { + cursor = new(Cursor) + } + count := len(to) c := bkt.Cursor() k, _ := c.First() if !threshold { - seekKey := strings.Replace(cursor, prefix, "", 1) - c.Seek([]byte(seekKey)) + c.Seek([]byte(cursor.address.ObjectID().String())) k, _ = c.Next() // we are looking for objects _after_ the cursor } @@ -178,19 +166,14 @@ func selectNFromBucket(tx *bbolt.Tx, if count >= limit { break } - - key := prefix + string(k) - cursor = key - a := object.NewAddress() - if err := a.Parse(key); err != nil { + if err := a.Parse(prefix + string(k)); err != nil { break } - + cursor.address = a if inGraveyard(tx, a) > 0 { continue } - to = append(to, a) count++ } diff --git a/pkg/local_object_storage/metabase/list_test.go b/pkg/local_object_storage/metabase/list_test.go index 84f5737c2..8f76eb2ca 100644 --- a/pkg/local_object_storage/metabase/list_test.go +++ b/pkg/local_object_storage/metabase/list_test.go @@ -75,7 +75,7 @@ func TestLisObjectsWithCursor(t *testing.T) { for countPerReq := 1; countPerReq <= total; countPerReq++ { got := make([]*objectSDK.Address, 0, total) - res, cursor, err := meta.ListWithCursor(db, uint32(countPerReq), "") + res, cursor, err := meta.ListWithCursor(db, uint32(countPerReq), nil) require.NoError(t, err, "count:%d", countPerReq) got = append(got, res...) @@ -98,19 +98,8 @@ func TestLisObjectsWithCursor(t *testing.T) { } }) - t.Run("invalid cursor", func(t *testing.T) { - _, cursor, err := meta.ListWithCursor(db, total/2, "") - require.NoError(t, err) - - _, _, err = meta.ListWithCursor(db, total/2, "x"+cursor[1:]) - require.Error(t, err) - - _, _, err = meta.ListWithCursor(db, total/2, cursor[1:]) - require.Error(t, err) - }) - t.Run("invalid count", func(t *testing.T) { - _, _, err := meta.ListWithCursor(db, 0, "") + _, _, err := meta.ListWithCursor(db, 0, nil) require.ErrorIs(t, err, core.ErrEndOfListing) }) } @@ -131,7 +120,7 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) { } // get half of the objects - got, cursor, err := meta.ListWithCursor(db, total/2, "") + got, cursor, err := meta.ListWithCursor(db, total/2, nil) require.NoError(t, err) for _, obj := range got { if _, ok := expected[obj.String()]; ok { diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index bf584024a..53ebe1816 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -9,6 +9,9 @@ import ( "go.uber.org/zap" ) +// Cursor is a type for continuous object listing. +type Cursor = meta.Cursor + type ListContainersPrm struct{} type ListContainersRes struct { @@ -22,13 +25,13 @@ func (r *ListContainersRes) Containers() []*cid.ID { // ListWithCursorPrm contains parameters for ListWithCursor operation. type ListWithCursorPrm struct { count uint32 - cursor string + cursor *Cursor } // ListWithCursorRes contains values returned from ListWithCursor operation. type ListWithCursorRes struct { addrList []*object.Address - cursor string + cursor *Cursor } // WithCount sets maximum amount of addresses that ListWithCursor can return. @@ -38,9 +41,9 @@ func (p *ListWithCursorPrm) WithCount(count uint32) *ListWithCursorPrm { } // WithCursor sets cursor for ListWithCursor operation. For initial request, -// ignore this param or use empty string. For continues requests, use value +// ignore this param or use nil value. For continues requests, use value // from ListWithCursorRes. -func (p *ListWithCursorPrm) WithCursor(cursor string) *ListWithCursorPrm { +func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) *ListWithCursorPrm { p.cursor = cursor return p } @@ -51,7 +54,7 @@ func (r ListWithCursorRes) AddressList() []*object.Address { } // Cursor returns cursor for consecutive listing requests. -func (r ListWithCursorRes) Cursor() string { +func (r ListWithCursorRes) Cursor() *Cursor { return r.cursor } @@ -121,11 +124,11 @@ func (s *Shard) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, erro // ListWithCursor lists physical objects available in shard starting from // cursor. Includes regular, tombstone and storage group objects. Does not // include inhumed objects. Use cursor value from response for consecutive requests. -func ListWithCursor(s *Shard, count uint32, cursor string) ([]*object.Address, string, error) { +func ListWithCursor(s *Shard, count uint32, cursor *Cursor) ([]*object.Address, *Cursor, error) { prm := new(ListWithCursorPrm).WithCount(count).WithCursor(cursor) res, err := s.ListWithCursor(prm) if err != nil { - return nil, "", err + return nil, nil, err } return res.AddressList(), res.Cursor(), nil