Assemble complex object headers if linking object is deleted on EC containers #1295

Merged
fyrchik merged 3 commits from dstepanov-yadro/frostfs-node:fix/virtual_object_head into master 2024-09-04 19:51:10 +00:00
13 changed files with 95 additions and 42 deletions

View file

@@ -100,7 +100,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 			return false
 		} else {
 			if !client.IsErrObjectNotFound(err) {
-				e.reportShardError(sh, "could not check object existence", err)
+				e.reportShardError(sh, "could not check object existence", err, zap.Stringer("address", prm.addr))
 			}
 			return false
 		}
@@ -116,7 +116,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 	_, err = sh.Inhume(ctx, shPrm)
 	if err != nil {
-		e.reportShardError(sh, "could not inhume object in shard", err)
+		e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", prm.addr))

 		var target *apistatus.ObjectLocked
 		locked.is = errors.As(err, &target)
@@ -191,7 +191,7 @@ func (e *StorageEngine) deleteChunks(
 		var objID oid.ID
 		err := objID.ReadFromV2(chunk.ID)
 		if err != nil {
-			e.reportShardError(sh, "could not delete EC chunk", err)
+			e.reportShardError(sh, "could not delete EC chunk", err, zap.Stringer("address", prm.addr))
 		}
 		addr.SetObject(objID)
 		inhumePrm.MarkAsGarbage(addr)
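Most of the engine changes in this PR follow the same pattern: every `reportShardError` call site now attaches the object address as a structured log field. A minimal, self-contained sketch of that pattern, assuming only `go.uber.org/zap` (the `addr` type below is a hypothetical stand-in for `oid.Address`; any `fmt.Stringer` works):

```go
package main

import (
	"errors"

	"go.uber.org/zap"
)

// addr is a hypothetical stand-in for oid.Address; zap.Stringer accepts
// any fmt.Stringer.
type addr struct{ cnr, obj string }

func (a addr) String() string { return a.cnr + "/" + a.obj }

func main() {
	log, _ := zap.NewDevelopment()
	defer log.Sync()

	err := errors.New("object not found")
	// zap.Stringer defers the String() call until the entry is encoded,
	// so the field costs almost nothing when the log level is disabled.
	log.Warn("could not check object existence",
		zap.Error(err),
		zap.Stringer("address", addr{cnr: "cnr", obj: "obj"}))
}
```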

View file

@@ -8,6 +8,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	"go.uber.org/zap"
 )

 // exists return in the first value true if object exists.
@@ -36,7 +37,7 @@ func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool
 		}
 		if !client.IsErrObjectNotFound(err) {
-			e.reportShardError(sh, "could not check existence of object in shard", err)
+			e.reportShardError(sh, "could not check existence of object in shard", err, zap.Stringer("address", shPrm.Address))
 		}
 		return false
 	}

View file

@@ -186,7 +186,7 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
 			i.ObjectExpired = true
 			return true
 		default:
-			i.Engine.reportShardError(sh, "could not get object from shard", err)
+			i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
 			return false
 		}
 	})

View file

@@ -12,6 +12,7 @@ import (
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"go.uber.org/zap"
 )

 // HeadPrm groups the parameters of Head operation.
@@ -118,7 +119,7 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
 			outError = new(apistatus.ObjectNotFound)
 			return true
 		default:
-			e.reportShardError(sh, "could not head object from shard", err)
+			e.reportShardError(sh, "could not head object from shard", err, zap.Stringer("address", prm.addr))
 			return false
 		}
 	}
@@ -126,17 +127,16 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
 		return true
 	})

+	if head != nil {
+		return HeadRes{head: head}, nil
+	}
 	if outSI != nil {
 		return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
-	} else if outEI != nil {
-		return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
-	} else if head == nil {
-		return HeadRes{}, outError
 	}
-
-	return HeadRes{
-		head: head,
-	}, nil
+	if outEI != nil {
+		return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
+	}
+	return HeadRes{}, outError
 }

 // Head reads object header from local storage by provided address.

Reason to change the priority: if some shard returns a split info error, but another shard has the linking object, then return the head result directly.
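To see why the reordering matters: with EC containers, one shard may only know that the object was split, while another shard actually holds the linking object's header. A hedged sketch of the new decision order, using hypothetical stand-in types rather than the SDK ones:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the SDK types consulted by StorageEngine.head.
type object struct{ id string }
type splitInfo struct{}
type ecInfo struct{}

var errNotFound = errors.New("object not found")

// resolveHead mirrors the reordered returns: a header found on any shard
// now wins over split/EC info collected from the other shards.
func resolveHead(head *object, outSI *splitInfo, outEI *ecInfo) (*object, error) {
	if head != nil {
		return head, nil
	}
	if outSI != nil {
		return nil, errors.New("split info error")
	}
	if outEI != nil {
		return nil, errors.New("EC info error")
	}
	return nil, errNotFound
}

func main() {
	// One shard reported split info, another returned the header via the
	// linking object: the header is now returned directly.
	obj, err := resolveHead(&object{id: "parent"}, &splitInfo{}, nil)
	fmt.Println(obj.id, err) // parent <nil>
}
```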

View file

@@ -154,7 +154,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
 			var siErr *objectSDK.SplitInfoError
 			var ecErr *objectSDK.ECInfoError
 			if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
-				e.reportShardError(sh, "could not check for presents in shard", err)
+				e.reportShardError(sh, "could not check for presents in shard", err, zap.Stringer("address", addr))
 				return
 			}
@@ -179,7 +179,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
 			return true
 		}

-		e.reportShardError(sh, "could not inhume object in shard", err)
+		e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", addr))
 		return false
 	}
@@ -205,7 +205,7 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
 	e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
 		locked, err = h.Shard.IsLocked(ctx, addr)
 		if err != nil {
-			e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr),
+			e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("address", addr),
 				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 			outErr = err
 			return false
@@ -235,7 +235,7 @@ func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.
 	e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
 		ld, err := h.Shard.GetLocked(ctx, addr)
 		if err != nil {
-			e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
+			e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("address", addr),
 				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 			outErr = err
 		}

View file

@@ -13,6 +13,7 @@ import (
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/zap"
 )

 var errLockFailed = errors.New("lock operation failed")
@@ -62,11 +63,9 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
 func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
 	// code is pretty similar to inhumeAddr, maybe unify?
 	root := false
-
 	var addrLocked oid.Address
 	addrLocked.SetContainer(idCnr)
 	addrLocked.SetObject(locked)
-
 	e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) {
 		defer func() {
 			// if object is root we continue since information about it
@@ -79,7 +78,6 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 		if checkExists {
 			var existsPrm shard.ExistsPrm
 			existsPrm.Address = addrLocked
-
 			exRes, err := sh.Exists(ctx, existsPrm)
 			if err != nil {
 				var siErr *objectSDK.SplitInfoError
@@ -90,14 +88,16 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 						var objID oid.ID
 						err = objID.ReadFromV2(chunk.ID)
 						if err != nil {
-							e.reportShardError(sh, "could not lock object in shard", err)
+							e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
+								zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 							return false
 						}
 						eclocked = append(eclocked, objID)
 					}
 					err = sh.Lock(ctx, idCnr, locker, eclocked)
 					if err != nil {
-						e.reportShardError(sh, "could not lock object in shard", err)
+						e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
+							zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 						return false
 					}
 					root = true
@@ -108,8 +108,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 				// do not lock it
 				return true
 			}
-
-			e.reportShardError(sh, "could not check locked object for presence in shard", err)
+			e.reportShardError(sh, "could not check locked object for presence in shard", err, zap.Stringer("container_id", idCnr),
+				zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 			return
 		}
@@ -121,21 +121,18 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 		err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
 		if err != nil {
-			e.reportShardError(sh, "could not lock object in shard", err)
-
+			e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
+				zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 			var errIrregular *apistatus.LockNonRegularObject
 			if errors.As(err, &errIrregular) {
 				status = 1
 				return true
 			}
-
 			return false
 		}
-
 		status = 2
 		return true
 	})
-
 	return
 }

View file

@@ -187,7 +187,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
 			return
 		}

-		e.reportShardError(sh, "could not put object to shard", err)
+		e.reportShardError(sh, "could not put object to shard", err, zap.Stringer("address", addr))
 		return
 	}

View file

@@ -208,7 +208,7 @@ func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
 			return true // stop, return it back
 		default:
-			i.Engine.reportShardError(sh, "could not get object from shard", err)
+			i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
 			return false
 		}
 	})

View file

@@ -160,11 +160,23 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
 		return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
 	}

-	// pick last item, for now there is not difference which address to pick
-	// but later list might be sorted so first or last value can be more
-	// prioritized to choose
-	virtualOID := relativeLst[len(relativeLst)-1]
-	data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
+	var data []byte
+	for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
+		virtualOID := relativeLst[len(relativeLst)-i-1]
+		data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
+	}
+
+	if len(data) == 0 {
+		// check if any of the relatives is an EC object
+		for _, relative := range relativeLst {
+			data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
+			if len(data) > 0 {
+				// we can't return object headers, but can return error,
+				// so assembler can try to assemble complex object
+				return nil, getSplitInfoError(tx, cnr, key)
+			}
+		}
+	}

 	child := objectSDK.New()
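The new lookup order can be modelled with plain maps in place of the bbolt buckets. A sketch under that assumption (`findVirtual`, the map "buckets", and the error strings are all hypothetical, for illustration only):

```go
package main

import (
	"errors"
	"fmt"
)

// findVirtual models getVirtualObject's lookup order with in-memory maps
// instead of bbolt buckets: scan relatives from last to first in the
// primary bucket, then fall back to the EC-info bucket.
func findVirtual(primary, ecInfo map[string][]byte, relativeLst []string) ([]byte, error) {
	var data []byte
	for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
		data = primary[relativeLst[len(relativeLst)-i-1]]
	}
	if len(data) == 0 {
		for _, relative := range relativeLst {
			if len(ecInfo[relative]) > 0 {
				// Headers cannot be returned here, but a split info error
				// lets the caller's assembler reconstruct the complex object.
				return nil, errors.New("split info error")
			}
		}
		return nil, errors.New("object not found")
	}
	return data, nil
}

func main() {
	primary := map[string][]byte{}                    // linking object deleted
	ec := map[string][]byte{"chunk1": []byte("info")} // but an EC chunk survives
	_, err := findVirtual(primary, ec, []string{"chunk1"})
	fmt.Println(err) // split info error
}
```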

View file

@@ -12,7 +12,7 @@ import (
 )

 func (r *request) assemble(ctx context.Context) {
-	if !r.canAssemble() {
+	if !r.canAssembleComplexObject() {
 		r.log.Debug(logs.GetCanNotAssembleTheObject)
 		return
 	}
@@ -38,7 +38,7 @@ func (r *request) assemble(ctx context.Context) {
 	r.log.Debug(logs.GetTryingToAssembleTheObject)

 	r.prm.common = r.prm.common.WithLocalOnly(false)
-	assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
+	assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r, r.headOnly())

 	r.log.Debug(logs.GetAssemblingSplittedObject,
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),

View file

@@ -19,6 +19,7 @@ type assembler struct {
 	splitInfo *objectSDK.SplitInfo
 	rng       *objectSDK.Range
 	objGetter objectGetter
+	head      bool

 	currentOffset uint64
@@ -30,18 +31,23 @@ func newAssembler(
 	splitInfo *objectSDK.SplitInfo,
 	rng *objectSDK.Range,
 	objGetter objectGetter,
+	head bool,
 ) *assembler {
 	return &assembler{
 		addr:      addr,
 		rng:       rng,
 		splitInfo: splitInfo,
 		objGetter: objGetter,
+		head:      head,
 	}
 }

 // Assemble assembles splitted large object and writes it's content to ObjectWriter.
 // It returns parent object.
 func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
+	if a.head {
+		return a.assembleHeader(ctx, writer)
+	}
 	sourceObjectID, ok := a.getLastPartOrLinkObjectID()
 	if !ok {
 		return nil, objectSDK.NewSplitInfoError(a.splitInfo)
@@ -65,6 +71,43 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
 	return a.parentObject, nil
 }

+func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
+	var sourceObjectIDs []oid.ID
+	sourceObjectID, ok := a.splitInfo.Link()
+	if ok {
+		sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
+	}
+	sourceObjectID, ok = a.splitInfo.LastPart()
+	if ok {
+		sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
+	}
+	if len(sourceObjectIDs) == 0 {
+		return nil, objectSDK.NewSplitInfoError(a.splitInfo)
+	}
+	for _, sourceObjectID = range sourceObjectIDs {
+		obj, err := a.getParent(ctx, sourceObjectID, writer)
+		if err == nil {
+			return obj, nil
+		}
+	}
+	return nil, objectSDK.NewSplitInfoError(a.splitInfo)
+}
+
+func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
+	obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
+	if err != nil {
+		return nil, err
+	}
+	parent := obj.Parent()
+	if parent == nil {
+		return nil, objectSDK.NewSplitInfoError(a.splitInfo)
+	}
+	if err := writer.WriteHeader(ctx, parent); err != nil {
+		return nil, err
+	}
+	return obj, nil
+}
+
 func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
 	sourceObjectID, ok := a.splitInfo.Link()
 	if ok {

On `assembleHeader`:

It seems to be completely independent of other `assembler` methods — do we have a reason to pass `headOnly` as a parameter to the `assembler` instead of writing a separate function?

Good point. Now this method looks more related to assembling: it tries to find the parent headers via the linking and last-part objects.

On a `Warn` log call in an earlier revision of this change:

We return `false`, then create an error and will log again. Why is this `Warn`?

This `Warn` explains why `Assemble` returns an error.

I don't like the assembler being able to log anything, as it will eventually be moved to the SDK.

1. But for now the assembler lives here; the logger could be replaced with an interface.
2. I think it is useful for troubleshooting.

It only adds complexity (by producing side effects), even though we can return everything in the result. The result, in turn, is what we log and can use for troubleshooting.

I disagree, but fixed.
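Taken together, head assembly now degrades gracefully: the assembler tries the linking object first and falls back to the last part, so the parent header survives the deletion of either. A compact model of that fallback (all names below are illustrative stand-ins, not the real service wiring):

```go
package main

import (
	"errors"
	"fmt"
)

// headObject stands in for objGetter.HeadObject. In this scenario the
// linking object was already deleted, so only the last part resolves.
func headObject(id string) (string, error) {
	if id == "last-part" {
		return "parent-header", nil
	}
	return "", errors.New("not found")
}

// assembleHeader mirrors the new method: collect candidate IDs (link
// first, then last part) and return the first parent header that resolves.
func assembleHeader(link, lastPart string) (string, error) {
	var candidates []string
	if link != "" {
		candidates = append(candidates, link)
	}
	if lastPart != "" {
		candidates = append(candidates, lastPart)
	}
	if len(candidates) == 0 {
		return "", errors.New("split info error")
	}
	for _, id := range candidates {
		if parent, err := headObject(id); err == nil {
			return parent, nil
		}
	}
	return "", errors.New("split info error")
}

func main() {
	parent, err := assembleHeader("deleted-link", "last-part")
	fmt.Println(parent, err) // parent-header <nil>
}
```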

View file

@@ -730,7 +730,7 @@ func TestGetRemoteSmall(t *testing.T) {
 	t.Run("VIRTUAL", func(t *testing.T) {
 		testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) {
-			headPrm := newHeadPrm(false, nil)
+			headPrm := newHeadPrm(true, nil)
 			headPrm.WithAddress(addr)

 			errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())

I would expect the tests to stay the same, what is the reason?

For `raw == false` cases it is now expected to return object headers, not a split info error as this test does. So this test looked like something unrelated to the real code and worked just because of the mocks.

View file

@@ -88,8 +88,8 @@ func (r *request) key() (*ecdsa.PrivateKey, error) {
 	return r.keyStore.GetKey(sessionInfo)
 }

-func (r *request) canAssemble() bool {
-	return !r.isRaw() && !r.headOnly()
+func (r *request) canAssembleComplexObject() bool {
+	return !r.isRaw()
 }

 func (r *request) splitInfo() *objectSDK.SplitInfo {

To make the method name clearer.