Fix checking EC parent existence on Put object to shard support/v0.44 #1551
7 changed files with 84 additions and 18 deletions
@ -42,7 +42,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
for range b.N {
var shPrm shard.ExistsPrm
shPrm.Address = addr
shPrm.ParentAddress = oid.Address{}
shPrm.ECParentAddress = oid.Address{}
ok, _, err := e.exists(context.Background(), shPrm)
if err != nil || ok {
b.Fatalf("%t %v", ok, err)
@ -7,8 +7,11 @@ import (
apistatus ""
cidtest ""
objectSDK ""
oid ""
oidtest ""
@ -84,3 +87,47 @@ func TestStorageEngine_Inhume(t *testing.T) {
require.Empty(t, addrs)
func TestStorageEngine_ECInhume(t *testing.T) {
parentObjectAddress := oidtest.Address()
containerID := parentObjectAddress.Container()
chunkObject0 := testutil.GenerateObjectWithCID(containerID)
ID: parentObjectAddress.Object(),
}, 0, 4, []byte{}, 0))
chunkObject1 := testutil.GenerateObjectWithCID(containerID)
ID: parentObjectAddress.Object(),
}, 1, 4, []byte{}, 0))
tombstone := objectSDK.NewTombstone()
payload, err := tombstone.Marshal()
require.NoError(t, err)
tombstoneObject := testutil.GenerateObjectWithCID(containerID)
tombstoneObjectAddress := object.AddressOf(tombstoneObject)
e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
require.NoError(t, Put(context.Background(), e, chunkObject0, false))
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
_, err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
var alreadyRemoved *apistatus.ObjectAlreadyRemoved
require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved)
require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved)
@ -71,21 +71,21 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
// In #1146 this check was parallelized, however, it became
// much slower on fast machines for 4 shards.
var parent oid.Address
var ecParent oid.Address
if prm.Object.ECHeader() != nil {
var shPrm shard.ExistsPrm
shPrm.Address = addr
shPrm.ParentAddress = parent
shPrm.ECParentAddress = ecParent
existed, locked, err := e.exists(ctx, shPrm)
if err != nil {
return err
if !existed && locked {
lockers, err := e.GetLocked(ctx, parent)
lockers, err := e.GetLocked(ctx, ecParent)
if err != nil {
return err
@ -20,8 +20,8 @@ import (
// ExistsPrm groups the parameters of Exists operation.
type ExistsPrm struct {
addr oid.Address
paddr oid.Address
addr oid.Address
ecParentAddr oid.Address
// ExistsRes groups the resulting values of Exists operation.
@ -37,9 +37,9 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
p.addr = addr
// SetParent is an Exists option to set objects parent.
func (p *ExistsPrm) SetParent(addr oid.Address) {
p.paddr = addr
// SetECParent is an Exists option to set objects parent.
func (p *ExistsPrm) SetECParent(addr oid.Address) {
p.ecParentAddr = addr
// Exists returns the fact that the object is in the metabase.
@ -82,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch)
return err
@ -90,10 +90,21 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
return res, metaerr.Wrap(err)
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
var locked bool
if !parent.Equals(oid.Address{}) {
locked = objectLocked(tx, parent.Container(), parent.Object())
if !ecParent.Equals(oid.Address{}) {
st, err := objectStatus(tx, ecParent, currEpoch)
if err != nil {
return false, false, err
switch st {
case 2:
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
case 3:
return false, locked, ErrObjectIsExpired
locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
// check graveyard and object expiration first
st, err := objectStatus(tx, addr, currEpoch)
@ -121,9 +121,15 @@ func (db *DB) put(tx *bbolt.Tx,
return PutRes{}, errors.New("missing container in object")
var ecParentAddress oid.Address
if ecHeader := obj.ECHeader(); ecHeader != nil {
isParent := si != nil
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
var splitInfoError *objectSDK.SplitInfoError
if errors.As(err, &splitInfoError) {
@ -18,7 +18,7 @@ type ExistsPrm struct {
// Exists option to set object checked for existence.
Address oid.Address
// Exists option to set parent object checked for existence.
ParentAddress oid.Address
ECParentAddress oid.Address
// ExistsRes groups the resulting values of Exists operation.
@ -74,7 +74,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
} else {
var existsPrm meta.ExistsPrm
var res meta.ExistsRes
res, err = s.metaBase.Exists(ctx, existsPrm)
@ -281,6 +281,8 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info
chunkIDs[ch.Index] = ecInfoChunkID
} else if client.IsErrObjectAlreadyRemoved(err) {
restore = false
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
p.log.Warn(ctx, logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
p.replicator.HandleReplicationTask(ctx, replicator.Task{
Add table
Reference in a new issue