metabase: Skip expired objects in ListWithCursor #1583

Merged
fyrchik merged 2 commits from a-savchuk/frostfs-node:do-not-list-expired-objects into master 2024-12-27 05:42:07 +00:00
2 changed files with 45 additions and 10 deletions

View file

@ -87,7 +87,8 @@ type CountAliveObjectsInContainerPrm struct {
}
// ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes objects of all types. Does not include inhumed objects.
// cursor. Includes objects of all types. Does not include inhumed and expired
// objects.
// Use cursor value from response for consecutive requests.
//
// Returns ErrEndOfListing if there are no more objects to return or count
@ -143,6 +144,8 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
rawAddr := make([]byte, cidSize, addressKeySize)
currEpoch := db.epochState.CurrentEpoch()
loop:
for ; name != nil; name, _ = c.Next() {
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
@ -167,7 +170,7 @@ loop:
if bkt != nil {
copy(rawAddr, cidRaw)
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
result, count, cursor, threshold)
result, count, cursor, threshold, currEpoch)
if err != nil {
return nil, nil, err
}
@ -212,6 +215,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
limit int, // stop listing at `limit` items in result
cursor *Cursor, // start from cursor object
threshold bool, // ignore cursor and start immediately
currEpoch uint64,
) ([]objectcore.Info, []byte, *Cursor, error) {
if cursor == nil {
cursor = new(Cursor)
@ -243,13 +247,19 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
continue
}
var o objectSDK.Object
if err := o.Unmarshal(bytes.Clone(v)); err != nil {
return nil, nil, nil, err
}
expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
if !objectLocked(bkt.Tx(), cnt, obj) && hasExpEpoch && expEpoch < currEpoch {
continue
}
var isLinkingObj bool
var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(bytes.Clone(v)); err != nil {
return nil, nil, nil, err
}
isLinkingObj = isLinkObject(&o)
ecHeader := o.ECHeader()
if ecHeader != nil {

View file

@ -3,14 +3,17 @@ package meta_test
import (
"context"
"errors"
"strconv"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
@ -71,14 +74,16 @@ func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) {
func TestLisObjectsWithCursor(t *testing.T) {
t.Parallel()
db := newDB(t)
defer func() { require.NoError(t, db.Close(context.Background())) }()
const (
currEpoch = 100
expEpoch = currEpoch - 1
containers = 5
total = containers * 4 // regular + ts + child + lock
total = containers * 6 // regular + ts + child + lock + non-expired regular + locked expired
)
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
defer func() { require.NoError(t, db.Close(context.Background())) }()
expected := make([]object.Info, 0, total)
// fill metabase with objects
@ -127,6 +132,26 @@ func TestLisObjectsWithCursor(t *testing.T) {
err = putBig(db, child)
require.NoError(t, err)
expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
// add expired object (do not include into expected)
obj = testutil.GenerateObjectWithCID(containerID)
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
require.NoError(t, metaPut(db, obj, nil))
// add non-expired object (include into expected)
obj = testutil.GenerateObjectWithCID(containerID)
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(currEpoch))
require.NoError(t, metaPut(db, obj, nil))
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
// add locked expired object (include into expected)
obj = testutil.GenerateObjectWithCID(containerID)
objID := oidtest.ID()
obj.SetID(objID)
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
require.NoError(t, metaPut(db, obj, nil))
require.NoError(t, db.Lock(context.Background(), containerID, oidtest.ID(), []oid.ID{objID}))
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
}
t.Run("success with various count", func(t *testing.T) {