diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index aa9aba106..c9ca54367 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -123,7 +123,10 @@ func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 { if objectLocked(tx, addr.Container(), addr.Object()) { return 0 } + return objectStatusUnlocked(tx, addr, currEpoch) +} +func objectStatusUnlocked(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 { // we check only if the object is expired in the current // epoch since it is considered the only corner case: the // GC is expected to collect all the objects that have diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index d18331a3d..746a46535 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -3,8 +3,10 @@ package meta import ( "context" "fmt" + "strconv" "time" + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" @@ -87,18 +89,41 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { return res, metaerr.Wrap(err) } -func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) { - if checkStatus { - switch objectStatus(tx, addr, currEpoch) { - case 1: - return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) - case 2: - return nil, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved)) - case 3: - return nil, ErrObjectIsExpired +func isExpiredAt(obj *objectSDK.Object, epoch uint64) bool { + attrs := obj.ToV2().GetHeader().GetAttributes() // Convert ToV2() to avoid allocations. + for i := range attrs { + if attrs[i].GetKey() == objectV2.SysAttributeExpEpoch { + val, err := strconv.ParseUint(attrs[i].GetValue(), 10, 64) + return err == nil && val < epoch } } + return false +} +func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) { + var locked bool + if checkStatus { + locked = objectLocked(tx, addr.Container(), addr.Object()) + if !locked { + switch objectStatusUnlocked(tx, addr, currEpoch) { + case 1: + return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) + case 2: + return nil, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved)) + } + } + } + obj, err := db.getNoExpired(tx, addr, key, raw) + if err != nil || !checkStatus { + return obj, err + } + if !locked && isExpiredAt(obj, currEpoch) { + return nil, ErrObjectIsExpired + } + return obj, nil +} + +func (db *DB) getNoExpired(tx *bbolt.Tx, addr oid.Address, key []byte, raw bool) (*objectSDK.Object, error) { key = objectKey(addr.Object(), key) cnr := addr.Container() obj := objectSDK.New()