[#1548] metabase: Check if EC parent is removed or expired

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-12-10 12:53:54 +03:00
parent 212a81095e
commit 9139d13f95
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
3 changed files with 63 additions and 1 deletions

View file

@ -11,6 +11,7 @@ import (
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -93,6 +94,50 @@ func TestStorageEngine_Inhume(t *testing.T) {
}) })
} }
func TestStorageEngine_ECInhume(t *testing.T) {
parentObjectAddress := oidtest.Address()
containerID := parentObjectAddress.Container()
chunkObject0 := testutil.GenerateObjectWithCID(containerID)
chunkObject0.SetECHeader(objectSDK.NewECHeader(
objectSDK.ECParentInfo{
ID: parentObjectAddress.Object(),
}, 0, 4, []byte{}, 0))
chunkObject1 := testutil.GenerateObjectWithCID(containerID)
chunkObject1.SetECHeader(objectSDK.NewECHeader(
objectSDK.ECParentInfo{
ID: parentObjectAddress.Object(),
}, 1, 4, []byte{}, 0))
tombstone := objectSDK.NewTombstone()
tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()})
payload, err := tombstone.Marshal()
require.NoError(t, err)
tombstoneObject := testutil.GenerateObjectWithCID(containerID)
tombstoneObject.SetType(objectSDK.TypeTombstone)
tombstoneObject.SetPayload(payload)
tombstoneObjectAddress := object.AddressOf(tombstoneObject)
e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
require.NoError(t, Put(context.Background(), e, chunkObject0, false))
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
_, err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
var alreadyRemoved *apistatus.ObjectAlreadyRemoved
require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved)
require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved)
}
func TestInhumeExpiredRegularObject(t *testing.T) { func TestInhumeExpiredRegularObject(t *testing.T) {
t.Parallel() t.Parallel()

View file

@ -93,6 +93,17 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) { func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
var locked bool var locked bool
if !ecParent.Equals(oid.Address{}) { if !ecParent.Equals(oid.Address{}) {
st, err := objectStatus(tx, ecParent, currEpoch)
if err != nil {
return false, false, err
}
switch st {
case 2:
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
case 3:
return false, locked, ErrObjectIsExpired
}
locked = objectLocked(tx, ecParent.Container(), ecParent.Object()) locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
} }
// check graveyard and object expiration first // check graveyard and object expiration first

View file

@ -121,9 +121,15 @@ func (db *DB) put(tx *bbolt.Tx,
return PutRes{}, errors.New("missing container in object") return PutRes{}, errors.New("missing container in object")
} }
var ecParentAddress oid.Address
if ecHeader := obj.ECHeader(); ecHeader != nil {
ecParentAddress.SetContainer(cnr)
ecParentAddress.SetObject(ecHeader.Parent())
}
isParent := si != nil isParent := si != nil
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch) exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
var splitInfoError *objectSDK.SplitInfoError var splitInfoError *objectSDK.SplitInfoError
if errors.As(err, &splitInfoError) { if errors.As(err, &splitInfoError) {