diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index 9d7196d9..2d083a58 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -11,6 +11,7 @@ import ( meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -93,6 +94,50 @@ func TestStorageEngine_Inhume(t *testing.T) { }) } +func TestStorageEngine_ECInhume(t *testing.T) { + parentObjectAddress := oidtest.Address() + containerID := parentObjectAddress.Container() + + chunkObject0 := testutil.GenerateObjectWithCID(containerID) + chunkObject0.SetECHeader(objectSDK.NewECHeader( + objectSDK.ECParentInfo{ + ID: parentObjectAddress.Object(), + }, 0, 4, []byte{}, 0)) + + chunkObject1 := testutil.GenerateObjectWithCID(containerID) + chunkObject1.SetECHeader(objectSDK.NewECHeader( + objectSDK.ECParentInfo{ + ID: parentObjectAddress.Object(), + }, 1, 4, []byte{}, 0)) + + tombstone := objectSDK.NewTombstone() + tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()}) + payload, err := tombstone.Marshal() + require.NoError(t, err) + tombstoneObject := testutil.GenerateObjectWithCID(containerID) + tombstoneObject.SetType(objectSDK.TypeTombstone) + tombstoneObject.SetPayload(payload) + tombstoneObjectAddress := object.AddressOf(tombstoneObject) + + e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine + defer func() { require.NoError(t, e.Close(context.Background())) }() + + require.NoError(t, Put(context.Background(), e, chunkObject0, false)) + + require.NoError(t, Put(context.Background(), e, tombstoneObject, false)) + + var inhumePrm InhumePrm + inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress) + _, err = e.Inhume(context.Background(), inhumePrm) + require.NoError(t, err) + + var alreadyRemoved *apistatus.ObjectAlreadyRemoved + + require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved) + + require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved) +} + func TestInhumeExpiredRegularObject(t *testing.T) { t.Parallel() diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index f6596e83..411beb6b 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -93,6 +93,17 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) { var locked bool if !ecParent.Equals(oid.Address{}) { + st, err := objectStatus(tx, ecParent, currEpoch) + if err != nil { + return false, false, err + } + switch st { + case 2: + return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved)) + case 3: + return false, locked, ErrObjectIsExpired + } + locked = objectLocked(tx, ecParent.Container(), ecParent.Object()) } // check graveyard and object expiration first diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index d7675869..6f9dc1bf 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -121,9 +121,15 @@ func (db *DB) put(tx *bbolt.Tx, return PutRes{}, errors.New("missing container in object") } + var ecParentAddress oid.Address + if ecHeader := obj.ECHeader(); ecHeader != nil { + ecParentAddress.SetContainer(cnr) + ecParentAddress.SetObject(ecHeader.Parent()) + } + isParent := si != nil - exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch) + exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch) var splitInfoError *objectSDK.SplitInfoError if errors.As(err, &splitInfoError) {