metabase: Skip expired objects in ListWithCursor
#1583
2 changed files with 45 additions and 10 deletions
|
@ -87,7 +87,8 @@ type CountAliveObjectsInContainerPrm struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListWithCursor lists physical objects available in metabase starting from
|
// ListWithCursor lists physical objects available in metabase starting from
|
||||||
// cursor. Includes objects of all types. Does not include inhumed objects.
|
// cursor. Includes objects of all types. Does not include inhumed and expired
|
||||||
|
// objects.
|
||||||
// Use cursor value from response for consecutive requests.
|
// Use cursor value from response for consecutive requests.
|
||||||
//
|
//
|
||||||
// Returns ErrEndOfListing if there are no more objects to return or count
|
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||||
|
@ -143,6 +144,8 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
|
||||||
|
|
||||||
rawAddr := make([]byte, cidSize, addressKeySize)
|
rawAddr := make([]byte, cidSize, addressKeySize)
|
||||||
|
|
||||||
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for ; name != nil; name, _ = c.Next() {
|
for ; name != nil; name, _ = c.Next() {
|
||||||
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
|
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
|
||||||
|
@ -167,7 +170,7 @@ loop:
|
||||||
if bkt != nil {
|
if bkt != nil {
|
||||||
copy(rawAddr, cidRaw)
|
copy(rawAddr, cidRaw)
|
||||||
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
||||||
result, count, cursor, threshold)
|
result, count, cursor, threshold, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -212,6 +215,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
limit int, // stop listing at `limit` items in result
|
limit int, // stop listing at `limit` items in result
|
||||||
cursor *Cursor, // start from cursor object
|
cursor *Cursor, // start from cursor object
|
||||||
threshold bool, // ignore cursor and start immediately
|
threshold bool, // ignore cursor and start immediately
|
||||||
|
currEpoch uint64,
|
||||||
) ([]objectcore.Info, []byte, *Cursor, error) {
|
) ([]objectcore.Info, []byte, *Cursor, error) {
|
||||||
if cursor == nil {
|
if cursor == nil {
|
||||||
cursor = new(Cursor)
|
cursor = new(Cursor)
|
||||||
|
@ -243,13 +247,19 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var o objectSDK.Object
|
||||||
|
if err := o.Unmarshal(bytes.Clone(v)); err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
|
||||||
|
if !objectLocked(bkt.Tx(), cnt, obj) && hasExpEpoch && expEpoch < currEpoch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
var isLinkingObj bool
|
var isLinkingObj bool
|
||||||
var ecInfo *objectcore.ECInfo
|
var ecInfo *objectcore.ECInfo
|
||||||
if objType == objectSDK.TypeRegular {
|
if objType == objectSDK.TypeRegular {
|
||||||
var o objectSDK.Object
|
|
||||||
if err := o.Unmarshal(bytes.Clone(v)); err != nil {
|
|
||||||
return nil, nil, nil, err
|
|
||||||
}
|
|
||||||
isLinkingObj = isLinkObject(&o)
|
isLinkingObj = isLinkObject(&o)
|
||||||
ecHeader := o.ECHeader()
|
ecHeader := o.ECHeader()
|
||||||
if ecHeader != nil {
|
if ecHeader != nil {
|
||||||
|
|
|
@ -3,14 +3,17 @@ package meta_test
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
@ -71,14 +74,16 @@ func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) {
|
||||||
func TestLisObjectsWithCursor(t *testing.T) {
|
func TestLisObjectsWithCursor(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
db := newDB(t)
|
|
||||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
currEpoch = 100
|
||||||
|
expEpoch = currEpoch - 1
|
||||||
containers = 5
|
containers = 5
|
||||||
total = containers * 4 // regular + ts + child + lock
|
total = containers * 6 // regular + ts + child + lock + non-expired regular + locked expired
|
||||||
)
|
)
|
||||||
|
|
||||||
|
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
||||||
|
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||||
|
|
||||||
expected := make([]object.Info, 0, total)
|
expected := make([]object.Info, 0, total)
|
||||||
|
|
||||||
// fill metabase with objects
|
// fill metabase with objects
|
||||||
|
@ -127,6 +132,26 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
||||||
err = putBig(db, child)
|
err = putBig(db, child)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
|
expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
|
||||||
|
|
||||||
|
// add expired object (do not include into expected)
|
||||||
|
obj = testutil.GenerateObjectWithCID(containerID)
|
||||||
|
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
|
||||||
|
require.NoError(t, metaPut(db, obj, nil))
|
||||||
|
|
||||||
|
// add non-expired object (include into expected)
|
||||||
|
obj = testutil.GenerateObjectWithCID(containerID)
|
||||||
|
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(currEpoch))
|
||||||
|
require.NoError(t, metaPut(db, obj, nil))
|
||||||
|
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
|
||||||
|
|
||||||
|
// add locked expired object (include into expected)
|
||||||
|
obj = testutil.GenerateObjectWithCID(containerID)
|
||||||
|
objID := oidtest.ID()
|
||||||
|
obj.SetID(objID)
|
||||||
|
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
|
||||||
|
require.NoError(t, metaPut(db, obj, nil))
|
||||||
|
require.NoError(t, db.Lock(context.Background(), containerID, oidtest.ID(), []oid.ID{objID}))
|
||||||
|
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("success with various count", func(t *testing.T) {
|
t.Run("success with various count", func(t *testing.T) {
|
||||||
|
|
Loading…
Add table
Reference in a new issue