Fix checking EC parent existence on Put object to shard support/v0.44 #1551

Merged
dstepanov-yadro merged 3 commits from dstepanov-yadro/frostfs-node:fix/patch_removed_ec_v044 into support/v0.44 2024-12-11 10:17:28 +00:00
7 changed files with 84 additions and 18 deletions

View file

@ -42,7 +42,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
for range b.N {
var shPrm shard.ExistsPrm
shPrm.Address = addr
shPrm.ParentAddress = oid.Address{}
shPrm.ECParentAddress = oid.Address{}
ok, _, err := e.exists(context.Background(), shPrm)
if err != nil || ok {
b.Fatalf("%t %v", ok, err)

View file

@ -7,8 +7,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
@ -84,3 +87,47 @@ func TestStorageEngine_Inhume(t *testing.T) {
require.Empty(t, addrs)
})
}
func TestStorageEngine_ECInhume(t *testing.T) {
parentObjectAddress := oidtest.Address()
containerID := parentObjectAddress.Container()
chunkObject0 := testutil.GenerateObjectWithCID(containerID)
chunkObject0.SetECHeader(objectSDK.NewECHeader(
objectSDK.ECParentInfo{
ID: parentObjectAddress.Object(),
}, 0, 4, []byte{}, 0))
chunkObject1 := testutil.GenerateObjectWithCID(containerID)
chunkObject1.SetECHeader(objectSDK.NewECHeader(
objectSDK.ECParentInfo{
ID: parentObjectAddress.Object(),
}, 1, 4, []byte{}, 0))
tombstone := objectSDK.NewTombstone()
tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()})
payload, err := tombstone.Marshal()
require.NoError(t, err)
tombstoneObject := testutil.GenerateObjectWithCID(containerID)
tombstoneObject.SetType(objectSDK.TypeTombstone)
tombstoneObject.SetPayload(payload)
tombstoneObjectAddress := object.AddressOf(tombstoneObject)
e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
require.NoError(t, Put(context.Background(), e, chunkObject0, false))
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
_, err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
var alreadyRemoved *apistatus.ObjectAlreadyRemoved
require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved)
require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved)
}

View file

@ -71,21 +71,21 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
// In #1146 this check was parallelized, however, it became
// much slower on fast machines for 4 shards.
var parent oid.Address
var ecParent oid.Address
if prm.Object.ECHeader() != nil {
parent.SetObject(prm.Object.ECHeader().Parent())
parent.SetContainer(addr.Container())
ecParent.SetObject(prm.Object.ECHeader().Parent())
ecParent.SetContainer(addr.Container())
}
var shPrm shard.ExistsPrm
shPrm.Address = addr
shPrm.ParentAddress = parent
shPrm.ECParentAddress = ecParent
existed, locked, err := e.exists(ctx, shPrm)
if err != nil {
return err
}
if !existed && locked {
lockers, err := e.GetLocked(ctx, parent)
lockers, err := e.GetLocked(ctx, ecParent)
if err != nil {
return err
}

View file

@ -20,8 +20,8 @@ import (
// ExistsPrm groups the parameters of Exists operation.
type ExistsPrm struct {
addr oid.Address
paddr oid.Address
addr oid.Address
ecParentAddr oid.Address
}
// ExistsRes groups the resulting values of Exists operation.
@ -37,9 +37,9 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// SetParent is an Exists option to set objects parent.
func (p *ExistsPrm) SetParent(addr oid.Address) {
p.paddr = addr
// SetECParent is an Exists option to set objects parent.
func (p *ExistsPrm) SetECParent(addr oid.Address) {
p.ecParentAddr = addr
}
// Exists returns the fact that the object is in the metabase.
@ -82,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch)
return err
})
@ -90,10 +90,21 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
return res, metaerr.Wrap(err)
}
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
var locked bool
if !parent.Equals(oid.Address{}) {
locked = objectLocked(tx, parent.Container(), parent.Object())
if !ecParent.Equals(oid.Address{}) {
st, err := objectStatus(tx, ecParent, currEpoch)
if err != nil {
return false, false, err
}
switch st {
case 2:
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
case 3:
return false, locked, ErrObjectIsExpired
}
locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
}
// check graveyard and object expiration first
st, err := objectStatus(tx, addr, currEpoch)

View file

@ -121,9 +121,15 @@ func (db *DB) put(tx *bbolt.Tx,
return PutRes{}, errors.New("missing container in object")
}
var ecParentAddress oid.Address
if ecHeader := obj.ECHeader(); ecHeader != nil {
ecParentAddress.SetContainer(cnr)
ecParentAddress.SetObject(ecHeader.Parent())
}
isParent := si != nil
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
var splitInfoError *objectSDK.SplitInfoError
if errors.As(err, &splitInfoError) {

View file

@ -18,7 +18,7 @@ type ExistsPrm struct {
// Exists option to set object checked for existence.
Address oid.Address
// Exists option to set parent object checked for existence.
ParentAddress oid.Address
ECParentAddress oid.Address
}
// ExistsRes groups the resulting values of Exists operation.
@ -74,7 +74,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
} else {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(prm.Address)
existsPrm.SetParent(prm.ParentAddress)
existsPrm.SetECParent(prm.ECParentAddress)
var res meta.ExistsRes
res, err = s.metaBase.Exists(ctx, existsPrm)

View file

@ -281,6 +281,8 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info
}
chunkIDs[ch.Index] = ecInfoChunkID
}
} else if client.IsErrObjectAlreadyRemoved(err) {
restore = false
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
p.log.Warn(ctx, logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
p.replicator.HandleReplicationTask(ctx, replicator.Task{