metabase: Skip expired objects in ListWithCursor
#1583
2 changed files with 45 additions and 10 deletions
|
@ -87,7 +87,8 @@ type CountAliveObjectsInContainerPrm struct {
|
|||
}
|
||||
|
||||
// ListWithCursor lists physical objects available in metabase starting from
|
||||
// cursor. Includes objects of all types. Does not include inhumed objects.
|
||||
// cursor. Includes objects of all types. Does not include inhumed and expired
|
||||
// objects.
|
||||
// Use cursor value from response for consecutive requests.
|
||||
//
|
||||
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||
|
@ -143,6 +144,8 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
|
|||
|
||||
rawAddr := make([]byte, cidSize, addressKeySize)
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
loop:
|
||||
for ; name != nil; name, _ = c.Next() {
|
||||
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
|
||||
|
@ -167,7 +170,7 @@ loop:
|
|||
if bkt != nil {
|
||||
copy(rawAddr, cidRaw)
|
||||
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
||||
result, count, cursor, threshold)
|
||||
result, count, cursor, threshold, currEpoch)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -212,6 +215,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
|||
limit int, // stop listing at `limit` items in result
|
||||
cursor *Cursor, // start from cursor object
|
||||
threshold bool, // ignore cursor and start immediately
|
||||
currEpoch uint64,
|
||||
) ([]objectcore.Info, []byte, *Cursor, error) {
|
||||
if cursor == nil {
|
||||
cursor = new(Cursor)
|
||||
|
@ -243,13 +247,19 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
|||
continue
|
||||
}
|
||||
|
||||
var o objectSDK.Object
|
||||
if err := o.Unmarshal(bytes.Clone(v)); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
|
||||
if !objectLocked(bkt.Tx(), cnt, obj) && hasExpEpoch && expEpoch < currEpoch {
|
||||
continue
|
||||
}
|
||||
|
||||
var isLinkingObj bool
|
||||
var ecInfo *objectcore.ECInfo
|
||||
if objType == objectSDK.TypeRegular {
|
||||
var o objectSDK.Object
|
||||
if err := o.Unmarshal(bytes.Clone(v)); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
isLinkingObj = isLinkObject(&o)
|
||||
ecHeader := o.ECHeader()
|
||||
if ecHeader != nil {
|
||||
|
|
|
@ -3,14 +3,17 @@ package meta_test
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/bbolt"
|
||||
|
@ -71,14 +74,16 @@ func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) {
|
|||
func TestLisObjectsWithCursor(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db := newDB(t)
|
||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||
|
||||
const (
|
||||
currEpoch = 100
|
||||
expEpoch = currEpoch - 1
|
||||
containers = 5
|
||||
total = containers * 4 // regular + ts + child + lock
|
||||
total = containers * 6 // regular + ts + child + lock + non-expired regular + locked expired
|
||||
)
|
||||
|
||||
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||
|
||||
expected := make([]object.Info, 0, total)
|
||||
|
||||
// fill metabase with objects
|
||||
|
@ -127,6 +132,26 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
|||
err = putBig(db, child)
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
|
||||
|
||||
// add expired object (do not include into expected)
|
||||
obj = testutil.GenerateObjectWithCID(containerID)
|
||||
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
|
||||
require.NoError(t, metaPut(db, obj, nil))
|
||||
|
||||
// add non-expired object (include into expected)
|
||||
obj = testutil.GenerateObjectWithCID(containerID)
|
||||
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(currEpoch))
|
||||
require.NoError(t, metaPut(db, obj, nil))
|
||||
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
|
||||
|
||||
// add locked expired object (include into expected)
|
||||
obj = testutil.GenerateObjectWithCID(containerID)
|
||||
objID := oidtest.ID()
|
||||
obj.SetID(objID)
|
||||
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
|
||||
require.NoError(t, metaPut(db, obj, nil))
|
||||
require.NoError(t, db.Lock(context.Background(), containerID, oidtest.ID(), []oid.ID{objID}))
|
||||
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
|
||||
}
|
||||
|
||||
t.Run("success with various count", func(t *testing.T) {
|
||||
|
|
Loading…
Add table
Reference in a new issue