metabase: Omit expired objects from listing

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2024-08-15 09:46:14 +03:00
parent bc0d9c5a03
commit 284b270c1f
2 changed files with 37 additions and 9 deletions

View file

@ -92,14 +92,14 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
result := make([]objectcore.AddressWithType, 0, prm.count)
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor, db.epochState.CurrentEpoch())
return err
})
success = err == nil
return res, metaerr.Wrap(err)
}
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor, currEpoch uint64) ([]objectcore.AddressWithType, *Cursor, error) {
threshold := cursor == nil // threshold is a flag to ignore cursor
var bucketName []byte
var err error
@ -142,7 +142,7 @@ loop:
if bkt != nil {
copy(rawAddr, cidRaw)
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
result, count, cursor, threshold)
result, count, cursor, threshold, currEpoch)
if err != nil {
return nil, nil, err
}
@ -187,6 +187,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
limit int, // stop listing at `limit` items in result
cursor *Cursor, // start from cursor object
threshold bool, // ignore cursor and start immediately
currEpoch uint64,
) ([]objectcore.AddressWithType, []byte, *Cursor, error) {
if cursor == nil {
cursor = new(Cursor)
@ -218,12 +219,16 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
continue
}
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return nil, nil, nil, err
}
if !objectLocked(bkt.Tx(), cnt, obj) && isExpiredAt(&o, currEpoch) {
continue
}
var isLinkingObj bool
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return nil, nil, nil, err
}
isLinkingObj = isLinkObject(&o)
}

View file

@ -4,8 +4,10 @@ import (
"context"
"errors"
"sort"
"strconv"
"testing"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -70,12 +72,14 @@ func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) {
func TestLisObjectsWithCursor(t *testing.T) {
t.Parallel()
db := newDB(t)
const currentEpoch = uint64(10)
db := newDB(t, meta.WithEpochState(epochState{e: currentEpoch}))
defer func() { require.NoError(t, db.Close()) }()
const (
containers = 5
total = containers * 4 // regular + ts + child + lock
total = containers * 5 // regular + ts + child + lock + regular unexpired
)
expected := make([]object.AddressWithType, 0, total)
@ -126,6 +130,25 @@ func TestLisObjectsWithCursor(t *testing.T) {
err = putBig(db, child)
require.NoError(t, err)
expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
// add objects, expirind in different epochs:
// - 2 epochs ago
// - last epoch
// - this epoch (available)
for expiration := currentEpoch - 2; expiration <= currentEpoch; expiration++ {
obj := testutil.GenerateObjectWithCID(containerID)
var attr objectSDK.Attribute
attr.SetKey(objectV2.SysAttributeExpEpoch)
attr.SetValue(strconv.FormatUint(expiration, 10))
obj.SetAttributes(attr)
require.NoError(t, putBig(db, obj))
if expiration == currentEpoch {
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
}
}
}
expected = sortAddresses(expected)