Assemble complex object headers if linking object is deleted on EC containers #1295

Merged
fyrchik merged 3 commits from dstepanov-yadro/frostfs-node:fix/virtual_object_head into master 2024-09-04 19:51:10 +00:00
13 changed files with 95 additions and 42 deletions

@@ -100,7 +100,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 			return false
 		} else {
 			if !client.IsErrObjectNotFound(err) {
-				e.reportShardError(sh, "could not check object existence", err)
+				e.reportShardError(sh, "could not check object existence", err, zap.Stringer("address", prm.addr))
 			}
 			return false
 		}
@@ -116,7 +116,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 		_, err = sh.Inhume(ctx, shPrm)
 		if err != nil {
-			e.reportShardError(sh, "could not inhume object in shard", err)
+			e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", prm.addr))
 			var target *apistatus.ObjectLocked
 			locked.is = errors.As(err, &target)
@@ -191,7 +191,7 @@ func (e *StorageEngine) deleteChunks(
 		var objID oid.ID
 		err := objID.ReadFromV2(chunk.ID)
 		if err != nil {
-			e.reportShardError(sh, "could not delete EC chunk", err)
+			e.reportShardError(sh, "could not delete EC chunk", err, zap.Stringer("address", prm.addr))
 		}
 		addr.SetObject(objID)
 		inhumePrm.MarkAsGarbage(addr)
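For context on the recurring change in these hunks: `zap.Stringer` attaches the object address as a structured log field and only calls `String()` when the entry is actually encoded. A minimal standalone sketch (the plain `zap` logger and the error value are illustrative; the real code goes through `reportShardError`):

```go
package main

import (
	"errors"

	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	var addr oid.Address // zero value; a real address comes from the request parameters
	err := errors.New("shard is in read-only mode") // hypothetical shard error

	// zap.Stringer stores the value and stringifies it lazily at encode time,
	// so the extra field costs nothing when the message is filtered out.
	logger.Warn("could not inhume object in shard",
		zap.Error(err),
		zap.Stringer("address", addr),
	)
}
```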

@@ -8,6 +8,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	"go.uber.org/zap"
 )

 // exists return in the first value true if object exists.
@@ -36,7 +37,7 @@ func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool
 		}
 		if !client.IsErrObjectNotFound(err) {
-			e.reportShardError(sh, "could not check existence of object in shard", err)
+			e.reportShardError(sh, "could not check existence of object in shard", err, zap.Stringer("address", shPrm.Address))
 		}
 		return false
 	}

@@ -186,7 +186,7 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
 			i.ObjectExpired = true
 			return true
 		default:
-			i.Engine.reportShardError(sh, "could not get object from shard", err)
+			i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
 			return false
 		}
 	})

@@ -12,6 +12,7 @@ import (
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"go.uber.org/zap"
 )

 // HeadPrm groups the parameters of Head operation.
@@ -118,7 +119,7 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
 			outError = new(apistatus.ObjectNotFound)
 			return true
 		default:
-			e.reportShardError(sh, "could not head object from shard", err)
+			e.reportShardError(sh, "could not head object from shard", err, zap.Stringer("address", prm.addr))
 			return false
 		}
 	}
@@ -126,17 +127,16 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
 		return true
 	})
+	if head != nil {
+		return HeadRes{head: head}, nil
+	}

Reason to change the priority: if some shard returns a split info error but another shard has the linking object, then return the head result directly.

 	if outSI != nil {
 		return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
-	} else if outEI != nil {
-		return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
-	} else if head == nil {
-		return HeadRes{}, outError
 	}
-	return HeadRes{
-		head: head,
-	}, nil
+	if outEI != nil {
+		return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
+	}
+	return HeadRes{}, outError
 }
// Head reads object header from local storage by provided address.

@@ -154,7 +154,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
 			var siErr *objectSDK.SplitInfoError
 			var ecErr *objectSDK.ECInfoError
 			if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
-				e.reportShardError(sh, "could not check for presents in shard", err)
+				e.reportShardError(sh, "could not check for presents in shard", err, zap.Stringer("address", addr))
 				return
 			}
@@ -179,7 +179,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
 			return true
 		}
-		e.reportShardError(sh, "could not inhume object in shard", err)
+		e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", addr))
 		return false
 	}
@@ -205,7 +205,7 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
 	e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
 		locked, err = h.Shard.IsLocked(ctx, addr)
 		if err != nil {
-			e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr),
+			e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("address", addr),
 				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 			outErr = err
 			return false
@@ -235,7 +235,7 @@ func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.
 	e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
 		ld, err := h.Shard.GetLocked(ctx, addr)
 		if err != nil {
-			e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
+			e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("address", addr),
 				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 			outErr = err
 		}

@@ -13,6 +13,7 @@ import (
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/zap"
 )

 var errLockFailed = errors.New("lock operation failed")
@@ -62,11 +63,9 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
 func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
 	// code is pretty similar to inhumeAddr, maybe unify?
 	root := false
 	var addrLocked oid.Address
 	addrLocked.SetContainer(idCnr)
 	addrLocked.SetObject(locked)
 	e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) {
 		defer func() {
 			// if object is root we continue since information about it
@@ -79,7 +78,6 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 		if checkExists {
 			var existsPrm shard.ExistsPrm
 			existsPrm.Address = addrLocked
 			exRes, err := sh.Exists(ctx, existsPrm)
 			if err != nil {
 				var siErr *objectSDK.SplitInfoError
@@ -90,14 +88,16 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 					var objID oid.ID
 					err = objID.ReadFromV2(chunk.ID)
 					if err != nil {
-						e.reportShardError(sh, "could not lock object in shard", err)
+						e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
+							zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 						return false
 					}
 					eclocked = append(eclocked, objID)
 				}
 				err = sh.Lock(ctx, idCnr, locker, eclocked)
 				if err != nil {
-					e.reportShardError(sh, "could not lock object in shard", err)
+					e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
+						zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 					return false
 				}
 				root = true
@@ -108,8 +108,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 				// do not lock it
 				return true
 			}
-			e.reportShardError(sh, "could not check locked object for presence in shard", err)
+			e.reportShardError(sh, "could not check locked object for presence in shard", err, zap.Stringer("container_id", idCnr),
+				zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 			return
 		}
@@ -121,21 +121,18 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 		err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
 		if err != nil {
-			e.reportShardError(sh, "could not lock object in shard", err)
+			e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
+				zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 			var errIrregular *apistatus.LockNonRegularObject
 			if errors.As(err, &errIrregular) {
 				status = 1
 				return true
 			}
 			return false
 		}
 		status = 2
 		return true
 	})
 	return
 }

@@ -187,7 +187,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
 			return
 		}
-		e.reportShardError(sh, "could not put object to shard", err)
+		e.reportShardError(sh, "could not put object to shard", err, zap.Stringer("address", addr))
 		return
 	}

@@ -208,7 +208,7 @@ func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
 			return true // stop, return it back
 		default:
-			i.Engine.reportShardError(sh, "could not get object from shard", err)
+			i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
 			return false
 		}
 	})

@@ -160,11 +160,23 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
 		return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
 	}
 	// pick last item, for now there is not difference which address to pick
 	// but later list might be sorted so first or last value can be more
 	// prioritized to choose
-	virtualOID := relativeLst[len(relativeLst)-1]
-	data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
+	var data []byte
+	for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
+		virtualOID := relativeLst[len(relativeLst)-i-1]
+		data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
+	}
+	if len(data) == 0 {
+		// check if any of the relatives is an EC object
+		for _, relative := range relativeLst {
+			data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
+			if len(data) > 0 {
+				// we can't return object headers, but can return error,
+				// so assembler can try to assemble complex object
+				return nil, getSplitInfoError(tx, cnr, key)
+			}
+		}
+	}
 	child := objectSDK.New()
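The point of returning the split info error here is that callers already know how to react to it. A minimal sketch of that reaction, assuming the SDK's `SplitInfoError` type and its `SplitInfo()` accessor behave as used elsewhere in this PR:

```go
package main

import (
	"errors"
	"fmt"

	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)

func main() {
	// Simulate the error getVirtualObject now returns when the parent header
	// is not stored locally but a relative is known from the EC info bucket.
	var err error = objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())

	var splitErr *objectSDK.SplitInfoError
	if errors.As(err, &splitErr) {
		// The object service treats this as "try to assemble the complex
		// object" rather than as a plain not-found failure.
		fmt.Println("split info present:", splitErr.SplitInfo() != nil)
	}
}
```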

@@ -12,7 +12,7 @@ import (
 )

 func (r *request) assemble(ctx context.Context) {
-	if !r.canAssemble() {
+	if !r.canAssembleComplexObject() {
 		r.log.Debug(logs.GetCanNotAssembleTheObject)
 		return
 	}
@@ -38,7 +38,7 @@ func (r *request) assemble(ctx context.Context) {
 	r.log.Debug(logs.GetTryingToAssembleTheObject)
 	r.prm.common = r.prm.common.WithLocalOnly(false)
-	assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
+	assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r, r.headOnly())
 	r.log.Debug(logs.GetAssemblingSplittedObject,
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),

@@ -19,6 +19,7 @@ type assembler struct {
 	splitInfo *objectSDK.SplitInfo
 	rng *objectSDK.Range
 	objGetter objectGetter
+	head bool
 	currentOffset uint64
@@ -30,18 +31,23 @@ func newAssembler(
 	splitInfo *objectSDK.SplitInfo,
 	rng *objectSDK.Range,
 	objGetter objectGetter,
+	head bool,
 ) *assembler {
 	return &assembler{
 		addr: addr,
 		rng: rng,
 		splitInfo: splitInfo,
 		objGetter: objGetter,
+		head: head,
 	}
 }

 // Assemble assembles splitted large object and writes it's content to ObjectWriter.
 // It returns parent object.
 func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
+	if a.head {
+		return a.assembleHeader(ctx, writer)
+	}
 	sourceObjectID, ok := a.getLastPartOrLinkObjectID()
 	if !ok {
 		return nil, objectSDK.NewSplitInfoError(a.splitInfo)
@@ -65,6 +71,43 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
 	return a.parentObject, nil
 }

+func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
+	var sourceObjectIDs []oid.ID
+	sourceObjectID, ok := a.splitInfo.Link()
+	if ok {
+		sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
+	}
+	sourceObjectID, ok = a.splitInfo.LastPart()
+	if ok {
+		sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
+	}
+	if len(sourceObjectIDs) == 0 {
+		return nil, objectSDK.NewSplitInfoError(a.splitInfo)
+	}
+	for _, sourceObjectID = range sourceObjectIDs {
+		obj, err := a.getParent(ctx, sourceObjectID, writer)
+		if err == nil {
+			return obj, nil
+		}
+	}
+	return nil, objectSDK.NewSplitInfoError(a.splitInfo)
+}

It seems to be completely independent of other `assembler` methods, do we have a reason to provide `headOnly` as a parameter to the `assembler` instead of writing a separate function?

Good point. Now this method looks more related to assembling: it tries to find the parent header from the linking and last part objects.

+func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
+	obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
+	if err != nil {
+		return nil, err
+	}
+	parent := obj.Parent()
+	if parent == nil {
+		return nil, objectSDK.NewSplitInfoError(a.splitInfo)
+	}
+	if err := writer.WriteHeader(ctx, parent); err != nil {
+		return nil, err
+	}
+	return obj, nil
+}

We return `false`, then create an error and will log again. Why is this `Warn`?

This `Warn` explains why `Assemble` returns an error.

I don't like the assembler being able to log anything, as it will eventually be moved to the SDK.

1. But for now the assembler is here. The logger could be replaced with an interface.
2. I think it is useful for troubleshooting.

It only adds complexity (by producing side effects), even though we can return everything in the result. The result in turn is what we log and can use for troubleshooting.

I disagree, but fixed.

 func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
 	sourceObjectID, ok := a.splitInfo.Link()
 	if ok {
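To see the fallback end to end, here is a simplified, self-contained sketch of the head-assembly path discussed above. `headGetter`, `assembleParentHeader`, and `unavailableGetter` are hypothetical names introduced for illustration; the real implementation is `assembler.assembleHeader`/`getParent`, which additionally writes the recovered header through the service's `ObjectWriter`.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// headGetter stands in for the service's objectGetter: anything that can
// HEAD a child object by its ID.
type headGetter interface {
	HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error)
}

// assembleParentHeader mirrors the idea of assembleHeader: collect the linking
// object and the last part from the split info, HEAD whichever is reachable,
// and return its parent, i.e. the complex object's header.
func assembleParentHeader(ctx context.Context, si *objectSDK.SplitInfo, g headGetter) (*objectSDK.Object, error) {
	var candidates []oid.ID
	if id, ok := si.Link(); ok {
		candidates = append(candidates, id)
	}
	if id, ok := si.LastPart(); ok {
		candidates = append(candidates, id)
	}
	if len(candidates) == 0 {
		return nil, objectSDK.NewSplitInfoError(si)
	}
	for _, id := range candidates {
		child, err := g.HeadObject(ctx, id)
		if err != nil {
			continue // e.g. the linking object was deleted or its shard is unavailable
		}
		if parent := child.Parent(); parent != nil {
			return parent, nil
		}
	}
	return nil, objectSDK.NewSplitInfoError(si)
}

// unavailableGetter simulates the failure mode from the PR title: every child
// lookup fails, so no parent header can be recovered.
type unavailableGetter struct{}

func (unavailableGetter) HeadObject(context.Context, oid.ID) (*objectSDK.Object, error) {
	return nil, errors.New("object not found")
}

func main() {
	_, err := assembleParentHeader(context.Background(), objectSDK.NewSplitInfo(), unavailableGetter{})
	fmt.Println(err) // falls back to a split info error, as Assemble does
}
```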

@@ -730,7 +730,7 @@ func TestGetRemoteSmall(t *testing.T) {
 	t.Run("VIRTUAL", func(t *testing.T) {
 		testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) {
-			headPrm := newHeadPrm(false, nil)
+			headPrm := newHeadPrm(true, nil)
 			headPrm.WithAddress(addr)
 			errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())

I would expect the tests to stay the same, what is the reason?

For `raw == false` cases it is expected to return object headers, not a split info error, as the test does. So this test looks like something unrelated to the real code and works just because of mocks.

@@ -88,8 +88,8 @@ func (r *request) key() (*ecdsa.PrivateKey, error) {
 	return r.keyStore.GetKey(sessionInfo)
 }

-func (r *request) canAssemble() bool {
-	return !r.isRaw() && !r.headOnly()
+func (r *request) canAssembleComplexObject() bool {
+	return !r.isRaw()
 }

 func (r *request) splitInfo() *objectSDK.SplitInfo {

To make the method name more clear.