Fix checking EC parent existence on Put object to shard #1548
7 changed files with 82 additions and 18 deletions
|
@ -42,7 +42,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
||||||
for range b.N {
|
for range b.N {
|
||||||
var shPrm shard.ExistsPrm
|
var shPrm shard.ExistsPrm
|
||||||
shPrm.Address = addr
|
shPrm.Address = addr
|
||||||
shPrm.ParentAddress = oid.Address{}
|
shPrm.ECParentAddress = oid.Address{}
|
||||||
ok, _, err := e.exists(context.Background(), shPrm)
|
ok, _, err := e.exists(context.Background(), shPrm)
|
||||||
if err != nil || ok {
|
if err != nil || ok {
|
||||||
b.Fatalf("%t %v", ok, err)
|
b.Fatalf("%t %v", ok, err)
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -93,6 +94,50 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStorageEngine_ECInhume(t *testing.T) {
|
||||||
|
parentObjectAddress := oidtest.Address()
|
||||||
|
containerID := parentObjectAddress.Container()
|
||||||
|
|
||||||
|
chunkObject0 := testutil.GenerateObjectWithCID(containerID)
|
||||||
|
chunkObject0.SetECHeader(objectSDK.NewECHeader(
|
||||||
|
objectSDK.ECParentInfo{
|
||||||
|
ID: parentObjectAddress.Object(),
|
||||||
|
}, 0, 4, []byte{}, 0))
|
||||||
|
|
||||||
|
chunkObject1 := testutil.GenerateObjectWithCID(containerID)
|
||||||
|
chunkObject1.SetECHeader(objectSDK.NewECHeader(
|
||||||
|
objectSDK.ECParentInfo{
|
||||||
|
ID: parentObjectAddress.Object(),
|
||||||
|
}, 1, 4, []byte{}, 0))
|
||||||
|
|
||||||
|
tombstone := objectSDK.NewTombstone()
|
||||||
|
tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()})
|
||||||
|
payload, err := tombstone.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
tombstoneObject := testutil.GenerateObjectWithCID(containerID)
|
||||||
|
tombstoneObject.SetType(objectSDK.TypeTombstone)
|
||||||
|
tombstoneObject.SetPayload(payload)
|
||||||
|
tombstoneObjectAddress := object.AddressOf(tombstoneObject)
|
||||||
|
|
||||||
|
e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
|
||||||
|
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||||
|
|
||||||
|
require.NoError(t, Put(context.Background(), e, chunkObject0, false))
|
||||||
|
|
||||||
|
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
|
||||||
|
|
||||||
|
var inhumePrm InhumePrm
|
||||||
|
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
|
||||||
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var alreadyRemoved *apistatus.ObjectAlreadyRemoved
|
||||||
|
|
||||||
|
require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved)
|
||||||
|
|
||||||
|
require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved)
|
||||||
|
}
|
||||||
|
|
||||||
func TestInhumeExpiredRegularObject(t *testing.T) {
|
func TestInhumeExpiredRegularObject(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|
|
@ -71,21 +71,21 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
||||||
|
|
||||||
// In #1146 this check was parallelized, however, it became
|
// In #1146 this check was parallelized, however, it became
|
||||||
// much slower on fast machines for 4 shards.
|
// much slower on fast machines for 4 shards.
|
||||||
var parent oid.Address
|
var ecParent oid.Address
|
||||||
if prm.Object.ECHeader() != nil {
|
if prm.Object.ECHeader() != nil {
|
||||||
parent.SetObject(prm.Object.ECHeader().Parent())
|
ecParent.SetObject(prm.Object.ECHeader().Parent())
|
||||||
parent.SetContainer(addr.Container())
|
ecParent.SetContainer(addr.Container())
|
||||||
}
|
}
|
||||||
var shPrm shard.ExistsPrm
|
var shPrm shard.ExistsPrm
|
||||||
shPrm.Address = addr
|
shPrm.Address = addr
|
||||||
shPrm.ParentAddress = parent
|
shPrm.ECParentAddress = ecParent
|
||||||
existed, locked, err := e.exists(ctx, shPrm)
|
existed, locked, err := e.exists(ctx, shPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !existed && locked {
|
if !existed && locked {
|
||||||
lockers, err := e.GetLocked(ctx, parent)
|
lockers, err := e.GetLocked(ctx, ecParent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,8 +20,8 @@ import (
|
||||||
|
|
||||||
// ExistsPrm groups the parameters of Exists operation.
|
// ExistsPrm groups the parameters of Exists operation.
|
||||||
type ExistsPrm struct {
|
type ExistsPrm struct {
|
||||||
addr oid.Address
|
addr oid.Address
|
||||||
paddr oid.Address
|
ecParentAddr oid.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExistsRes groups the resulting values of Exists operation.
|
// ExistsRes groups the resulting values of Exists operation.
|
||||||
|
@ -37,9 +37,9 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
|
||||||
p.addr = addr
|
p.addr = addr
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetParent is an Exists option to set objects parent.
|
// SetECParent is an Exists option to set objects parent.
|
||||||
func (p *ExistsPrm) SetParent(addr oid.Address) {
|
func (p *ExistsPrm) SetECParent(addr oid.Address) {
|
||||||
p.paddr = addr
|
p.ecParentAddr = addr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exists returns the fact that the object is in the metabase.
|
// Exists returns the fact that the object is in the metabase.
|
||||||
|
@ -82,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
|
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
@ -90,10 +90,21 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
|
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||||
var locked bool
|
var locked bool
|
||||||
if !parent.Equals(oid.Address{}) {
|
if !ecParent.Equals(oid.Address{}) {
|
||||||
locked = objectLocked(tx, parent.Container(), parent.Object())
|
st, err := objectStatus(tx, ecParent, currEpoch)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, err
|
||||||
|
}
|
||||||
|
switch st {
|
||||||
|
case 2:
|
||||||
|
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
|
||||||
|
case 3:
|
||||||
|
return false, locked, ErrObjectIsExpired
|
||||||
|
}
|
||||||
|
|
||||||
|
locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
|
||||||
}
|
}
|
||||||
// check graveyard and object expiration first
|
// check graveyard and object expiration first
|
||||||
st, err := objectStatus(tx, addr, currEpoch)
|
st, err := objectStatus(tx, addr, currEpoch)
|
||||||
|
|
|
@ -121,9 +121,15 @@ func (db *DB) put(tx *bbolt.Tx,
|
||||||
return PutRes{}, errors.New("missing container in object")
|
return PutRes{}, errors.New("missing container in object")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ecParentAddress oid.Address
|
||||||
|
if ecHeader := obj.ECHeader(); ecHeader != nil {
|
||||||
|
ecParentAddress.SetContainer(cnr)
|
||||||
|
ecParentAddress.SetObject(ecHeader.Parent())
|
||||||
|
}
|
||||||
|
|
||||||
isParent := si != nil
|
isParent := si != nil
|
||||||
|
|
||||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
|
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
|
||||||
|
|
||||||
var splitInfoError *objectSDK.SplitInfoError
|
var splitInfoError *objectSDK.SplitInfoError
|
||||||
if errors.As(err, &splitInfoError) {
|
if errors.As(err, &splitInfoError) {
|
||||||
|
|
|
@ -18,7 +18,7 @@ type ExistsPrm struct {
|
||||||
// Exists option to set object checked for existence.
|
// Exists option to set object checked for existence.
|
||||||
Address oid.Address
|
Address oid.Address
|
||||||
// Exists option to set parent object checked for existence.
|
// Exists option to set parent object checked for existence.
|
||||||
ParentAddress oid.Address
|
ECParentAddress oid.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExistsRes groups the resulting values of Exists operation.
|
// ExistsRes groups the resulting values of Exists operation.
|
||||||
|
@ -74,7 +74,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
|
||||||
} else {
|
} else {
|
||||||
var existsPrm meta.ExistsPrm
|
var existsPrm meta.ExistsPrm
|
||||||
existsPrm.SetAddress(prm.Address)
|
existsPrm.SetAddress(prm.Address)
|
||||||
existsPrm.SetParent(prm.ParentAddress)
|
existsPrm.SetECParent(prm.ECParentAddress)
|
||||||
|
|
||||||
var res meta.ExistsRes
|
var res meta.ExistsRes
|
||||||
res, err = s.metaBase.Exists(ctx, existsPrm)
|
res, err = s.metaBase.Exists(ctx, existsPrm)
|
||||||
|
|
|
@ -281,6 +281,8 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info
|
||||||
}
|
}
|
||||||
chunkIDs[ch.Index] = ecInfoChunkID
|
chunkIDs[ch.Index] = ecInfoChunkID
|
||||||
}
|
}
|
||||||
|
} else if client.IsErrObjectAlreadyRemoved(err) {
|
||||||
|
|||||||
|
restore = false
|
||||||
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
|
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
|
||||||
p.log.Warn(ctx, logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
|
p.log.Warn(ctx, logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
||||||
|
|
Loading…
Add table
Reference in a new issue
I think I've suggested similar changes for
REP
policy handling (#1543), but #1543 (comment) was said to me.@fyrchik Could you please comment?
ok, fixed
What exactly was fixed? Is seems you still introduce a new behaviour:

Sorry, the behaviour is the same, why have this commit at all?
Technically this commit relates to policer, but others relate to engine.
If the actual question is why this change, then this change if required to not to perform PUT request as we know, that object already removed and PUT request will fail anyway.