diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index 9a5e6850..436f8112 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -1,15 +1,18 @@ package meta import ( + "strings" + core "github.com/nspcc-dev/neofs-node/pkg/core/object" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" "go.etcd.io/bbolt" ) // Cursor is a type for continuous object listing. type Cursor struct { - bucket uint8 - address *object.Address + bucketName []byte + inBucketOffset []byte } // ListPrm contains parameters for ListWithCursor operation. @@ -48,12 +51,6 @@ func (l ListRes) Cursor() *Cursor { return l.cursor } -const ( - cursorBucketPrimary = iota - cursorBucketTombstone - cursorBucketSG -) - // ListWithCursor lists physical objects available in metabase starting from // cursor. Includes regular, tombstone and storage group objects. Does not // include inhumed objects. Use cursor value from response for consecutive requests. @@ -88,55 +85,50 @@ func (db *DB) ListWithCursor(prm *ListPrm) (res *ListRes, err error) { func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor *Cursor) ([]*object.Address, *Cursor, error) { threshold := cursor == nil // threshold is a flag to ignore cursor result := make([]*object.Address, 0, count) - unique := make(map[string]struct{}) // do not parse the same containerID twice + var bucketName []byte c := tx.Cursor() name, _ := c.First() if !threshold { - name, _ = c.Seek([]byte(cursor.address.ContainerID().String())) + name, _ = c.Seek(cursor.bucketName) } loop: for ; name != nil; name, _ = c.Next() { - containerID := parseContainerID(name, unique) + containerID, postfix := parseContainerIDWithPostfix(name) if containerID == nil { continue } - unique[containerID.String()] = struct{}{} + switch postfix { + case "", storageGroupPostfix, tombstonePostfix: + default: + continue + } + prefix := containerID.String() + "/" - lookupBuckets := [...]struct { - name []byte - bucket uint8 - }{ - {primaryBucketName(containerID), cursorBucketPrimary}, - {tombstoneBucketName(containerID), cursorBucketTombstone}, - {storageGroupBucketName(containerID), cursorBucketSG}, + result, cursor = selectNFromBucket(tx, name, prefix, result, count, cursor, threshold) + bucketName = name + if len(result) >= count { + break loop } - for _, lb := range lookupBuckets { - if !threshold && cursor.bucket != lb.bucket { - continue // start from the bucket, specified in the cursor bucket - } - - result, cursor = selectNFromBucket(tx, lb.name, prefix, result, count, cursor, threshold) - cursor.bucket = lb.bucket - if len(result) >= count { - break loop - } - - // set threshold flag after first `selectNFromBucket` invocation - // first invocation must look for cursor object - threshold = true - } + // set threshold flag after first `selectNFromBucket` invocation + // first invocation must look for cursor object + threshold = true } if len(result) == 0 { return nil, nil, core.ErrEndOfListing } + // new slice is much faster but less memory efficient + // we need to copy, because bucketName exists during bbolt tx + cursor.bucketName = make([]byte, len(bucketName)) + copy(cursor.bucketName, bucketName) + return result, cursor, nil } @@ -163,8 +155,10 @@ func selectNFromBucket(tx *bbolt.Tx, c := bkt.Cursor() k, _ := c.First() + offset := cursor.inBucketOffset + if !threshold { - c.Seek([]byte(cursor.address.ObjectID().String())) + c.Seek(offset) k, _ = c.Next() // we are looking for objects _after_ the cursor } @@ -176,7 +170,7 @@ func selectNFromBucket(tx *bbolt.Tx, if err := a.Parse(prefix + string(k)); err != nil { break } - cursor.address = a + offset = k if inGraveyard(tx, a) > 0 { continue } @@ -184,5 +178,30 @@ func selectNFromBucket(tx *bbolt.Tx, count++ } + // new slice is much faster but less memory efficient + // we need to copy, because offset exists during bbolt tx + cursor.inBucketOffset = make([]byte, len(offset)) + copy(cursor.inBucketOffset, offset) + return to, cursor } + +func parseContainerIDWithPostfix(name []byte) (*cid.ID, string) { + var ( + containerID = cid.New() + containerIDStr = string(name) + postfix string + ) + + ind := strings.Index(string(name), invalidBase58String) + if ind > 0 { + postfix = containerIDStr[ind:] + containerIDStr = containerIDStr[:ind] + } + + if err := containerID.Parse(containerIDStr); err != nil { + return nil, "" + } + + return containerID, postfix +}