From 52f949d92a03c244a0408b04e4685697c44ae77a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 2 Jul 2024 11:19:03 +0300 Subject: [PATCH] [#9999] metabase: Fix db engine to pebble in list.go Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/metabase/list.go | 258 +++++++++----------- pkg/local_object_storage/metabase/pebble.go | 35 +++ 2 files changed, 144 insertions(+), 149 deletions(-) diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index ac5342a1b..6f50a9479 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -2,16 +2,17 @@ package meta import ( "context" + "fmt" + "slices" "time" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -21,10 +22,33 @@ import ( // cursor. Use nil cursor object to start listing again. var ErrEndOfListing = logicerr.New("end of object listing") +type listPrefix struct { + prefix []byte + keyParser func(k []byte) (oid.Address, error) + objectType objectSDK.Type +} + +var listPrefixes = []listPrefix{ + { + prefix: []byte{primaryPrefix}, + keyParser: addressFromPrimaryKey, + objectType: objectSDK.TypeRegular, + }, + { + prefix: []byte{lockersPrefix}, + keyParser: addressFromLockersKey, + objectType: objectSDK.TypeLock, + }, + { + prefix: []byte{tombstonePrefix}, + keyParser: addressFromTombstoneKey, + objectType: objectSDK.TypeTombstone, + }, +} + // Cursor is a type for continuous object listing. type Cursor struct { - bucketName []byte - inBucketOffset []byte + lastKey []byte } // ListPrm contains parameters for ListWithCursor operation. @@ -89,173 +113,109 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err return res, ErrDegradedMode } - result := make([]objectcore.Info, 0, prm.count) + if prm.count == 0 { + return ListRes{}, ErrEndOfListing + } - err = db.database.View(func(tx *bbolt.Tx) error { - res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor) + err = db.snapshot(func(s *pebble.Snapshot) error { + res.addrList, res.cursor, err = db.listWithCursor(ctx, s, prm.count, prm.cursor) return err }) success = err == nil return res, metaerr.Wrap(err) } -func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) { - threshold := cursor == nil // threshold is a flag to ignore cursor - var bucketName []byte - var err error - - c := tx.Cursor() - name, _ := c.First() - - if !threshold { - name, _ = c.Seek(cursor.bucketName) +func (db *DB) listWithCursor(ctx context.Context, r pebble.Reader, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) { + var prefix []byte + var lastSeen []byte + if cursor != nil { + prefix = []byte{cursor.lastKey[0]} + lastSeen = cursor.lastKey + } else { + prefix = listPrefixes[0].prefix } - var containerID cid.ID - var offset []byte - graveyardBkt := tx.Bucket(graveyardBucketName) - garbageBkt := tx.Bucket(garbageBucketName) - - rawAddr := make([]byte, cidSize, addressKeySize) - -loop: - for ; name != nil; name, _ = c.Next() { - cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name) - if cidRaw == nil { - continue - } - - var objType objectSDK.Type - - switch prefix { - case primaryPrefix: - objType = objectSDK.TypeRegular - case lockersPrefix: - objType = objectSDK.TypeLock - case tombstonePrefix: - objType = objectSDK.TypeTombstone - default: - continue - } - - bkt := tx.Bucket(name) - if bkt != nil { - copy(rawAddr, cidRaw) - result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID, - result, count, cursor, threshold) - if err != nil { - return nil, nil, err - } - } - bucketName = name - if len(result) >= count { - break loop - } - - // set threshold flag after first `selectNFromBucket` invocation - // first invocation must look for cursor object - threshold = true + idx := slices.IndexFunc(listPrefixes, func(e listPrefix) bool { + return e.prefix[0] == prefix[0] + }) + if idx < 0 { + return nil, nil, fmt.Errorf("invalid prefix value %d", prefix[0]) } - if offset != nil { - // new slice is much faster but less memory efficient - // we need to copy, because offset exists during bbolt tx - cursor.inBucketOffset = make([]byte, len(offset)) - copy(cursor.inBucketOffset, offset) + var next Cursor + result := make([]objectcore.Info, 0, count) + for ; idx < len(listPrefixes); idx++ { + indexResult, lastIndexSeen, err := listByPrefix(ctx, r, lastSeen, idx, count-len(result)) + if err != nil { + return nil, nil, err + } + result = append(result, indexResult...) + if len(lastIndexSeen) > 0 { + next.lastKey = lastIndexSeen + } + if len(result) == count { + return result, &next, nil + } + lastSeen = nil } - if len(result) == 0 { return nil, nil, ErrEndOfListing } - - // new slice is much faster but less memory efficient - // we need to copy, because bucketName exists during bbolt tx - cursor.bucketName = make([]byte, len(bucketName)) - copy(cursor.bucketName, bucketName) - - return result, cursor, nil + return result, &next, nil } -// selectNFromBucket similar to selectAllFromBucket but uses cursor to find -// object to start selecting from. Ignores inhumed objects. -func selectNFromBucket(bkt *bbolt.Bucket, // main bucket - objType objectSDK.Type, // type of the objects stored in the main bucket - graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets - cidRaw []byte, // container ID prefix, optimization - cnt cid.ID, // container ID - to []objectcore.Info, // listing result - limit int, // stop listing at `limit` items in result - cursor *Cursor, // start from cursor object - threshold bool, // ignore cursor and start immediately -) ([]objectcore.Info, []byte, *Cursor, error) { - if cursor == nil { - cursor = new(Cursor) - } - - count := len(to) - c := bkt.Cursor() - k, v := c.First() - - offset := cursor.inBucketOffset - - if !threshold { - c.Seek(offset) - k, v = c.Next() // we are looking for objects _after_ the cursor - } - - for ; k != nil; k, v = c.Next() { - if count >= limit { - break +func listByPrefix(ctx context.Context, r pebble.Reader, lastSeen []byte, idx int, count int) ([]objectcore.Info, []byte, error) { + var result []objectcore.Info + for { + kvs, err := selectByPrefixAndSeek(ctx, r, listPrefixes[idx].prefix, lastSeen, count-len(result)) + if err != nil { + return nil, nil, err } - - var obj oid.ID - if err := obj.Decode(k); err != nil { - break + if len(kvs) == 0 { + return result, lastSeen, nil } - - offset = k - if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 { - continue - } - - var isLinkingObj bool - var ecInfo *objectcore.ECInfo - if objType == objectSDK.TypeRegular { - var o objectSDK.Object - if err := o.Unmarshal(v); err != nil { - return nil, nil, nil, err + for _, kv := range kvs { + lastSeen = kv.Key + addr, err := listPrefixes[idx].keyParser(kv.Key) + if err != nil { + return nil, nil, err } - isLinkingObj = isLinkObject(&o) - ecHeader := o.ECHeader() - if ecHeader != nil { - ecInfo = &objectcore.ECInfo{ - ParentID: ecHeader.Parent(), - Index: ecHeader.Index(), - Total: ecHeader.Total(), + st, err := inGraveyardWithKey(r, addr) + if err != nil { + return nil, nil, err + } + if st > 0 { + continue + } + + var isLinkingObj bool + var ecInfo *objectcore.ECInfo + if listPrefixes[idx].objectType == objectSDK.TypeRegular { + var o objectSDK.Object + if err := o.Unmarshal(kv.Value); err != nil { + return nil, nil, err + } + isLinkingObj = isLinkObject(&o) + ecHeader := o.ECHeader() + if ecHeader != nil { + ecInfo = &objectcore.ECInfo{ + ParentID: ecHeader.Parent(), + Index: ecHeader.Index(), + Total: ecHeader.Total(), + } } } + + result = append(result, objectcore.Info{ + Address: addr, + Type: listPrefixes[idx].objectType, + IsLinkingObject: isLinkingObj, + ECInfo: ecInfo, + }) + + if len(result) == count { + return result, lastSeen, nil + } } - - var a oid.Address - a.SetContainer(cnt) - a.SetObject(obj) - to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}) - count++ } - - return to, offset, cursor, nil -} - -func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) { - if len(name) < bucketKeySize { - return nil, 0 - } - - rawID := name[1:bucketKeySize] - - if err := containerID.Decode(rawID); err != nil { - return nil, 0 - } - - return rawID, name[0] } diff --git a/pkg/local_object_storage/metabase/pebble.go b/pkg/local_object_storage/metabase/pebble.go index b5ad3f10c..f696fc2e4 100644 --- a/pkg/local_object_storage/metabase/pebble.go +++ b/pkg/local_object_storage/metabase/pebble.go @@ -89,3 +89,38 @@ func deleteByPrefix(ctx context.Context, b *pebble.Batch, prefix []byte) error { } } } + +func selectByPrefixAndSeek(ctx context.Context, r pebble.Reader, prefix, lastSeen []byte, batchSize int) ([]keyValue, error) { + it, err := r.NewIter(&pebble.IterOptions{ + LowerBound: prefix, + }) + if err != nil { + return nil, err + } + + var result []keyValue + var v bool + if len(lastSeen) == 0 { + v = it.First() + } else { + v = it.SeekGE(lastSeen) + } + for ; v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { + select { + case <-ctx.Done(): + return nil, errors.Join(ctx.Err(), it.Close()) + default: + } + if bytes.Equal(lastSeen, it.Key()) { + continue + } + var current keyValue + current.Key = bytes.Clone(it.Key()) + current.Value = bytes.Clone(it.Value()) + result = append(result, current) + if len(result) == batchSize { + return result, it.Close() + } + } + return result, it.Close() +}