Compare commits
3 commits
master
...
fix/path_r
Author | SHA1 | Date | |
---|---|---|---|
9139d13f95 | |||
212a81095e | |||
5b42ea4c82 |
7 changed files with 82 additions and 18 deletions
|
@ -42,7 +42,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
|||
for range b.N {
|
||||
var shPrm shard.ExistsPrm
|
||||
shPrm.Address = addr
|
||||
shPrm.ParentAddress = oid.Address{}
|
||||
shPrm.ECParentAddress = oid.Address{}
|
||||
ok, _, err := e.exists(context.Background(), shPrm)
|
||||
if err != nil || ok {
|
||||
b.Fatalf("%t %v", ok, err)
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -93,6 +94,50 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestStorageEngine_ECInhume(t *testing.T) {
|
||||
parentObjectAddress := oidtest.Address()
|
||||
containerID := parentObjectAddress.Container()
|
||||
|
||||
chunkObject0 := testutil.GenerateObjectWithCID(containerID)
|
||||
chunkObject0.SetECHeader(objectSDK.NewECHeader(
|
||||
objectSDK.ECParentInfo{
|
||||
ID: parentObjectAddress.Object(),
|
||||
}, 0, 4, []byte{}, 0))
|
||||
|
||||
chunkObject1 := testutil.GenerateObjectWithCID(containerID)
|
||||
chunkObject1.SetECHeader(objectSDK.NewECHeader(
|
||||
objectSDK.ECParentInfo{
|
||||
ID: parentObjectAddress.Object(),
|
||||
}, 1, 4, []byte{}, 0))
|
||||
|
||||
tombstone := objectSDK.NewTombstone()
|
||||
tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()})
|
||||
payload, err := tombstone.Marshal()
|
||||
require.NoError(t, err)
|
||||
tombstoneObject := testutil.GenerateObjectWithCID(containerID)
|
||||
tombstoneObject.SetType(objectSDK.TypeTombstone)
|
||||
tombstoneObject.SetPayload(payload)
|
||||
tombstoneObjectAddress := object.AddressOf(tombstoneObject)
|
||||
|
||||
e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
|
||||
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||
|
||||
require.NoError(t, Put(context.Background(), e, chunkObject0, false))
|
||||
|
||||
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
|
||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var alreadyRemoved *apistatus.ObjectAlreadyRemoved
|
||||
|
||||
require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved)
|
||||
|
||||
require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved)
|
||||
}
|
||||
|
||||
func TestInhumeExpiredRegularObject(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -71,21 +71,21 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
|||
|
||||
// In #1146 this check was parallelized, however, it became
|
||||
// much slower on fast machines for 4 shards.
|
||||
var parent oid.Address
|
||||
var ecParent oid.Address
|
||||
if prm.Object.ECHeader() != nil {
|
||||
parent.SetObject(prm.Object.ECHeader().Parent())
|
||||
parent.SetContainer(addr.Container())
|
||||
ecParent.SetObject(prm.Object.ECHeader().Parent())
|
||||
ecParent.SetContainer(addr.Container())
|
||||
}
|
||||
var shPrm shard.ExistsPrm
|
||||
shPrm.Address = addr
|
||||
shPrm.ParentAddress = parent
|
||||
shPrm.ECParentAddress = ecParent
|
||||
existed, locked, err := e.exists(ctx, shPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !existed && locked {
|
||||
lockers, err := e.GetLocked(ctx, parent)
|
||||
lockers, err := e.GetLocked(ctx, ecParent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -20,8 +20,8 @@ import (
|
|||
|
||||
// ExistsPrm groups the parameters of Exists operation.
|
||||
type ExistsPrm struct {
|
||||
addr oid.Address
|
||||
paddr oid.Address
|
||||
addr oid.Address
|
||||
ecParentAddr oid.Address
|
||||
}
|
||||
|
||||
// ExistsRes groups the resulting values of Exists operation.
|
||||
|
@ -37,9 +37,9 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
|
|||
p.addr = addr
|
||||
}
|
||||
|
||||
// SetParent is an Exists option to set objects parent.
|
||||
func (p *ExistsPrm) SetParent(addr oid.Address) {
|
||||
p.paddr = addr
|
||||
// SetECParent is an Exists option to set objects parent.
|
||||
func (p *ExistsPrm) SetECParent(addr oid.Address) {
|
||||
p.ecParentAddr = addr
|
||||
}
|
||||
|
||||
// Exists returns the fact that the object is in the metabase.
|
||||
|
@ -82,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
|||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
|
||||
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch)
|
||||
|
||||
return err
|
||||
})
|
||||
|
@ -90,10 +90,21 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
|||
return res, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||
var locked bool
|
||||
if !parent.Equals(oid.Address{}) {
|
||||
locked = objectLocked(tx, parent.Container(), parent.Object())
|
||||
if !ecParent.Equals(oid.Address{}) {
|
||||
st, err := objectStatus(tx, ecParent, currEpoch)
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
switch st {
|
||||
case 2:
|
||||
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
|
||||
case 3:
|
||||
return false, locked, ErrObjectIsExpired
|
||||
}
|
||||
|
||||
locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
|
||||
}
|
||||
// check graveyard and object expiration first
|
||||
st, err := objectStatus(tx, addr, currEpoch)
|
||||
|
|
|
@ -121,9 +121,15 @@ func (db *DB) put(tx *bbolt.Tx,
|
|||
return PutRes{}, errors.New("missing container in object")
|
||||
}
|
||||
|
||||
var ecParentAddress oid.Address
|
||||
if ecHeader := obj.ECHeader(); ecHeader != nil {
|
||||
ecParentAddress.SetContainer(cnr)
|
||||
ecParentAddress.SetObject(ecHeader.Parent())
|
||||
}
|
||||
|
||||
isParent := si != nil
|
||||
|
||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
|
||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
|
||||
|
||||
var splitInfoError *objectSDK.SplitInfoError
|
||||
if errors.As(err, &splitInfoError) {
|
||||
|
|
|
@ -18,7 +18,7 @@ type ExistsPrm struct {
|
|||
// Exists option to set object checked for existence.
|
||||
Address oid.Address
|
||||
// Exists option to set parent object checked for existence.
|
||||
ParentAddress oid.Address
|
||||
ECParentAddress oid.Address
|
||||
}
|
||||
|
||||
// ExistsRes groups the resulting values of Exists operation.
|
||||
|
@ -74,7 +74,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
|
|||
} else {
|
||||
var existsPrm meta.ExistsPrm
|
||||
existsPrm.SetAddress(prm.Address)
|
||||
existsPrm.SetParent(prm.ParentAddress)
|
||||
existsPrm.SetECParent(prm.ECParentAddress)
|
||||
|
||||
var res meta.ExistsRes
|
||||
res, err = s.metaBase.Exists(ctx, existsPrm)
|
||||
|
|
|
@ -281,6 +281,8 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info
|
|||
}
|
||||
chunkIDs[ch.Index] = ecInfoChunkID
|
||||
}
|
||||
} else if client.IsErrObjectAlreadyRemoved(err) {
|
||||
restore = false
|
||||
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
|
||||
p.log.Warn(ctx, logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
|
||||
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
||||
|
|
Loading…
Reference in a new issue