Assemble complex object headers if linking object is deleted on EC containers support v0.42 #1299
13 changed files with 95 additions and 42 deletions
@@ -100,7 +100,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 				return false
 			} else {
 				if !client.IsErrObjectNotFound(err) {
-					e.reportShardError(sh, "could not check object existence", err)
+					e.reportShardError(sh, "could not check object existence", err, zap.Stringer("address", prm.addr))
 				}
 				return false
 			}
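Most of the engine-side hunks in this PR repeat the pattern above: each reportShardError call gains a zap field carrying the object address, so a shard failure can be traced back to the object that triggered it. A minimal runnable sketch of the idea, assuming reportShardError ultimately calls a zap logger; the reportError helper and addr type here are hypothetical stand-ins, not the engine's real code:

package main

import (
	"errors"

	"go.uber.org/zap"
)

// addr stands in for oid.Address; any fmt.Stringer works with zap.Stringer.
type addr struct{ container, object string }

func (a addr) String() string { return a.container + "/" + a.object }

// reportError mirrors the shape of the engine's reportShardError: a message,
// the underlying error, and optional structured fields appended by callers.
func reportError(l *zap.Logger, msg string, err error, fields ...zap.Field) {
	l.Warn(msg, append(fields, zap.Error(err))...)
}

func main() {
	l, _ := zap.NewDevelopment()
	err := errors.New("metabase: object not found")
	// zap.Stringer is lazy: String() runs only when the entry is encoded.
	reportError(l, "could not check object existence", err,
		zap.Stringer("address", addr{"container", "object"}))
}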
@@ -116,7 +116,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 
 		_, err = sh.Inhume(ctx, shPrm)
 		if err != nil {
-			e.reportShardError(sh, "could not inhume object in shard", err)
+			e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", prm.addr))
 
 			var target *apistatus.ObjectLocked
 			locked.is = errors.As(err, &target)
@@ -191,7 +191,7 @@ func (e *StorageEngine) deleteChunks(
 		var objID oid.ID
 		err := objID.ReadFromV2(chunk.ID)
 		if err != nil {
-			e.reportShardError(sh, "could not delete EC chunk", err)
+			e.reportShardError(sh, "could not delete EC chunk", err, zap.Stringer("address", prm.addr))
 		}
 		addr.SetObject(objID)
 		inhumePrm.MarkAsGarbage(addr)
@@ -8,6 +8,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	"go.uber.org/zap"
 )
 
 // exists return in the first value true if object exists.
@@ -36,7 +37,7 @@ func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool
 		}
 
 		if !client.IsErrObjectNotFound(err) {
-			e.reportShardError(sh, "could not check existence of object in shard", err)
+			e.reportShardError(sh, "could not check existence of object in shard", err, zap.Stringer("address", shPrm.Address))
 		}
 		return false
 	}
@@ -186,7 +186,7 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
 			i.ObjectExpired = true
 			return true
 		default:
-			i.Engine.reportShardError(sh, "could not get object from shard", err)
+			i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
 			return false
 		}
 	})
@@ -12,6 +12,7 @@ import (
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"go.uber.org/zap"
 )
 
 // HeadPrm groups the parameters of Head operation.
@@ -118,7 +119,7 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
 			outError = new(apistatus.ObjectNotFound)
 			return true
 		default:
-			e.reportShardError(sh, "could not head object from shard", err)
+			e.reportShardError(sh, "could not head object from shard", err, zap.Stringer("address", prm.addr))
 			return false
 		}
 	}
@@ -126,17 +127,16 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
 		return true
 	})
 
+	if head != nil {
+		return HeadRes{head: head}, nil
+	}
 	if outSI != nil {
 		return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
-	} else if outEI != nil {
-		return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
-	} else if head == nil {
-		return HeadRes{}, outError
 	}
-
-	return HeadRes{
-		head: head,
-	}, nil
+	if outEI != nil {
+		return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
+	}
+	return HeadRes{}, outError
 }
 
 // Head reads object header from local storage by provided address.
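The reordering above changes the precedence of head() results: a header found on any shard now wins over split-info or EC-info errors collected from other shards, which previously took priority and could mask an already-resolved header. A condensed, runnable sketch of the new decision order; header and the error values are simplified stand-ins for HeadRes and the SDK error types, not the engine's real definitions:

package main

import (
	"errors"
	"fmt"
)

// header stands in for the resolved object header inside HeadRes.
type header struct{ id string }

var errNotFound = errors.New("object not found")

// resolve mirrors the new precedence in StorageEngine.head: a header found
// on any shard short-circuits; otherwise split info, then EC info, then the
// accumulated not-found error.
func resolve(head *header, splitErr, ecErr error) (*header, error) {
	if head != nil {
		return head, nil
	}
	if splitErr != nil {
		return nil, splitErr
	}
	if ecErr != nil {
		return nil, ecErr
	}
	return nil, errNotFound
}

func main() {
	h, err := resolve(&header{id: "parent"}, errors.New("split info"), nil)
	// Before the change the split-info error would have been returned even
	// though a header was already resolved; now the header wins.
	fmt.Println(h.id, err) // parent <nil>
}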
@@ -154,7 +154,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
 		var siErr *objectSDK.SplitInfoError
 		var ecErr *objectSDK.ECInfoError
 		if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
-			e.reportShardError(sh, "could not check for presents in shard", err)
+			e.reportShardError(sh, "could not check for presents in shard", err, zap.Stringer("address", addr))
 			return
 		}
 
@@ -179,7 +179,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
 			return true
 		}
 
-		e.reportShardError(sh, "could not inhume object in shard", err)
+		e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", addr))
 		return false
 	}
 
@@ -205,7 +205,7 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
 	e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
 		locked, err = h.Shard.IsLocked(ctx, addr)
 		if err != nil {
-			e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr),
+			e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("address", addr),
 				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 			outErr = err
 			return false
@@ -235,7 +235,7 @@ func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.
 	e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
 		ld, err := h.Shard.GetLocked(ctx, addr)
 		if err != nil {
-			e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
+			e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("address", addr),
 				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 			outErr = err
 		}
@@ -13,6 +13,7 @@ import (
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/zap"
 )
 
 var errLockFailed = errors.New("lock operation failed")
@@ -62,11 +63,9 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
 func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
 	// code is pretty similar to inhumeAddr, maybe unify?
 	root := false
-
 	var addrLocked oid.Address
 	addrLocked.SetContainer(idCnr)
 	addrLocked.SetObject(locked)
-
 	e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) {
 		defer func() {
 			// if object is root we continue since information about it
@@ -79,7 +78,6 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 		if checkExists {
 			var existsPrm shard.ExistsPrm
 			existsPrm.Address = addrLocked
-
 			exRes, err := sh.Exists(ctx, existsPrm)
 			if err != nil {
 				var siErr *objectSDK.SplitInfoError
@@ -90,14 +88,16 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 					var objID oid.ID
 					err = objID.ReadFromV2(chunk.ID)
 					if err != nil {
-						e.reportShardError(sh, "could not lock object in shard", err)
+						e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
+							zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 						return false
 					}
 					eclocked = append(eclocked, objID)
 				}
 				err = sh.Lock(ctx, idCnr, locker, eclocked)
 				if err != nil {
-					e.reportShardError(sh, "could not lock object in shard", err)
+					e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
+						zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 					return false
 				}
 				root = true
@@ -108,8 +108,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 					// do not lock it
 					return true
 				}
-
-				e.reportShardError(sh, "could not check locked object for presence in shard", err)
+				e.reportShardError(sh, "could not check locked object for presence in shard", err, zap.Stringer("container_id", idCnr),
+					zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 				return
 			}
 
@@ -121,21 +121,18 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 
 		err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
 		if err != nil {
-			e.reportShardError(sh, "could not lock object in shard", err)
-
+			e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
+				zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
 			var errIrregular *apistatus.LockNonRegularObject
 			if errors.As(err, &errIrregular) {
 				status = 1
 				return true
 			}
-
 			return false
 		}
-
 		status = 2
-
 		return true
 	})
 
 	return
 }
@@ -187,7 +187,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
 			return
 		}
 
-		e.reportShardError(sh, "could not put object to shard", err)
+		e.reportShardError(sh, "could not put object to shard", err, zap.Stringer("address", addr))
 		return
 	}
 
@@ -208,7 +208,7 @@ func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
 
 			return true // stop, return it back
 		default:
-			i.Engine.reportShardError(sh, "could not get object from shard", err)
+			i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
 			return false
 		}
 	})
@@ -160,11 +160,23 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
 		return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
 	}
 
-	// pick last item, for now there is not difference which address to pick
-	// but later list might be sorted so first or last value can be more
-	// prioritized to choose
-	virtualOID := relativeLst[len(relativeLst)-1]
-	data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
+	var data []byte
+	for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
+		virtualOID := relativeLst[len(relativeLst)-i-1]
+		data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
+	}
+
+	if len(data) == 0 {
+		// check if any of the relatives is an EC object
+		for _, relative := range relativeLst {
+			data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
+			if len(data) > 0 {
+				// we can't return object headers, but can return error,
+				// so assembler can try to assemble complex object
+				return nil, getSplitInfoError(tx, cnr, key)
+			}
+		}
+	}
 
 	child := objectSDK.New()
 
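The metabase change above replaces a single fixed lookup of the last relative with a reverse scan over all relatives, then falls back to the EC-info bucket: if the parent's header can no longer be fetched directly but some relative is an EC object, the metabase reports a split-info error so the caller can assemble the header instead. A toy model of that strategy; maps replace the bbolt buckets and all names here are illustrative stand-ins:

package main

import "fmt"

// getFromBucket stands in for the metabase helper of the same name.
func getFromBucket(bucket map[string][]byte, key string) []byte { return bucket[key] }

// pickParentHeader mirrors the new getVirtualObject strategy: scan the
// relatives from newest to oldest for a primary record, then fall back to
// checking whether any relative is an EC object.
func pickParentHeader(primary, ecInfo map[string][]byte, relatives []string) ([]byte, bool) {
	var data []byte
	for i := 0; i < len(relatives) && len(data) == 0; i++ {
		data = getFromBucket(primary, relatives[len(relatives)-i-1])
	}
	if len(data) == 0 {
		for _, relative := range relatives {
			if len(getFromBucket(ecInfo, relative)) > 0 {
				// The real code returns a split-info error here so the
				// assembler can rebuild the complex object's header.
				return nil, true
			}
		}
	}
	return data, false
}

func main() {
	primary := map[string][]byte{}                      // direct header record is gone
	ecInfo := map[string][]byte{"chunk-1": []byte("x")} // a relative is an EC chunk
	_, needAssembly := pickParentHeader(primary, ecInfo, []string{"chunk-1", "chunk-2"})
	fmt.Println(needAssembly) // true: caller should trigger header assembly
}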
@@ -12,7 +12,7 @@ import (
 )
 
 func (r *request) assemble(ctx context.Context) {
-	if !r.canAssemble() {
+	if !r.canAssembleComplexObject() {
 		r.log.Debug(logs.GetCanNotAssembleTheObject)
 		return
 	}
@@ -38,7 +38,7 @@ func (r *request) assemble(ctx context.Context) {
 	r.log.Debug(logs.GetTryingToAssembleTheObject)
 
 	r.prm.common = r.prm.common.WithLocalOnly(false)
-	assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
+	assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r, r.headOnly())
 
 	r.log.Debug(logs.GetAssemblingSplittedObject,
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
@@ -19,6 +19,7 @@ type assembler struct {
 	splitInfo *objectSDK.SplitInfo
 	rng *objectSDK.Range
 	objGetter objectGetter
+	head bool
 
 	currentOffset uint64
 
@@ -30,18 +31,23 @@ func newAssembler(
 	splitInfo *objectSDK.SplitInfo,
 	rng *objectSDK.Range,
 	objGetter objectGetter,
+	head bool,
 ) *assembler {
 	return &assembler{
 		addr: addr,
 		rng: rng,
 		splitInfo: splitInfo,
 		objGetter: objGetter,
+		head: head,
 	}
 }
 
 // Assemble assembles splitted large object and writes it's content to ObjectWriter.
 // It returns parent object.
 func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
+	if a.head {
+		return a.assembleHeader(ctx, writer)
+	}
 	sourceObjectID, ok := a.getLastPartOrLinkObjectID()
 	if !ok {
 		return nil, objectSDK.NewSplitInfoError(a.splitInfo)
@@ -65,6 +71,43 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
 	return a.parentObject, nil
 }
 
+func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
+	var sourceObjectIDs []oid.ID
+	sourceObjectID, ok := a.splitInfo.Link()
+	if ok {
+		sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
+	}
+	sourceObjectID, ok = a.splitInfo.LastPart()
+	if ok {
+		sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
+	}
+	if len(sourceObjectIDs) == 0 {
+		return nil, objectSDK.NewSplitInfoError(a.splitInfo)
+	}
+	for _, sourceObjectID = range sourceObjectIDs {
+		obj, err := a.getParent(ctx, sourceObjectID, writer)
+		if err == nil {
+			return obj, nil
+		}
+	}
+	return nil, objectSDK.NewSplitInfoError(a.splitInfo)
+}
+
+func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
+	obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
+	if err != nil {
+		return nil, err
+	}
+	parent := obj.Parent()
+	if parent == nil {
+		return nil, objectSDK.NewSplitInfoError(a.splitInfo)
+	}
+	if err := writer.WriteHeader(ctx, parent); err != nil {
+		return nil, err
+	}
+	return obj, nil
+}
+
 func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
 	sourceObjectID, ok := a.splitInfo.Link()
 	if ok {
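assembleHeader is the core of this PR: for a head request on a complex object it tries each available source in order — the link object first, then the last part — and serves the parent header from whichever still exists, so a deleted linking object no longer makes the header unreachable. A self-contained sketch of that fallback order; the headFn signature is a hypothetical stand-in for assembler.getParent, not the real API:

package main

import (
	"errors"
	"fmt"
)

// headFn stands in for assembler.getParent: head one candidate object and
// return the parent header carried inside it.
type headFn func(id string) (string, error)

// assembleHeader mirrors the new fallback: collect the link object and the
// last part as candidates and serve the first parent header that resolves;
// if nothing resolves, surface the split info again.
func assembleHeader(candidates []string, head headFn) (string, error) {
	if len(candidates) == 0 {
		return "", errors.New("split info error")
	}
	for _, id := range candidates {
		if parent, err := head(id); err == nil {
			return parent, nil
		}
	}
	return "", errors.New("split info error")
}

func main() {
	head := func(id string) (string, error) {
		if id == "link-object" { // simulate the deleted linking object
			return "", errors.New("not found")
		}
		return "parent-header", nil
	}
	parent, err := assembleHeader([]string{"link-object", "last-part"}, head)
	fmt.Println(parent, err) // parent-header <nil>: the last part served as fallback
}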
@@ -730,7 +730,7 @@ func TestGetRemoteSmall(t *testing.T) {
 
 	t.Run("VIRTUAL", func(t *testing.T) {
 		testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) {
-			headPrm := newHeadPrm(false, nil)
+			headPrm := newHeadPrm(true, nil)
 			headPrm.WithAddress(addr)
 
 			errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())
@@ -88,8 +88,8 @@ func (r *request) key() (*ecdsa.PrivateKey, error) {
 	return r.keyStore.GetKey(sessionInfo)
 }
 
-func (r *request) canAssemble() bool {
-	return !r.isRaw() && !r.headOnly()
+func (r *request) canAssembleComplexObject() bool {
+	return !r.isRaw()
 }
 
 func (r *request) splitInfo() *objectSDK.SplitInfo {
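The renamed guard makes the new policy explicit: raw requests are still never assembled, but head-only requests now are, which is what lets the service build a parent header from surviving parts. A condensed view of the before/after behaviour; request here is a stub, not the service's real type:

package main

import "fmt"

// request is a stub carrying just the two flags the guard consults.
type request struct{ raw, headOnly bool }

// canAssembleComplexObject reflects the new guard: only raw requests are
// excluded; head-only requests may now be assembled.
func (r request) canAssembleComplexObject() bool { return !r.raw }

func main() {
	r := request{raw: false, headOnly: true}
	// The old guard (!raw && !headOnly) returned false for this request.
	fmt.Println(r.canAssembleComplexObject()) // true
}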