[#1323] metabase: Add expiration epoch buckets

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-08-20 11:59:42 +03:00 committed by Evgenii Stratonikov
parent 2542d4f5df
commit 7bf20c9f1f
11 changed files with 181 additions and 170 deletions

View file

@ -29,6 +29,7 @@ var (
string(garbageBucketName): {}, string(garbageBucketName): {},
string(shardInfoBucket): {}, string(shardInfoBucket): {},
string(bucketNameLocked): {}, string(bucketNameLocked): {},
string(expEpochToObjectBucketName): {},
} }
// deprecatedBuckets buckets that are not used anymore. // deprecatedBuckets buckets that are not used anymore.

View file

@ -478,6 +478,17 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
key: objKey, key: objKey,
}) })
if expEpoch, ok := hasExpirationEpoch(obj); ok {
delUniqueIndexItem(tx, namedBucketItem{
name: expEpochToObjectBucketName,
key: expirationEpochKey(expEpoch, cnr, addr.Object()),
})
delUniqueIndexItem(tx, namedBucketItem{
name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)),
key: objKey,
})
}
return nil return nil
} }

View file

@ -418,7 +418,8 @@ func testVerifyNoObjectDataLeft(tx *bbolt.Tx) error {
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
if bytes.Equal(name, shardInfoBucket) || if bytes.Equal(name, shardInfoBucket) ||
bytes.Equal(name, containerCounterBucketName) || bytes.Equal(name, containerCounterBucketName) ||
bytes.Equal(name, containerVolumeBucketName) { bytes.Equal(name, containerVolumeBucketName) ||
bytes.Equal(name, expEpochToObjectBucketName) {
return nil return nil
} }
return testBucketEmpty(name, b) return testBucketEmpty(name, b)

View file

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"time" "time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@ -96,7 +95,11 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpo
locked = objectLocked(tx, parent.Container(), parent.Object()) locked = objectLocked(tx, parent.Container(), parent.Object())
} }
// check graveyard and object expiration first // check graveyard and object expiration first
switch objectStatus(tx, addr, currEpoch) { st, err := objectStatus(tx, addr, currEpoch)
if err != nil {
return false, false, err
}
switch st {
case 1: case 1:
return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound)) return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound))
case 2: case 2:
@ -138,30 +141,25 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpo
// - 1 if object with GC mark; // - 1 if object with GC mark;
// - 2 if object is covered with tombstone; // - 2 if object is covered with tombstone;
// - 3 if object is expired. // - 3 if object is expired.
func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 { func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) {
// locked object could not be removed/marked with GC/expired // locked object could not be removed/marked with GC/expired
if objectLocked(tx, addr.Container(), addr.Object()) { if objectLocked(tx, addr.Container(), addr.Object()) {
return 0 return 0, nil
} }
// we check only if the object is expired in the current expired, err := isExpired(tx, addr, currEpoch)
// epoch since it is considered the only corner case: the if err != nil {
// GC is expected to collect all the objects that have return 0, err
// expired previously for less than the one epoch duration
expired := isExpiredWithAttribute(tx, objectV2.SysAttributeExpEpoch, addr, currEpoch)
if !expired {
expired = isExpiredWithAttribute(tx, objectV2.SysAttributeExpEpochNeoFS, addr, currEpoch)
} }
if expired { if expired {
return 3 return 3, nil
} }
graveyardBkt := tx.Bucket(graveyardBucketName) graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName) garbageBkt := tx.Bucket(garbageBucketName)
addrKey := addressKey(addr, make([]byte, addressKeySize)) addrKey := addressKey(addr, make([]byte, addressKeySize))
return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt) return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt), nil
} }
func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 { func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 {

View file

@ -2,12 +2,11 @@ package meta
import ( import (
"context" "context"
"encoding/binary"
"errors" "errors"
"fmt"
"strconv" "strconv"
"time" "time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -17,6 +16,8 @@ import (
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
var errInvalidEpochValueLength = errors.New("could not parse expiration epoch: invalid data length")
// FilterExpired return expired items from addresses. // FilterExpired return expired items from addresses.
// Address considered expired if metabase does contain information about expiration and // Address considered expired if metabase does contain information about expiration and
// expiration epoch is less than epoch. // expiration epoch is less than epoch.
@ -57,29 +58,11 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A
default: default:
} }
expiredNeoFS, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpochNeoFS, epoch, containerID, objectIDs) expired, err := selectExpiredObjects(tx, epoch, containerID, objectIDs)
if err != nil { if err != nil {
return err return err
} }
result = append(result, expired...)
expiredSys, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpoch, epoch, containerID, objectIDs)
if err != nil {
return err
}
for _, o := range expiredNeoFS {
var a oid.Address
a.SetContainer(containerID)
a.SetObject(o)
result = append(result, a)
}
for _, o := range expiredSys {
var a oid.Address
a.SetContainer(containerID)
a.SetObject(o)
result = append(result, a)
}
} }
return nil return nil
}) })
@ -90,76 +73,39 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A
return result, nil return result, nil
} }
func isExpiredWithAttribute(tx *bbolt.Tx, attr string, addr oid.Address, currEpoch uint64) bool { func isExpired(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (bool, error) {
// bucket with objects that have expiration attr bucketName := make([]byte, bucketKeySize)
attrKey := make([]byte, bucketKeySize+len(attr)) bucketName = objectToExpirationEpochBucketName(addr.Container(), bucketName)
expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), attr, attrKey)) b := tx.Bucket(bucketName)
if expirationBucket != nil { if b == nil {
// bucket that contains objects that expire in the current epoch return false, nil
prevEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch-1, 10)))
if prevEpochBkt != nil {
rawOID := objectKey(addr.Object(), make([]byte, objectKeySize))
if prevEpochBkt.Get(rawOID) != nil {
return true
} }
key := make([]byte, objectKeySize)
addr.Object().Encode(key)
val := b.Get(key)
if len(val) == 0 {
return false, nil
} }
if len(val) != epochSize {
return false, errInvalidEpochValueLength
} }
expEpoch := binary.LittleEndian.Uint64(val)
return false return expEpoch < currEpoch, nil
} }
func selectExpiredObjectIDs(tx *bbolt.Tx, attr string, epoch uint64, containerID cid.ID, objectIDs []oid.ID) ([]oid.ID, error) { func selectExpiredObjects(tx *bbolt.Tx, epoch uint64, containerID cid.ID, objectIDs []oid.ID) ([]oid.Address, error) {
result := make([]oid.ID, 0) result := make([]oid.Address, 0)
notResolved := make(map[oid.ID]struct{}) var addr oid.Address
for _, oid := range objectIDs { addr.SetContainer(containerID)
notResolved[oid] = struct{}{} for _, objID := range objectIDs {
} addr.SetObject(objID)
expired, err := isExpired(tx, addr, epoch)
expiredBuffer := make([]oid.ID, 0)
objectKeyBuffer := make([]byte, objectKeySize)
expirationBucketKey := make([]byte, bucketKeySize+len(attr))
expirationBucket := tx.Bucket(attributeBucketName(containerID, attr, expirationBucketKey))
if expirationBucket == nil {
return result, nil // all not expired
}
err := expirationBucket.ForEach(func(epochExpBucketKey, _ []byte) error {
bucketExpiresAfter, err := strconv.ParseUint(string(epochExpBucketKey), 10, 64)
if err != nil { if err != nil {
return fmt.Errorf("could not parse expiration epoch: %w", err)
} else if bucketExpiresAfter >= epoch {
return nil
}
epochExpirationBucket := expirationBucket.Bucket(epochExpBucketKey)
if epochExpirationBucket == nil {
return nil
}
expiredBuffer = expiredBuffer[:0]
for oid := range notResolved {
key := objectKey(oid, objectKeyBuffer)
if epochExpirationBucket.Get(key) != nil {
expiredBuffer = append(expiredBuffer, oid)
}
}
for _, oid := range expiredBuffer {
delete(notResolved, oid)
result = append(result, oid)
}
if len(notResolved) == 0 {
return errBreakBucketForEach
}
return nil
})
if err != nil && !errors.Is(err, errBreakBucketForEach) {
return nil, err return nil, err
} }
if expired {
result = append(result, addr)
}
}
return result, nil return result, nil
} }

View file

@ -90,7 +90,11 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) { func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
if checkStatus { if checkStatus {
switch objectStatus(tx, addr, currEpoch) { st, err := objectStatus(tx, addr, currEpoch)
if err != nil {
return nil, err
}
switch st {
case 1: case 1:
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
case 2: case 2:

View file

@ -7,7 +7,6 @@ import (
"strconv" "strconv"
"time" "time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@ -79,63 +78,37 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH
} }
func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error { func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error { b := tx.Bucket(expEpochToObjectBucketName)
cidBytes := cidFromAttributeBucket(name, objectV2.SysAttributeExpEpoch) c := b.Cursor()
if cidBytes == nil { for k, _ := c.First(); k != nil; k, _ = c.Next() {
cidBytes = cidFromAttributeBucket(name, objectV2.SysAttributeExpEpochNeoFS) expiresAfter, cnr, obj, err := parseExpirationEpochKey(k)
if cidBytes == nil { if err != nil {
return err
}
// bucket keys ordered by epoch, no need to continue lookup
if expiresAfter >= epoch {
return nil return nil
} }
if objectLocked(tx, cnr, obj) {
continue
} }
var cnrID cid.ID
err := cnrID.Decode(cidBytes)
if err != nil {
return fmt.Errorf("could not parse container ID of expired bucket: %w", err)
}
return b.ForEachBucket(func(expKey []byte) error {
bktExpired := b.Bucket(expKey)
expiresAfter, err := strconv.ParseUint(string(expKey), 10, 64)
if err != nil {
return fmt.Errorf("could not parse expiration epoch: %w", err)
} else if expiresAfter >= epoch {
return nil
}
return bktExpired.ForEach(func(idKey, _ []byte) error {
var id oid.ID
err = id.Decode(idKey)
if err != nil {
return fmt.Errorf("could not parse ID of expired object: %w", err)
}
// Ignore locked objects.
//
// To slightly optimize performance we can check only REGULAR objects
// (only they can be locked), but it's more reliable.
if objectLocked(tx, cnrID, id) {
return nil
}
var addr oid.Address var addr oid.Address
addr.SetContainer(cnrID) addr.SetContainer(cnr)
addr.SetObject(id) addr.SetObject(obj)
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
return h(&ExpiredObject{ err = h(&ExpiredObject{
typ: firstIrregularObjectType(tx, cnrID, idKey), typ: firstIrregularObjectType(tx, cnr, objKey),
addr: addr, addr: addr,
}) })
}) if err == nil {
}) continue
}) }
if errors.Is(err, ErrInterruptIterator) {
if errors.Is(err, ErrInterruptIterator) { return nil
err = nil
} }
return err return err
}
return nil
} }
// IterateCoveredByTombstones iterates over all objects in DB which are covered // IterateCoveredByTombstones iterates over all objects in DB which are covered

View file

@ -6,8 +6,10 @@ import (
"errors" "errors"
"fmt" "fmt"
gio "io" gio "io"
"strconv"
"time" "time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
@ -242,6 +244,27 @@ func putUniqueIndexes(
} }
} }
if expEpoch, ok := hasExpirationEpoch(obj); ok {
err := putUniqueIndexItem(tx, namedBucketItem{
name: expEpochToObjectBucketName,
key: expirationEpochKey(expEpoch, cnr, addr.Object()),
val: zeroValue,
})
if err != nil {
return err
}
val := make([]byte, epochSize)
binary.LittleEndian.PutUint64(val, expEpoch)
err = putUniqueIndexItem(tx, namedBucketItem{
name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)),
key: objKey,
val: val,
})
if err != nil {
return err
}
}
// index root object // index root object
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() { if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
if ecHead := obj.ECHeader(); ecHead != nil { if ecHead := obj.ECHeader(); ecHead != nil {
@ -361,6 +384,24 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun
return nil return nil
} }
func hasExpirationEpoch(obj *objectSDK.Object) (uint64, bool) {
attributes := obj.Attributes()
if ech := obj.ECHeader(); ech != nil {
attributes = ech.ParentAttributes()
}
for _, attr := range attributes {
if attr.Key() == objectV2.SysAttributeExpEpochNeoFS {
expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64)
return expEpoch, err == nil
}
if attr.Key() == objectV2.SysAttributeExpEpoch {
expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64)
return expEpoch, err == nil
}
}
return 0, false
}
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error { func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
id, _ := obj.ID() id, _ := obj.ID()
cnr, _ := obj.ContainerID() cnr, _ := obj.ContainerID()

View file

@ -142,8 +142,11 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters
var addr oid.Address var addr oid.Address
addr.SetContainer(cnr) addr.SetContainer(cnr)
addr.SetObject(id) addr.SetObject(id)
st, err := objectStatus(tx, addr, currEpoch)
if objectStatus(tx, addr, currEpoch) > 0 { if err != nil {
return nil, err
}
if st > 0 {
continue // ignore removed objects continue // ignore removed objects
} }

View file

@ -1,9 +1,10 @@
package meta package meta
import ( import (
"bytes"
"crypto/sha256" "crypto/sha256"
"encoding/binary"
"errors" "errors"
"fmt"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -23,6 +24,7 @@ var (
toMoveItBucketName = []byte{toMoveItPrefix} toMoveItBucketName = []byte{toMoveItPrefix}
containerVolumeBucketName = []byte{containerVolumePrefix} containerVolumeBucketName = []byte{containerVolumePrefix}
containerCounterBucketName = []byte{containerCountersPrefix} containerCounterBucketName = []byte{containerCountersPrefix}
expEpochToObjectBucketName = []byte{expirationEpochToObjectPrefix}
zeroValue = []byte{0xFF} zeroValue = []byte{0xFF}
@ -124,6 +126,16 @@ const (
// Key: container ID + type // Key: container ID + type
// Value: Object id // Value: Object id
ecInfoPrefix ecInfoPrefix
// expirationEpochToObjectPrefix is used for storing relation between expiration epoch and object id.
// Key: expiration epoch + object address
// Value: zero
expirationEpochToObjectPrefix
// objectToExpirationEpochPrefix is used for storing relation between expiration epoch and object id.
// Key: object address
// Value: expiration epoch
objectToExpirationEpochPrefix
) )
const ( const (
@ -131,6 +143,7 @@ const (
bucketKeySize = 1 + cidSize bucketKeySize = 1 + cidSize
objectKeySize = sha256.Size objectKeySize = sha256.Size
addressKeySize = cidSize + objectKeySize addressKeySize = cidSize + objectKeySize
epochSize = 8
) )
func bucketName(cnr cid.ID, prefix byte, key []byte) []byte { func bucketName(cnr cid.ID, prefix byte, key []byte) []byte {
@ -161,15 +174,6 @@ func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte {
return append(key[:bucketKeySize], attributeKey...) return append(key[:bucketKeySize], attributeKey...)
} }
// returns <CID> from attributeBucketName result, nil otherwise.
func cidFromAttributeBucket(val []byte, attributeKey string) []byte {
if len(val) < bucketKeySize || val[0] != userAttributePrefix || !bytes.Equal(val[bucketKeySize:], []byte(attributeKey)) {
return nil
}
return val[1:bucketKeySize]
}
// rootBucketName returns <CID>_root. // rootBucketName returns <CID>_root.
func rootBucketName(cnr cid.ID, key []byte) []byte { func rootBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, rootPrefix, key) return bucketName(cnr, rootPrefix, key)
@ -190,6 +194,35 @@ func ecInfoBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, ecInfoPrefix, key) return bucketName(cnr, ecInfoPrefix, key)
} }
// objectToExpirationEpochBucketName returns objectToExpirationEpochPrefix_<CID>.
func objectToExpirationEpochBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, objectToExpirationEpochPrefix, key)
}
func expirationEpochKey(epoch uint64, cnr cid.ID, obj oid.ID) []byte {
result := make([]byte, epochSize+addressKeySize)
binary.BigEndian.PutUint64(result, epoch)
cnr.Encode(result[epochSize:])
obj.Encode(result[epochSize+cidSize:])
return result
}
func parseExpirationEpochKey(key []byte) (uint64, cid.ID, oid.ID, error) {
if len(key) != epochSize+addressKeySize {
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("unexpected expiration epoch to object key length: %d", len(key))
}
epoch := binary.BigEndian.Uint64(key)
var cnr cid.ID
if err := cnr.Decode(key[epochSize : epochSize+cidSize]); err != nil {
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (container ID): %w", err)
}
var obj oid.ID
if err := obj.Decode(key[epochSize+cidSize:]); err != nil {
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (object ID): %w", err)
}
return epoch, cnr, obj, nil
}
// addressKey returns key for K-V tables when key is a whole address. // addressKey returns key for K-V tables when key is a whole address.
func addressKey(addr oid.Address, key []byte) []byte { func addressKey(addr oid.Address, key []byte) []byte {
addr.Container().Encode(key) addr.Container().Encode(key)

View file

@ -74,7 +74,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
var getPrm GetPrm var getPrm GetPrm
getPrm.SetAddress(objectCore.AddressOf(obj)) getPrm.SetAddress(objectCore.AddressOf(obj))
_, err = sh.Get(context.Background(), getPrm) _, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err), "expired object must be deleted") require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired object must be deleted")
} }
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) { func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
@ -168,7 +168,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value)) sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
_, err = sh.Get(context.Background(), getPrm) _, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err), "expired complex object must be deleted on epoch after lock expires") require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired complex object must be deleted on epoch after lock expires")
} }
func TestGCDropsObjectInhumedFromWritecache(t *testing.T) { func TestGCDropsObjectInhumedFromWritecache(t *testing.T) {