Fix checking EC parent existence on Put object to shard #1548
7 changed files with 82 additions and 18 deletions
|
@ -42,7 +42,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
|||
for range b.N {
|
||||
var shPrm shard.ExistsPrm
|
||||
shPrm.Address = addr
|
||||
shPrm.ParentAddress = oid.Address{}
|
||||
shPrm.ECParentAddress = oid.Address{}
|
||||
ok, _, err := e.exists(context.Background(), shPrm)
|
||||
if err != nil || ok {
|
||||
b.Fatalf("%t %v", ok, err)
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -93,6 +94,50 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestStorageEngine_ECInhume(t *testing.T) {
|
||||
parentObjectAddress := oidtest.Address()
|
||||
containerID := parentObjectAddress.Container()
|
||||
|
||||
chunkObject0 := testutil.GenerateObjectWithCID(containerID)
|
||||
chunkObject0.SetECHeader(objectSDK.NewECHeader(
|
||||
objectSDK.ECParentInfo{
|
||||
ID: parentObjectAddress.Object(),
|
||||
}, 0, 4, []byte{}, 0))
|
||||
|
||||
chunkObject1 := testutil.GenerateObjectWithCID(containerID)
|
||||
chunkObject1.SetECHeader(objectSDK.NewECHeader(
|
||||
objectSDK.ECParentInfo{
|
||||
ID: parentObjectAddress.Object(),
|
||||
}, 1, 4, []byte{}, 0))
|
||||
|
||||
tombstone := objectSDK.NewTombstone()
|
||||
tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()})
|
||||
payload, err := tombstone.Marshal()
|
||||
require.NoError(t, err)
|
||||
tombstoneObject := testutil.GenerateObjectWithCID(containerID)
|
||||
tombstoneObject.SetType(objectSDK.TypeTombstone)
|
||||
tombstoneObject.SetPayload(payload)
|
||||
tombstoneObjectAddress := object.AddressOf(tombstoneObject)
|
||||
|
||||
e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
|
||||
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||
|
||||
require.NoError(t, Put(context.Background(), e, chunkObject0, false))
|
||||
|
||||
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
|
||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var alreadyRemoved *apistatus.ObjectAlreadyRemoved
|
||||
|
||||
require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved)
|
||||
|
||||
require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved)
|
||||
}
|
||||
|
||||
func TestInhumeExpiredRegularObject(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -71,21 +71,21 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
|||
|
||||
// In #1146 this check was parallelized, however, it became
|
||||
// much slower on fast machines for 4 shards.
|
||||
var parent oid.Address
|
||||
var ecParent oid.Address
|
||||
if prm.Object.ECHeader() != nil {
|
||||
parent.SetObject(prm.Object.ECHeader().Parent())
|
||||
parent.SetContainer(addr.Container())
|
||||
ecParent.SetObject(prm.Object.ECHeader().Parent())
|
||||
ecParent.SetContainer(addr.Container())
|
||||
}
|
||||
var shPrm shard.ExistsPrm
|
||||
shPrm.Address = addr
|
||||
shPrm.ParentAddress = parent
|
||||
shPrm.ECParentAddress = ecParent
|
||||
existed, locked, err := e.exists(ctx, shPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !existed && locked {
|
||||
lockers, err := e.GetLocked(ctx, parent)
|
||||
lockers, err := e.GetLocked(ctx, ecParent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -20,8 +20,8 @@ import (
|
|||
|
||||
// ExistsPrm groups the parameters of Exists operation.
|
||||
type ExistsPrm struct {
|
||||
addr oid.Address
|
||||
paddr oid.Address
|
||||
addr oid.Address
|
||||
ecParentAddr oid.Address
|
||||
}
|
||||
|
||||
// ExistsRes groups the resulting values of Exists operation.
|
||||
|
@ -37,9 +37,9 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
|
|||
p.addr = addr
|
||||
}
|
||||
|
||||
// SetParent is an Exists option to set objects parent.
|
||||
func (p *ExistsPrm) SetParent(addr oid.Address) {
|
||||
p.paddr = addr
|
||||
// SetECParent is an Exists option to set objects parent.
|
||||
func (p *ExistsPrm) SetECParent(addr oid.Address) {
|
||||
p.ecParentAddr = addr
|
||||
}
|
||||
|
||||
// Exists returns the fact that the object is in the metabase.
|
||||
|
@ -82,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
|||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
|
||||
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch)
|
||||
|
||||
return err
|
||||
})
|
||||
|
@ -90,10 +90,21 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
|||
return res, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||
var locked bool
|
||||
if !parent.Equals(oid.Address{}) {
|
||||
locked = objectLocked(tx, parent.Container(), parent.Object())
|
||||
if !ecParent.Equals(oid.Address{}) {
|
||||
st, err := objectStatus(tx, ecParent, currEpoch)
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
switch st {
|
||||
case 2:
|
||||
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
|
||||
case 3:
|
||||
return false, locked, ErrObjectIsExpired
|
||||
}
|
||||
|
||||
locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
|
||||
}
|
||||
// check graveyard and object expiration first
|
||||
st, err := objectStatus(tx, addr, currEpoch)
|
||||
|
|
|
@ -121,9 +121,15 @@ func (db *DB) put(tx *bbolt.Tx,
|
|||
return PutRes{}, errors.New("missing container in object")
|
||||
}
|
||||
|
||||
var ecParentAddress oid.Address
|
||||
if ecHeader := obj.ECHeader(); ecHeader != nil {
|
||||
ecParentAddress.SetContainer(cnr)
|
||||
ecParentAddress.SetObject(ecHeader.Parent())
|
||||
}
|
||||
|
||||
isParent := si != nil
|
||||
|
||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
|
||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
|
||||
|
||||
var splitInfoError *objectSDK.SplitInfoError
|
||||
if errors.As(err, &splitInfoError) {
|
||||
|
|
|
@ -18,7 +18,7 @@ type ExistsPrm struct {
|
|||
// Exists option to set object checked for existence.
|
||||
Address oid.Address
|
||||
// Exists option to set parent object checked for existence.
|
||||
ParentAddress oid.Address
|
||||
ECParentAddress oid.Address
|
||||
}
|
||||
|
||||
// ExistsRes groups the resulting values of Exists operation.
|
||||
|
@ -74,7 +74,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
|
|||
} else {
|
||||
var existsPrm meta.ExistsPrm
|
||||
existsPrm.SetAddress(prm.Address)
|
||||
existsPrm.SetParent(prm.ParentAddress)
|
||||
existsPrm.SetECParent(prm.ECParentAddress)
|
||||
|
||||
var res meta.ExistsRes
|
||||
res, err = s.metaBase.Exists(ctx, existsPrm)
|
||||
|
|
|
@ -281,6 +281,8 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info
|
|||
}
|
||||
chunkIDs[ch.Index] = ecInfoChunkID
|
||||
}
|
||||
} else if client.IsErrObjectAlreadyRemoved(err) {
|
||||
|
||||
restore = false
|
||||
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
|
||||
p.log.Warn(ctx, logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
|
||||
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
||||
|
|
Loading…
Add table
Reference in a new issue
I think I've suggested similar changes for
REP
policy handling (#1543), but #1543 (comment) was said to me.@fyrchik Could you please comment?
ok, fixed
What exactly was fixed? Is seems you still introduce a new behaviour:

Sorry, the behaviour is the same, why have this commit at all?
Technically this commit relates to policer, but others relate to engine.
If the actual question is why this change, then this change if required to not to perform PUT request as we know, that object already removed and PUT request will fail anyway.