forked from TrueCloudLab/frostfs-node
[#1583] metabase: Skip expired objects in ListWithCursor
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
e44782473a
commit
0da998ef50
1 changed files with 16 additions and 6 deletions
|
@ -87,7 +87,8 @@ type CountAliveObjectsInContainerPrm struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListWithCursor lists physical objects available in metabase starting from
|
// ListWithCursor lists physical objects available in metabase starting from
|
||||||
// cursor. Includes objects of all types. Does not include inhumed objects.
|
// cursor. Includes objects of all types. Does not include inhumed and expired
|
||||||
|
// objects.
|
||||||
// Use cursor value from response for consecutive requests.
|
// Use cursor value from response for consecutive requests.
|
||||||
//
|
//
|
||||||
// Returns ErrEndOfListing if there are no more objects to return or count
|
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||||
|
@ -143,6 +144,8 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
|
||||||
|
|
||||||
rawAddr := make([]byte, cidSize, addressKeySize)
|
rawAddr := make([]byte, cidSize, addressKeySize)
|
||||||
|
|
||||||
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for ; name != nil; name, _ = c.Next() {
|
for ; name != nil; name, _ = c.Next() {
|
||||||
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
|
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
|
||||||
|
@ -167,7 +170,7 @@ loop:
|
||||||
if bkt != nil {
|
if bkt != nil {
|
||||||
copy(rawAddr, cidRaw)
|
copy(rawAddr, cidRaw)
|
||||||
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
||||||
result, count, cursor, threshold)
|
result, count, cursor, threshold, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -212,6 +215,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
limit int, // stop listing at `limit` items in result
|
limit int, // stop listing at `limit` items in result
|
||||||
cursor *Cursor, // start from cursor object
|
cursor *Cursor, // start from cursor object
|
||||||
threshold bool, // ignore cursor and start immediately
|
threshold bool, // ignore cursor and start immediately
|
||||||
|
currEpoch uint64,
|
||||||
) ([]objectcore.Info, []byte, *Cursor, error) {
|
) ([]objectcore.Info, []byte, *Cursor, error) {
|
||||||
if cursor == nil {
|
if cursor == nil {
|
||||||
cursor = new(Cursor)
|
cursor = new(Cursor)
|
||||||
|
@ -243,13 +247,19 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var o objectSDK.Object
|
||||||
|
if err := o.Unmarshal(bytes.Clone(v)); err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
|
||||||
|
if !objectLocked(bkt.Tx(), cnt, obj) && hasExpEpoch && expEpoch < currEpoch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
var isLinkingObj bool
|
var isLinkingObj bool
|
||||||
var ecInfo *objectcore.ECInfo
|
var ecInfo *objectcore.ECInfo
|
||||||
if objType == objectSDK.TypeRegular {
|
if objType == objectSDK.TypeRegular {
|
||||||
var o objectSDK.Object
|
|
||||||
if err := o.Unmarshal(bytes.Clone(v)); err != nil {
|
|
||||||
return nil, nil, nil, err
|
|
||||||
}
|
|
||||||
isLinkingObj = isLinkObject(&o)
|
isLinkingObj = isLinkObject(&o)
|
||||||
ecHeader := o.ECHeader()
|
ecHeader := o.ECHeader()
|
||||||
if ecHeader != nil {
|
if ecHeader != nil {
|
||||||
|
|
Loading…
Reference in a new issue