[#1548] metabase: Check if EC parent is removed or expired

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-12-10 12:53:54 +03:00
parent 6ea8f2b23c
commit 15b4288d80
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
3 changed files with 65 additions and 1 deletions

View file

@ -7,8 +7,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -84,3 +87,47 @@ func TestStorageEngine_Inhume(t *testing.T) {
require.Empty(t, addrs) require.Empty(t, addrs)
}) })
} }
func TestStorageEngine_ECInhume(t *testing.T) {
parentObjectAddress := oidtest.Address()
containerID := parentObjectAddress.Container()
chunkObject0 := testutil.GenerateObjectWithCID(containerID)
chunkObject0.SetECHeader(objectSDK.NewECHeader(
objectSDK.ECParentInfo{
ID: parentObjectAddress.Object(),
}, 0, 4, []byte{}, 0))
chunkObject1 := testutil.GenerateObjectWithCID(containerID)
chunkObject1.SetECHeader(objectSDK.NewECHeader(
objectSDK.ECParentInfo{
ID: parentObjectAddress.Object(),
}, 1, 4, []byte{}, 0))
tombstone := objectSDK.NewTombstone()
tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()})
payload, err := tombstone.Marshal()
require.NoError(t, err)
tombstoneObject := testutil.GenerateObjectWithCID(containerID)
tombstoneObject.SetType(objectSDK.TypeTombstone)
tombstoneObject.SetPayload(payload)
tombstoneObjectAddress := object.AddressOf(tombstoneObject)
e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
require.NoError(t, Put(context.Background(), e, chunkObject0, false))
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
_, err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
var alreadyRemoved *apistatus.ObjectAlreadyRemoved
require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved)
require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved)
}

View file

@ -93,6 +93,17 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) { func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
var locked bool var locked bool
if !ecParent.Equals(oid.Address{}) { if !ecParent.Equals(oid.Address{}) {
st, err := objectStatus(tx, ecParent, currEpoch)
if err != nil {
return false, false, err
}
switch st {
case 2:
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
case 3:
return false, locked, ErrObjectIsExpired
}
locked = objectLocked(tx, ecParent.Container(), ecParent.Object()) locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
} }
// check graveyard and object expiration first // check graveyard and object expiration first

View file

@ -121,9 +121,15 @@ func (db *DB) put(tx *bbolt.Tx,
return PutRes{}, errors.New("missing container in object") return PutRes{}, errors.New("missing container in object")
} }
var ecParentAddress oid.Address
if ecHeader := obj.ECHeader(); ecHeader != nil {
ecParentAddress.SetContainer(cnr)
ecParentAddress.SetObject(ecHeader.Parent())
}
isParent := si != nil isParent := si != nil
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch) exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
var splitInfoError *objectSDK.SplitInfoError var splitInfoError *objectSDK.SplitInfoError
if errors.As(err, &splitInfoError) { if errors.As(err, &splitInfoError) {