Compare commits

...

3 commits

Author SHA1 Message Date
9139d13f95
[#1548] metabase: Check if EC parent is removed or expired
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 2m46s
Tests and linters / Run gofumpt (pull_request) Successful in 2m50s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m12s
DCO action / DCO (pull_request) Successful in 3m12s
Tests and linters / gopls check (pull_request) Successful in 3m35s
Build / Build Components (pull_request) Successful in 4m4s
Tests and linters / Staticcheck (pull_request) Successful in 4m37s
Tests and linters / Tests (pull_request) Successful in 5m21s
Tests and linters / Lint (pull_request) Successful in 5m26s
Tests and linters / Tests with -race (pull_request) Successful in 6m34s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-12-10 14:37:21 +03:00
212a81095e
[#1548] engine: Rename parent -> ecParent
Parent could mean split parent or EC parent. In this case it is EC parent only.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-12-10 14:37:20 +03:00
5b42ea4c82
[#1548] policer: Do not replicate EC chunk if object already removed
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-12-10 14:37:20 +03:00
7 changed files with 82 additions and 18 deletions

View file

@ -42,7 +42,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
for range b.N {
var shPrm shard.ExistsPrm
shPrm.Address = addr
shPrm.ParentAddress = oid.Address{}
shPrm.ECParentAddress = oid.Address{}
ok, _, err := e.exists(context.Background(), shPrm)
if err != nil || ok {
b.Fatalf("%t %v", ok, err)

View file

@ -11,6 +11,7 @@ import (
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -93,6 +94,50 @@ func TestStorageEngine_Inhume(t *testing.T) {
})
}
func TestStorageEngine_ECInhume(t *testing.T) {
parentObjectAddress := oidtest.Address()
containerID := parentObjectAddress.Container()
chunkObject0 := testutil.GenerateObjectWithCID(containerID)
chunkObject0.SetECHeader(objectSDK.NewECHeader(
objectSDK.ECParentInfo{
ID: parentObjectAddress.Object(),
}, 0, 4, []byte{}, 0))
chunkObject1 := testutil.GenerateObjectWithCID(containerID)
chunkObject1.SetECHeader(objectSDK.NewECHeader(
objectSDK.ECParentInfo{
ID: parentObjectAddress.Object(),
}, 1, 4, []byte{}, 0))
tombstone := objectSDK.NewTombstone()
tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()})
payload, err := tombstone.Marshal()
require.NoError(t, err)
tombstoneObject := testutil.GenerateObjectWithCID(containerID)
tombstoneObject.SetType(objectSDK.TypeTombstone)
tombstoneObject.SetPayload(payload)
tombstoneObjectAddress := object.AddressOf(tombstoneObject)
e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
require.NoError(t, Put(context.Background(), e, chunkObject0, false))
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
_, err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
var alreadyRemoved *apistatus.ObjectAlreadyRemoved
require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved)
require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved)
}
func TestInhumeExpiredRegularObject(t *testing.T) {
t.Parallel()

View file

@ -71,21 +71,21 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
// In #1146 this check was parallelized, however, it became
// much slower on fast machines for 4 shards.
var parent oid.Address
var ecParent oid.Address
if prm.Object.ECHeader() != nil {
parent.SetObject(prm.Object.ECHeader().Parent())
parent.SetContainer(addr.Container())
ecParent.SetObject(prm.Object.ECHeader().Parent())
ecParent.SetContainer(addr.Container())
}
var shPrm shard.ExistsPrm
shPrm.Address = addr
shPrm.ParentAddress = parent
shPrm.ECParentAddress = ecParent
existed, locked, err := e.exists(ctx, shPrm)
if err != nil {
return err
}
if !existed && locked {
lockers, err := e.GetLocked(ctx, parent)
lockers, err := e.GetLocked(ctx, ecParent)
if err != nil {
return err
}

View file

@ -20,8 +20,8 @@ import (
// ExistsPrm groups the parameters of Exists operation.
type ExistsPrm struct {
addr oid.Address
paddr oid.Address
addr oid.Address
ecParentAddr oid.Address
}
// ExistsRes groups the resulting values of Exists operation.
@ -37,9 +37,9 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// SetParent is an Exists option to set objects parent.
func (p *ExistsPrm) SetParent(addr oid.Address) {
p.paddr = addr
// SetECParent is an Exists option to set objects parent.
func (p *ExistsPrm) SetECParent(addr oid.Address) {
p.ecParentAddr = addr
}
// Exists returns the fact that the object is in the metabase.
@ -82,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch)
return err
})
@ -90,10 +90,21 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
return res, metaerr.Wrap(err)
}
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
var locked bool
if !parent.Equals(oid.Address{}) {
locked = objectLocked(tx, parent.Container(), parent.Object())
if !ecParent.Equals(oid.Address{}) {
st, err := objectStatus(tx, ecParent, currEpoch)
if err != nil {
return false, false, err
}
switch st {
case 2:
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
case 3:
return false, locked, ErrObjectIsExpired
}
locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
}
// check graveyard and object expiration first
st, err := objectStatus(tx, addr, currEpoch)

View file

@ -121,9 +121,15 @@ func (db *DB) put(tx *bbolt.Tx,
return PutRes{}, errors.New("missing container in object")
}
var ecParentAddress oid.Address
if ecHeader := obj.ECHeader(); ecHeader != nil {
ecParentAddress.SetContainer(cnr)
ecParentAddress.SetObject(ecHeader.Parent())
}
isParent := si != nil
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
var splitInfoError *objectSDK.SplitInfoError
if errors.As(err, &splitInfoError) {

View file

@ -18,7 +18,7 @@ type ExistsPrm struct {
// Exists option to set object checked for existence.
Address oid.Address
// Exists option to set parent object checked for existence.
ParentAddress oid.Address
ECParentAddress oid.Address
}
// ExistsRes groups the resulting values of Exists operation.
@ -74,7 +74,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
} else {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(prm.Address)
existsPrm.SetParent(prm.ParentAddress)
existsPrm.SetECParent(prm.ECParentAddress)
var res meta.ExistsRes
res, err = s.metaBase.Exists(ctx, existsPrm)

View file

@ -281,6 +281,8 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info
}
chunkIDs[ch.Index] = ecInfoChunkID
}
} else if client.IsErrObjectAlreadyRemoved(err) {
restore = false
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
p.log.Warn(ctx, logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
p.replicator.HandleReplicationTask(ctx, replicator.Task{