Assemble complex object headers if linking object is deleted on EC containers #1295
|
@ -100,7 +100,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
|
||||||
return false
|
return false
|
||||||
} else {
|
} else {
|
||||||
if !client.IsErrObjectNotFound(err) {
|
if !client.IsErrObjectNotFound(err) {
|
||||||
e.reportShardError(sh, "could not check object existence", err)
|
e.reportShardError(sh, "could not check object existence", err, zap.Stringer("address", prm.addr))
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -116,7 +116,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
|
||||||
|
|
||||||
_, err = sh.Inhume(ctx, shPrm)
|
_, err = sh.Inhume(ctx, shPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(sh, "could not inhume object in shard", err)
|
e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", prm.addr))
|
||||||
|
|
||||||
var target *apistatus.ObjectLocked
|
var target *apistatus.ObjectLocked
|
||||||
locked.is = errors.As(err, &target)
|
locked.is = errors.As(err, &target)
|
||||||
|
@ -191,7 +191,7 @@ func (e *StorageEngine) deleteChunks(
|
||||||
var objID oid.ID
|
var objID oid.ID
|
||||||
err := objID.ReadFromV2(chunk.ID)
|
err := objID.ReadFromV2(chunk.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(sh, "could not delete EC chunk", err)
|
e.reportShardError(sh, "could not delete EC chunk", err, zap.Stringer("address", prm.addr))
|
||||||
}
|
}
|
||||||
addr.SetObject(objID)
|
addr.SetObject(objID)
|
||||||
inhumePrm.MarkAsGarbage(addr)
|
inhumePrm.MarkAsGarbage(addr)
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// exists return in the first value true if object exists.
|
// exists return in the first value true if object exists.
|
||||||
|
@ -36,7 +37,7 @@ func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool
|
||||||
}
|
}
|
||||||
|
|
||||||
if !client.IsErrObjectNotFound(err) {
|
if !client.IsErrObjectNotFound(err) {
|
||||||
e.reportShardError(sh, "could not check existence of object in shard", err)
|
e.reportShardError(sh, "could not check existence of object in shard", err, zap.Stringer("address", shPrm.Address))
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,7 +186,7 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
|
||||||
i.ObjectExpired = true
|
i.ObjectExpired = true
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
i.Engine.reportShardError(sh, "could not get object from shard", err)
|
i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HeadPrm groups the parameters of Head operation.
|
// HeadPrm groups the parameters of Head operation.
|
||||||
|
@ -118,7 +119,7 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
||||||
outError = new(apistatus.ObjectNotFound)
|
outError = new(apistatus.ObjectNotFound)
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
e.reportShardError(sh, "could not head object from shard", err)
|
e.reportShardError(sh, "could not head object from shard", err, zap.Stringer("address", prm.addr))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -126,17 +127,16 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if head != nil {
|
||||||
|
|||||||
|
return HeadRes{head: head}, nil
|
||||||
|
}
|
||||||
if outSI != nil {
|
if outSI != nil {
|
||||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
|
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
|
||||||
} else if outEI != nil {
|
|
||||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
|
|
||||||
} else if head == nil {
|
|
||||||
return HeadRes{}, outError
|
|
||||||
}
|
}
|
||||||
|
if outEI != nil {
|
||||||
return HeadRes{
|
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
|
||||||
head: head,
|
}
|
||||||
}, nil
|
return HeadRes{}, outError
|
||||||
}
|
}
|
||||||
|
|
||||||
// Head reads object header from local storage by provided address.
|
// Head reads object header from local storage by provided address.
|
||||||
|
|
|
@ -154,7 +154,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
||||||
var siErr *objectSDK.SplitInfoError
|
var siErr *objectSDK.SplitInfoError
|
||||||
var ecErr *objectSDK.ECInfoError
|
var ecErr *objectSDK.ECInfoError
|
||||||
if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
|
if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
|
||||||
e.reportShardError(sh, "could not check for presents in shard", err)
|
e.reportShardError(sh, "could not check for presents in shard", err, zap.Stringer("address", addr))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,7 +179,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
e.reportShardError(sh, "could not inhume object in shard", err)
|
e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", addr))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,7 +205,7 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
|
||||||
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
|
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
|
||||||
locked, err = h.Shard.IsLocked(ctx, addr)
|
locked, err = h.Shard.IsLocked(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr),
|
e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("address", addr),
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
outErr = err
|
outErr = err
|
||||||
return false
|
return false
|
||||||
|
@ -235,7 +235,7 @@ func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.
|
||||||
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
|
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
|
||||||
ld, err := h.Shard.GetLocked(ctx, addr)
|
ld, err := h.Shard.GetLocked(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
|
e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("address", addr),
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
outErr = err
|
outErr = err
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errLockFailed = errors.New("lock operation failed")
|
var errLockFailed = errors.New("lock operation failed")
|
||||||
|
@ -62,11 +63,9 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
|
||||||
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
|
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
|
||||||
// code is pretty similar to inhumeAddr, maybe unify?
|
// code is pretty similar to inhumeAddr, maybe unify?
|
||||||
root := false
|
root := false
|
||||||
|
|
||||||
var addrLocked oid.Address
|
var addrLocked oid.Address
|
||||||
addrLocked.SetContainer(idCnr)
|
addrLocked.SetContainer(idCnr)
|
||||||
addrLocked.SetObject(locked)
|
addrLocked.SetObject(locked)
|
||||||
|
|
||||||
e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) {
|
e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) {
|
||||||
defer func() {
|
defer func() {
|
||||||
// if object is root we continue since information about it
|
// if object is root we continue since information about it
|
||||||
|
@ -79,7 +78,6 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
||||||
if checkExists {
|
if checkExists {
|
||||||
var existsPrm shard.ExistsPrm
|
var existsPrm shard.ExistsPrm
|
||||||
existsPrm.Address = addrLocked
|
existsPrm.Address = addrLocked
|
||||||
|
|
||||||
exRes, err := sh.Exists(ctx, existsPrm)
|
exRes, err := sh.Exists(ctx, existsPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var siErr *objectSDK.SplitInfoError
|
var siErr *objectSDK.SplitInfoError
|
||||||
|
@ -90,14 +88,16 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
||||||
var objID oid.ID
|
var objID oid.ID
|
||||||
err = objID.ReadFromV2(chunk.ID)
|
err = objID.ReadFromV2(chunk.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(sh, "could not lock object in shard", err)
|
e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||||
|
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
eclocked = append(eclocked, objID)
|
eclocked = append(eclocked, objID)
|
||||||
}
|
}
|
||||||
err = sh.Lock(ctx, idCnr, locker, eclocked)
|
err = sh.Lock(ctx, idCnr, locker, eclocked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(sh, "could not lock object in shard", err)
|
e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||||
|
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
root = true
|
root = true
|
||||||
|
@ -108,8 +108,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
||||||
// do not lock it
|
// do not lock it
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
e.reportShardError(sh, "could not check locked object for presence in shard", err, zap.Stringer("container_id", idCnr),
|
||||||
e.reportShardError(sh, "could not check locked object for presence in shard", err)
|
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,21 +121,18 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
||||||
|
|
||||||
err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
|
err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(sh, "could not lock object in shard", err)
|
e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||||
|
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||||
|
|
||||||
var errIrregular *apistatus.LockNonRegularObject
|
var errIrregular *apistatus.LockNonRegularObject
|
||||||
if errors.As(err, &errIrregular) {
|
if errors.As(err, &errIrregular) {
|
||||||
status = 1
|
status = 1
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
status = 2
|
status = 2
|
||||||
|
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -187,7 +187,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
e.reportShardError(sh, "could not put object to shard", err)
|
e.reportShardError(sh, "could not put object to shard", err, zap.Stringer("address", addr))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -208,7 +208,7 @@ func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
|
||||||
|
|
||||||
return true // stop, return it back
|
return true // stop, return it back
|
||||||
default:
|
default:
|
||||||
i.Engine.reportShardError(sh, "could not get object from shard", err)
|
i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -160,11 +160,23 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
|
||||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
|
|
||||||
// pick last item, for now there is not difference which address to pick
|
var data []byte
|
||||||
// but later list might be sorted so first or last value can be more
|
for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
|
||||||
// prioritized to choose
|
virtualOID := relativeLst[len(relativeLst)-i-1]
|
||||||
virtualOID := relativeLst[len(relativeLst)-1]
|
data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
|
||||||
data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
|
}
|
||||||
|
|
||||||
|
if len(data) == 0 {
|
||||||
|
// check if any of the relatives is an EC object
|
||||||
|
for _, relative := range relativeLst {
|
||||||
|
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
|
||||||
|
if len(data) > 0 {
|
||||||
|
// we can't return object headers, but can return error,
|
||||||
|
// so assembler can try to assemble complex object
|
||||||
|
return nil, getSplitInfoError(tx, cnr, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
child := objectSDK.New()
|
child := objectSDK.New()
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (r *request) assemble(ctx context.Context) {
|
func (r *request) assemble(ctx context.Context) {
|
||||||
if !r.canAssemble() {
|
if !r.canAssembleComplexObject() {
|
||||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,7 @@ func (r *request) assemble(ctx context.Context) {
|
||||||
r.log.Debug(logs.GetTryingToAssembleTheObject)
|
r.log.Debug(logs.GetTryingToAssembleTheObject)
|
||||||
|
|
||||||
r.prm.common = r.prm.common.WithLocalOnly(false)
|
r.prm.common = r.prm.common.WithLocalOnly(false)
|
||||||
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
|
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r, r.headOnly())
|
||||||
|
|
||||||
r.log.Debug(logs.GetAssemblingSplittedObject,
|
r.log.Debug(logs.GetAssemblingSplittedObject,
|
||||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
|
|
|
@ -19,6 +19,7 @@ type assembler struct {
|
||||||
splitInfo *objectSDK.SplitInfo
|
splitInfo *objectSDK.SplitInfo
|
||||||
rng *objectSDK.Range
|
rng *objectSDK.Range
|
||||||
objGetter objectGetter
|
objGetter objectGetter
|
||||||
|
head bool
|
||||||
|
|
||||||
currentOffset uint64
|
currentOffset uint64
|
||||||
|
|
||||||
|
@ -30,18 +31,23 @@ func newAssembler(
|
||||||
splitInfo *objectSDK.SplitInfo,
|
splitInfo *objectSDK.SplitInfo,
|
||||||
rng *objectSDK.Range,
|
rng *objectSDK.Range,
|
||||||
objGetter objectGetter,
|
objGetter objectGetter,
|
||||||
|
head bool,
|
||||||
) *assembler {
|
) *assembler {
|
||||||
return &assembler{
|
return &assembler{
|
||||||
addr: addr,
|
addr: addr,
|
||||||
rng: rng,
|
rng: rng,
|
||||||
splitInfo: splitInfo,
|
splitInfo: splitInfo,
|
||||||
objGetter: objGetter,
|
objGetter: objGetter,
|
||||||
|
head: head,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Assemble assembles splitted large object and writes it's content to ObjectWriter.
|
// Assemble assembles splitted large object and writes it's content to ObjectWriter.
|
||||||
// It returns parent object.
|
// It returns parent object.
|
||||||
func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
|
if a.head {
|
||||||
|
return a.assembleHeader(ctx, writer)
|
||||||
|
}
|
||||||
sourceObjectID, ok := a.getLastPartOrLinkObjectID()
|
sourceObjectID, ok := a.getLastPartOrLinkObjectID()
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||||
|
@ -65,6 +71,43 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
|
||||||
return a.parentObject, nil
|
return a.parentObject, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
|
var sourceObjectIDs []oid.ID
|
||||||
|
sourceObjectID, ok := a.splitInfo.Link()
|
||||||
|
if ok {
|
||||||
|
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
|
||||||
|
}
|
||||||
|
sourceObjectID, ok = a.splitInfo.LastPart()
|
||||||
fyrchik
commented
It seems to be completely independent of other It seems to be completely independent of other `assembler` methods, do we have a reason to provide `headOnly` as a parameter to the `assembler` instead of writing a separate function?
|
|||||||
|
if ok {
|
||||||
|
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
|
||||||
|
}
|
||||||
|
if len(sourceObjectIDs) == 0 {
|
||||||
|
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||||
|
}
|
||||||
|
for _, sourceObjectID = range sourceObjectIDs {
|
||||||
|
obj, err := a.getParent(ctx, sourceObjectID, writer)
|
||||||
|
if err == nil {
|
||||||
|
return obj, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
|
obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
parent := obj.Parent()
|
||||||
|
if parent == nil {
|
||||||
|
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||||
|
}
|
||||||
|
if err := writer.WriteHeader(ctx, parent); err != nil {
|
||||||
|
return nil, err
|
||||||
fyrchik
commented
We return We return `false`, then create an error and will log again. Why is this `Warn`?
fyrchik
commented
I don't like the assembler being able to log anything, as it will be eventually moved to the SDK. I don't like the assembler being able to log anything, as it will be eventually moved to the SDK.
1. But now assembler is here. Logger could be replaced with an interface.
2. I think it is useful for troubleshooting.
fyrchik
commented
It only adds complexity (by producing side-effects), even though we can return everything in the result. It only adds complexity (by producing side-effects), even though we can return everything in the result.
The result in turn is what we log and can use for troubleshooting.
|
|||||||
|
}
|
||||||
|
return obj, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
|
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
|
||||||
sourceObjectID, ok := a.splitInfo.Link()
|
sourceObjectID, ok := a.splitInfo.Link()
|
||||||
if ok {
|
if ok {
|
||||||
|
|
|
@ -730,7 +730,7 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
|
|
||||||
t.Run("VIRTUAL", func(t *testing.T) {
|
t.Run("VIRTUAL", func(t *testing.T) {
|
||||||
testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) {
|
testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) {
|
||||||
headPrm := newHeadPrm(false, nil)
|
headPrm := newHeadPrm(true, nil)
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
I would expect the tests to stay the same, what is the reason? I would expect the tests to stay the same, what is the reason?
For For `raw == false` cases it is expected to return object headers, but not split info error, as test does. So this test looks loke something unrelated to real code and works just because of mocks.
|
|||||||
headPrm.WithAddress(addr)
|
headPrm.WithAddress(addr)
|
||||||
|
|
||||||
errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())
|
errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())
|
||||||
|
|
|
@ -88,8 +88,8 @@ func (r *request) key() (*ecdsa.PrivateKey, error) {
|
||||||
return r.keyStore.GetKey(sessionInfo)
|
return r.keyStore.GetKey(sessionInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *request) canAssemble() bool {
|
func (r *request) canAssembleComplexObject() bool {
|
||||||
return !r.isRaw() && !r.headOnly()
|
return !r.isRaw()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *request) splitInfo() *objectSDK.SplitInfo {
|
func (r *request) splitInfo() *objectSDK.SplitInfo {
|
||||||
|
|
Reason to change priority: if some shard returns split info error, but other shard has linking object, then return head result directly.