From 0da998ef50472d8de4589bfd9105ce1892636c94 Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Wed, 25 Dec 2024 22:12:41 +0300 Subject: [PATCH] [#1583] metabase: Skip expired objects in `ListWithCursor` Signed-off-by: Aleksey Savchuk --- pkg/local_object_storage/metabase/list.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index a7ff2222f..375d1cb1a 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -87,7 +87,8 @@ type CountAliveObjectsInContainerPrm struct { } // ListWithCursor lists physical objects available in metabase starting from -// cursor. Includes objects of all types. Does not include inhumed objects. +// cursor. Includes objects of all types. Does not include inhumed and expired +// objects. // Use cursor value from response for consecutive requests. // // Returns ErrEndOfListing if there are no more objects to return or count @@ -143,6 +144,8 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int, rawAddr := make([]byte, cidSize, addressKeySize) + currEpoch := db.epochState.CurrentEpoch() + loop: for ; name != nil; name, _ = c.Next() { cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name) @@ -167,7 +170,7 @@ loop: if bkt != nil { copy(rawAddr, cidRaw) result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID, - result, count, cursor, threshold) + result, count, cursor, threshold, currEpoch) if err != nil { return nil, nil, err } @@ -212,6 +215,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket limit int, // stop listing at `limit` items in result cursor *Cursor, // start from cursor object threshold bool, // ignore cursor and start immediately + currEpoch uint64, ) ([]objectcore.Info, []byte, *Cursor, error) { if cursor == nil { cursor = new(Cursor) @@ -243,13 +247,19 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket continue } + var o objectSDK.Object + if err := o.Unmarshal(bytes.Clone(v)); err != nil { + return nil, nil, nil, err + } + + expEpoch, hasExpEpoch := hasExpirationEpoch(&o) + if !objectLocked(bkt.Tx(), cnt, obj) && hasExpEpoch && expEpoch < currEpoch { + continue + } + var isLinkingObj bool var ecInfo *objectcore.ECInfo if objType == objectSDK.TypeRegular { - var o objectSDK.Object - if err := o.Unmarshal(bytes.Clone(v)); err != nil { - return nil, nil, nil, err - } isLinkingObj = isLinkObject(&o) ecHeader := o.ECHeader() if ecHeader != nil {