From 284b270c1f1c5e17c6bd9a63f540d1dc14a633e9 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 15 Aug 2024 09:46:14 +0300 Subject: [PATCH] metabase: Omit expired objects from listing Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/metabase/list.go | 19 ++++++++----- .../metabase/list_test.go | 27 +++++++++++++++++-- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index f7a314452..efe9b4a23 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -92,14 +92,14 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err result := make([]objectcore.AddressWithType, 0, prm.count) err = db.boltDB.View(func(tx *bbolt.Tx) error { - res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor) + res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor, db.epochState.CurrentEpoch()) return err }) success = err == nil return res, metaerr.Wrap(err) } -func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) { +func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor, currEpoch uint64) ([]objectcore.AddressWithType, *Cursor, error) { threshold := cursor == nil // threshold is a flag to ignore cursor var bucketName []byte var err error @@ -142,7 +142,7 @@ loop: if bkt != nil { copy(rawAddr, cidRaw) result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID, - result, count, cursor, threshold) + result, count, cursor, threshold, currEpoch) if err != nil { return nil, nil, err } @@ -187,6 +187,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket limit int, // stop listing at `limit` items in result cursor *Cursor, // start from cursor object threshold bool, // ignore cursor and start immediately + currEpoch uint64, ) ([]objectcore.AddressWithType, []byte, *Cursor, error) { if cursor == nil { cursor = new(Cursor) @@ -218,12 +219,16 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket continue } + var o objectSDK.Object + if err := o.Unmarshal(v); err != nil { + return nil, nil, nil, err + } + if !objectLocked(bkt.Tx(), cnt, obj) && isExpiredAt(&o, currEpoch) { + continue + } + var isLinkingObj bool if objType == objectSDK.TypeRegular { - var o objectSDK.Object - if err := o.Unmarshal(v); err != nil { - return nil, nil, nil, err - } isLinkingObj = isLinkObject(&o) } diff --git a/pkg/local_object_storage/metabase/list_test.go b/pkg/local_object_storage/metabase/list_test.go index 25c0e35bd..53a992232 100644 --- a/pkg/local_object_storage/metabase/list_test.go +++ b/pkg/local_object_storage/metabase/list_test.go @@ -4,8 +4,10 @@ import ( "context" "errors" "sort" + "strconv" "testing" + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" @@ -70,12 +72,14 @@ func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) { func TestLisObjectsWithCursor(t *testing.T) { t.Parallel() - db := newDB(t) + const currentEpoch = uint64(10) + + db := newDB(t, meta.WithEpochState(epochState{e: currentEpoch})) defer func() { require.NoError(t, db.Close()) }() const ( containers = 5 - total = containers * 4 // regular + ts + child + lock + total = containers * 5 // regular + ts + child + lock + regular unexpired ) expected := make([]object.AddressWithType, 0, total) @@ -126,6 +130,25 @@ func TestLisObjectsWithCursor(t *testing.T) { err = putBig(db, child) require.NoError(t, err) expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular}) + + // add objects, expirind in different epochs: + // - 2 epochs ago + // - last epoch + // - this epoch (available) + for expiration := currentEpoch - 2; expiration <= currentEpoch; expiration++ { + obj := testutil.GenerateObjectWithCID(containerID) + + var attr objectSDK.Attribute + attr.SetKey(objectV2.SysAttributeExpEpoch) + attr.SetValue(strconv.FormatUint(expiration, 10)) + obj.SetAttributes(attr) + + require.NoError(t, putBig(db, obj)) + if expiration == currentEpoch { + expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular}) + } + } + } expected = sortAddresses(expected)