forked from TrueCloudLab/frostfs-node
[#1548] metabase: Check if EC parent is removed or expired
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
386a12eea4
commit
1f6cf57e30
3 changed files with 63 additions and 1 deletions
|
@ -11,6 +11,7 @@ import (
|
|||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -93,6 +94,50 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestStorageEngine_ECInhume(t *testing.T) {
|
||||
parentObjectAddress := oidtest.Address()
|
||||
containerID := parentObjectAddress.Container()
|
||||
|
||||
chunkObject0 := testutil.GenerateObjectWithCID(containerID)
|
||||
chunkObject0.SetECHeader(objectSDK.NewECHeader(
|
||||
objectSDK.ECParentInfo{
|
||||
ID: parentObjectAddress.Object(),
|
||||
}, 0, 4, []byte{}, 0))
|
||||
|
||||
chunkObject1 := testutil.GenerateObjectWithCID(containerID)
|
||||
chunkObject1.SetECHeader(objectSDK.NewECHeader(
|
||||
objectSDK.ECParentInfo{
|
||||
ID: parentObjectAddress.Object(),
|
||||
}, 1, 4, []byte{}, 0))
|
||||
|
||||
tombstone := objectSDK.NewTombstone()
|
||||
tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()})
|
||||
payload, err := tombstone.Marshal()
|
||||
require.NoError(t, err)
|
||||
tombstoneObject := testutil.GenerateObjectWithCID(containerID)
|
||||
tombstoneObject.SetType(objectSDK.TypeTombstone)
|
||||
tombstoneObject.SetPayload(payload)
|
||||
tombstoneObjectAddress := object.AddressOf(tombstoneObject)
|
||||
|
||||
e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
|
||||
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||
|
||||
require.NoError(t, Put(context.Background(), e, chunkObject0, false))
|
||||
|
||||
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
|
||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var alreadyRemoved *apistatus.ObjectAlreadyRemoved
|
||||
|
||||
require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved)
|
||||
|
||||
require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved)
|
||||
}
|
||||
|
||||
func TestInhumeExpiredRegularObject(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -93,6 +93,17 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
|||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||
var locked bool
|
||||
if !ecParent.Equals(oid.Address{}) {
|
||||
st, err := objectStatus(tx, ecParent, currEpoch)
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
switch st {
|
||||
case 2:
|
||||
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
|
||||
case 3:
|
||||
return false, locked, ErrObjectIsExpired
|
||||
}
|
||||
|
||||
locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
|
||||
}
|
||||
// check graveyard and object expiration first
|
||||
|
|
|
@ -121,9 +121,15 @@ func (db *DB) put(tx *bbolt.Tx,
|
|||
return PutRes{}, errors.New("missing container in object")
|
||||
}
|
||||
|
||||
var ecParentAddress oid.Address
|
||||
if ecHeader := obj.ECHeader(); ecHeader != nil {
|
||||
ecParentAddress.SetContainer(cnr)
|
||||
ecParentAddress.SetObject(ecHeader.Parent())
|
||||
}
|
||||
|
||||
isParent := si != nil
|
||||
|
||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
|
||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
|
||||
|
||||
var splitInfoError *objectSDK.SplitInfoError
|
||||
if errors.As(err, &splitInfoError) {
|
||||
|
|
Loading…
Reference in a new issue