Compare commits
3 commits
master
...
fix/path_r
Author | SHA1 | Date | |
---|---|---|---|
9139d13f95 | |||
212a81095e | |||
5b42ea4c82 |
7 changed files with 82 additions and 18 deletions
|
@ -42,7 +42,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
||||||
for range b.N {
|
for range b.N {
|
||||||
var shPrm shard.ExistsPrm
|
var shPrm shard.ExistsPrm
|
||||||
shPrm.Address = addr
|
shPrm.Address = addr
|
||||||
shPrm.ParentAddress = oid.Address{}
|
shPrm.ECParentAddress = oid.Address{}
|
||||||
ok, _, err := e.exists(context.Background(), shPrm)
|
ok, _, err := e.exists(context.Background(), shPrm)
|
||||||
if err != nil || ok {
|
if err != nil || ok {
|
||||||
b.Fatalf("%t %v", ok, err)
|
b.Fatalf("%t %v", ok, err)
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -93,6 +94,50 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStorageEngine_ECInhume(t *testing.T) {
|
||||||
|
parentObjectAddress := oidtest.Address()
|
||||||
|
containerID := parentObjectAddress.Container()
|
||||||
|
|
||||||
|
chunkObject0 := testutil.GenerateObjectWithCID(containerID)
|
||||||
|
chunkObject0.SetECHeader(objectSDK.NewECHeader(
|
||||||
|
objectSDK.ECParentInfo{
|
||||||
|
ID: parentObjectAddress.Object(),
|
||||||
|
}, 0, 4, []byte{}, 0))
|
||||||
|
|
||||||
|
chunkObject1 := testutil.GenerateObjectWithCID(containerID)
|
||||||
|
chunkObject1.SetECHeader(objectSDK.NewECHeader(
|
||||||
|
objectSDK.ECParentInfo{
|
||||||
|
ID: parentObjectAddress.Object(),
|
||||||
|
}, 1, 4, []byte{}, 0))
|
||||||
|
|
||||||
|
tombstone := objectSDK.NewTombstone()
|
||||||
|
tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()})
|
||||||
|
payload, err := tombstone.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
tombstoneObject := testutil.GenerateObjectWithCID(containerID)
|
||||||
|
tombstoneObject.SetType(objectSDK.TypeTombstone)
|
||||||
|
tombstoneObject.SetPayload(payload)
|
||||||
|
tombstoneObjectAddress := object.AddressOf(tombstoneObject)
|
||||||
|
|
||||||
|
e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
|
||||||
|
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||||
|
|
||||||
|
require.NoError(t, Put(context.Background(), e, chunkObject0, false))
|
||||||
|
|
||||||
|
require.NoError(t, Put(context.Background(), e, tombstoneObject, false))
|
||||||
|
|
||||||
|
var inhumePrm InhumePrm
|
||||||
|
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
|
||||||
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var alreadyRemoved *apistatus.ObjectAlreadyRemoved
|
||||||
|
|
||||||
|
require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved)
|
||||||
|
|
||||||
|
require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved)
|
||||||
|
}
|
||||||
|
|
||||||
func TestInhumeExpiredRegularObject(t *testing.T) {
|
func TestInhumeExpiredRegularObject(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|
|
@ -71,21 +71,21 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
||||||
|
|
||||||
// In #1146 this check was parallelized, however, it became
|
// In #1146 this check was parallelized, however, it became
|
||||||
// much slower on fast machines for 4 shards.
|
// much slower on fast machines for 4 shards.
|
||||||
var parent oid.Address
|
var ecParent oid.Address
|
||||||
if prm.Object.ECHeader() != nil {
|
if prm.Object.ECHeader() != nil {
|
||||||
parent.SetObject(prm.Object.ECHeader().Parent())
|
ecParent.SetObject(prm.Object.ECHeader().Parent())
|
||||||
parent.SetContainer(addr.Container())
|
ecParent.SetContainer(addr.Container())
|
||||||
}
|
}
|
||||||
var shPrm shard.ExistsPrm
|
var shPrm shard.ExistsPrm
|
||||||
shPrm.Address = addr
|
shPrm.Address = addr
|
||||||
shPrm.ParentAddress = parent
|
shPrm.ECParentAddress = ecParent
|
||||||
existed, locked, err := e.exists(ctx, shPrm)
|
existed, locked, err := e.exists(ctx, shPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !existed && locked {
|
if !existed && locked {
|
||||||
lockers, err := e.GetLocked(ctx, parent)
|
lockers, err := e.GetLocked(ctx, ecParent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,8 +20,8 @@ import (
|
||||||
|
|
||||||
// ExistsPrm groups the parameters of Exists operation.
|
// ExistsPrm groups the parameters of Exists operation.
|
||||||
type ExistsPrm struct {
|
type ExistsPrm struct {
|
||||||
addr oid.Address
|
addr oid.Address
|
||||||
paddr oid.Address
|
ecParentAddr oid.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExistsRes groups the resulting values of Exists operation.
|
// ExistsRes groups the resulting values of Exists operation.
|
||||||
|
@ -37,9 +37,9 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
|
||||||
p.addr = addr
|
p.addr = addr
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetParent is an Exists option to set objects parent.
|
// SetECParent is an Exists option to set objects parent.
|
||||||
func (p *ExistsPrm) SetParent(addr oid.Address) {
|
func (p *ExistsPrm) SetECParent(addr oid.Address) {
|
||||||
p.paddr = addr
|
p.ecParentAddr = addr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exists returns the fact that the object is in the metabase.
|
// Exists returns the fact that the object is in the metabase.
|
||||||
|
@ -82,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
|
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
@ -90,10 +90,21 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
|
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||||
var locked bool
|
var locked bool
|
||||||
if !parent.Equals(oid.Address{}) {
|
if !ecParent.Equals(oid.Address{}) {
|
||||||
locked = objectLocked(tx, parent.Container(), parent.Object())
|
st, err := objectStatus(tx, ecParent, currEpoch)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, err
|
||||||
|
}
|
||||||
|
switch st {
|
||||||
|
case 2:
|
||||||
|
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
|
||||||
|
case 3:
|
||||||
|
return false, locked, ErrObjectIsExpired
|
||||||
|
}
|
||||||
|
|
||||||
|
locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
|
||||||
}
|
}
|
||||||
// check graveyard and object expiration first
|
// check graveyard and object expiration first
|
||||||
st, err := objectStatus(tx, addr, currEpoch)
|
st, err := objectStatus(tx, addr, currEpoch)
|
||||||
|
|
|
@ -121,9 +121,15 @@ func (db *DB) put(tx *bbolt.Tx,
|
||||||
return PutRes{}, errors.New("missing container in object")
|
return PutRes{}, errors.New("missing container in object")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ecParentAddress oid.Address
|
||||||
|
if ecHeader := obj.ECHeader(); ecHeader != nil {
|
||||||
|
ecParentAddress.SetContainer(cnr)
|
||||||
|
ecParentAddress.SetObject(ecHeader.Parent())
|
||||||
|
}
|
||||||
|
|
||||||
isParent := si != nil
|
isParent := si != nil
|
||||||
|
|
||||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
|
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
|
||||||
|
|
||||||
var splitInfoError *objectSDK.SplitInfoError
|
var splitInfoError *objectSDK.SplitInfoError
|
||||||
if errors.As(err, &splitInfoError) {
|
if errors.As(err, &splitInfoError) {
|
||||||
|
|
|
@ -18,7 +18,7 @@ type ExistsPrm struct {
|
||||||
// Exists option to set object checked for existence.
|
// Exists option to set object checked for existence.
|
||||||
Address oid.Address
|
Address oid.Address
|
||||||
// Exists option to set parent object checked for existence.
|
// Exists option to set parent object checked for existence.
|
||||||
ParentAddress oid.Address
|
ECParentAddress oid.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExistsRes groups the resulting values of Exists operation.
|
// ExistsRes groups the resulting values of Exists operation.
|
||||||
|
@ -74,7 +74,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
|
||||||
} else {
|
} else {
|
||||||
var existsPrm meta.ExistsPrm
|
var existsPrm meta.ExistsPrm
|
||||||
existsPrm.SetAddress(prm.Address)
|
existsPrm.SetAddress(prm.Address)
|
||||||
existsPrm.SetParent(prm.ParentAddress)
|
existsPrm.SetECParent(prm.ECParentAddress)
|
||||||
|
|
||||||
var res meta.ExistsRes
|
var res meta.ExistsRes
|
||||||
res, err = s.metaBase.Exists(ctx, existsPrm)
|
res, err = s.metaBase.Exists(ctx, existsPrm)
|
||||||
|
|
|
@ -281,6 +281,8 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info
|
||||||
}
|
}
|
||||||
chunkIDs[ch.Index] = ecInfoChunkID
|
chunkIDs[ch.Index] = ecInfoChunkID
|
||||||
}
|
}
|
||||||
|
} else if client.IsErrObjectAlreadyRemoved(err) {
|
||||||
|
restore = false
|
||||||
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
|
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
|
||||||
p.log.Warn(ctx, logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
|
p.log.Warn(ctx, logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
||||||
|
|
Loading…
Reference in a new issue