frostfs-node/pkg/services/object/get/assemble.go
Dmitrii Stepanov 286242cad0 [#539] getsvc: Use buffer to assemble object
To reduce memory consumption.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-27 17:02:08 +03:00

150 lines
4.6 KiB
Go

package getsvc
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
// assemble attempts to build the requested object out of its split parts and
// stores the outcome on the request: status and err are always set once the
// assembler has run, and collectedObject is set on success only.
//
// If canAssemble reports false the method only logs that fact and returns
// without touching the request state.
func (r *request) assemble(ctx context.Context) {
	if !r.canAssemble() {
		r.log.Debug(logs.GetCanNotAssembleTheObject)
		return
	}

	// Access tokens are dropped before assembling:
	//   - the API protocol does not require child objects to be listed in the
	//     session/bearer token of `GET`/`GETRANGE`/`RANGEHASH` requests, so
	//     their absence in the original request is not an error; and without
	//     a session per child object a bearer token cannot be attached to the
	//     newly generated requests correctly, because it was not issued for
	//     this node's key;
	//   - since request forwarding was introduced, assembly is expected to run
	//     on a container node only, and such a node should by design have
	//     enough rights to fetch any child object.
	r.prm.common.ForgetTokens()

	// The forwarding closure is inherited by the requests produced during
	// assembly, so it has to be switched off before they are created.
	r.disableForwarding()

	r.log.Debug(logs.GetTryingToAssembleTheObject)

	asm := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)

	r.log.Debug(
		logs.GetAssemblingSplittedObject,
		zap.Stringer("address", r.address()),
		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
		zap.Uint64("range_length", r.ctxRange().GetLength()),
	)
	// The arguments of a deferred call are evaluated right here, so the
	// completion record carries the address and range as they are now.
	defer r.log.Debug(
		logs.GetAssemblingSplittedObjectCompleted,
		zap.Stringer("address", r.address()),
		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
		zap.Uint64("range_length", r.ctxRange().GetLength()),
	)

	assembled, asmErr := asm.Assemble(ctx, r.prm.objWriter)
	if asmErr == nil {
		r.status = statusOK
		r.err = nil
		r.collectedObject = assembled
		return
	}

	r.log.Warn(
		logs.GetFailedToAssembleSplittedObject,
		zap.Error(asmErr),
		zap.Stringer("address", r.address()),
		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
		zap.Uint64("range_length", r.ctxRange().GetLength()),
	)

	// "Removed" and "out of range" are probed both as pointer and as value
	// targets (named remote/local respectively in the original code).
	var (
		splitInfoErr     *objectSDK.SplitInfoError
		removedRemote    *apistatus.ObjectAlreadyRemoved
		removedLocal     apistatus.ObjectAlreadyRemoved
		outOfRangeRemote *apistatus.ObjectOutOfRange
		outOfRangeLocal  apistatus.ObjectOutOfRange
	)

	// The first matching case wins, so the probing order is significant.
	switch {
	case errors.As(asmErr, &removedRemote):
		r.status = statusINHUMED
		r.err = removedRemote
	case errors.As(asmErr, &removedLocal):
		r.status = statusINHUMED
		r.err = removedLocal
	case errors.As(asmErr, &splitInfoErr):
		r.status = statusVIRTUAL
		r.err = splitInfoErr
	case errors.As(asmErr, &outOfRangeRemote):
		r.status = statusOutOfRange
		r.err = outOfRangeRemote
	case errors.As(asmErr, &outOfRangeLocal):
		r.status = statusOutOfRange
		r.err = outOfRangeLocal
	default:
		r.status = statusUndefined
		r.err = asmErr
	}
}
// equalAddresses reports whether a and b have equal container IDs and equal
// object IDs. The object IDs are compared only if the container IDs match.
func equalAddresses(a, b oid.Address) bool {
	if !a.Container().Equals(b.Container()) {
		return false
	}

	return a.Object().Equals(b.Object())
}
// HeadObject requests the object identified by id within this request's
// container through a separate, independent request run in head mode, and
// returns whatever the header writer collected.
//
// The parameters are built from scratch rather than copied from r; only the
// container ID is taken from the current request. On failure the error of the
// independent request is returned together with a nil object.
func (r *request) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
	headerWriter := NewSimpleObjectWriter()

	var prm RequestParameters
	prm.common = prm.common.WithLocalOnly(false)
	prm.head = true
	prm.addr.SetObject(id)
	prm.addr.SetContainer(r.containerID())
	prm.SetHeaderWriter(headerWriter)

	err := r.getObjectWithIndependentRequest(ctx, prm)
	if err != nil {
		return nil, err
	}

	return headerWriter.Object(), nil
}
// GetObject requests the object identified by id within this request's
// container through a separate, independent request and returns the object
// collected by a fresh simple writer.
//
// The parameters start as a copy of r.prm with the writer, range and address
// overridden; rng is passed on as the requested payload range and
// payloadBuffer is handed to the writer via SetPayloadBuffer. On failure the
// error of the independent request is returned together with a nil object.
func (r *request) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range, payloadBuffer []byte) (*objectSDK.Object, error) {
	objWriter := NewSimpleObjectWriter()
	objWriter.SetPayloadBuffer(payloadBuffer)

	// Start from the current request's parameters and override what differs.
	prm := r.prm
	prm.common = prm.common.WithLocalOnly(false)
	prm.objWriter = objWriter
	prm.rng = rng

	prm.addr.SetContainer(r.containerID())
	prm.addr.SetObject(id)

	err := r.getObjectWithIndependentRequest(ctx, prm)
	if err != nil {
		return nil, err
	}

	return objWriter.Object(), nil
}
// getObjectWithIndependentRequest runs prm through a brand-new request that
// reuses this request's dependencies (key store, traverser generator, remote
// storage constructor, epoch source, local storage and logger) but starts
// with an empty split info. It returns the error stored in the new request's
// statusError after execution.
func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm RequestParameters) error {
	detached := new(request)

	// Dependencies shared with the parent request.
	detached.keyStore = r.keyStore
	detached.traverserGenerator = r.traverserGenerator
	detached.remoteStorageConstructor = r.remoteStorageConstructor
	detached.epochSource = r.epochSource
	detached.localStorage = r.localStorage
	detached.log = r.log

	// State specific to the detached request.
	detached.prm = prm
	detached.infoSplit = objectSDK.NewSplitInfo()

	detached.execute(ctx)

	return detached.statusError.err
}