forked from TrueCloudLab/frostfs-node
metabase: Properly check expiration epoch
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
9d1c915c42
commit
bc0d9c5a03
2 changed files with 37 additions and 9 deletions
|
@ -123,7 +123,10 @@ func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 {
|
||||||
if objectLocked(tx, addr.Container(), addr.Object()) {
|
if objectLocked(tx, addr.Container(), addr.Object()) {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
return objectStatusUnlocked(tx, addr, currEpoch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func objectStatusUnlocked(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 {
|
||||||
// we check only if the object is expired in the current
|
// we check only if the object is expired in the current
|
||||||
// epoch since it is considered the only corner case: the
|
// epoch since it is considered the only corner case: the
|
||||||
// GC is expected to collect all the objects that have
|
// GC is expected to collect all the objects that have
|
||||||
|
|
|
@ -3,8 +3,10 @@ package meta
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
@ -87,18 +89,41 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
|
func isExpiredAt(obj *objectSDK.Object, epoch uint64) bool {
|
||||||
if checkStatus {
|
attrs := obj.ToV2().GetHeader().GetAttributes() // Convert ToV2() to avoid allocations.
|
||||||
switch objectStatus(tx, addr, currEpoch) {
|
for i := range attrs {
|
||||||
case 1:
|
if attrs[i].GetKey() == objectV2.SysAttributeExpEpoch {
|
||||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
val, err := strconv.ParseUint(attrs[i].GetValue(), 10, 64)
|
||||||
case 2:
|
return err == nil && val < epoch
|
||||||
return nil, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
|
|
||||||
case 3:
|
|
||||||
return nil, ErrObjectIsExpired
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
|
||||||
|
var locked bool
|
||||||
|
if checkStatus {
|
||||||
|
locked = objectLocked(tx, addr.Container(), addr.Object())
|
||||||
|
if !locked {
|
||||||
|
switch objectStatusUnlocked(tx, addr, currEpoch) {
|
||||||
|
case 1:
|
||||||
|
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
|
case 2:
|
||||||
|
return nil, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
obj, err := db.getNoExpired(tx, addr, key, raw)
|
||||||
|
if err != nil || !checkStatus {
|
||||||
|
return obj, err
|
||||||
|
}
|
||||||
|
if !locked && isExpiredAt(obj, currEpoch) {
|
||||||
|
return nil, ErrObjectIsExpired
|
||||||
|
}
|
||||||
|
return obj, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) getNoExpired(tx *bbolt.Tx, addr oid.Address, key []byte, raw bool) (*objectSDK.Object, error) {
|
||||||
key = objectKey(addr.Object(), key)
|
key = objectKey(addr.Object(), key)
|
||||||
cnr := addr.Container()
|
cnr := addr.Container()
|
||||||
obj := objectSDK.New()
|
obj := objectSDK.New()
|
||||||
|
|
Loading…
Reference in a new issue