frostfs-node/pkg/services/object/get/request.go
Dmitrii Stepanov fc383ea6ae [] getSvc: Fix EC objects get
Now EC objects assembling is performed concurrently.
Also fixed issue with an error in case of getting
EC object via non-container node.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-17 14:24:27 +03:00

248 lines
5.3 KiB
Go

package getsvc
import (
"context"
"crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
// request bundles the parameters, dependencies and intermediate state of a
// single object request; setLogger labels such a request as GET, HEAD or
// GET_RANGE.
type request struct {
	// prm holds the parameters the request was issued with.
	prm RequestParameters
	// statusError is embedded; the status and err values assigned by the
	// methods of request are promoted from it.
	statusError
	// infoSplit is the split information exposed through splitInfo.
	infoSplit *objectSDK.SplitInfo
	infoEC *ecInfo
	// log is the request-scoped logger installed by setLogger.
	log *logger.Logger
	// collectedObject is the object whose header and payload are emitted by
	// writeCollectedObject.
	collectedObject *objectSDK.Object
	// curProcEpoch is the epoch chosen by initEpoch and passed to the
	// traverser generator.
	curProcEpoch uint64
	// keyStore resolves the private key returned by key when no signer key
	// is set in the parameters.
	keyStore keyStorage
	// epochSource supplies the epoch when the request parameters carry none.
	epochSource epochSource
	// traverserGenerator builds placement traversers in generateTraverser.
	traverserGenerator traverserGenerator
	// remoteStorageConstructor builds remote storages in getRemoteStorage.
	remoteStorageConstructor remoteStorageConstructor
	localStorage localStorage
	containerSource container.Source
}
// setLogger installs a request-scoped logger derived from l. The derived
// logger carries the request kind (GET, HEAD or GET_RANGE), the object
// address and the raw/local/session/bearer flags of the request.
func (r *request) setLogger(l *logger.Logger) {
	var kind string
	switch {
	case r.headOnly():
		kind = "HEAD"
	case r.ctxRange() != nil:
		kind = "GET_RANGE"
	default:
		kind = "GET"
	}

	hasSession := r.prm.common.SessionToken() != nil
	hasBearer := r.prm.common.BearerToken() != nil

	child := l.With(
		zap.String("request", kind),
		zap.Stringer("address", r.address()),
		zap.Bool("raw", r.isRaw()),
		zap.Bool("local", r.isLocal()),
		zap.Bool("with session", hasSession),
		zap.Bool("with bearer", hasBearer),
	)
	r.log = &logger.Logger{Logger: child}
}
// isLocal reports the LocalOnly flag of the common request parameters.
func (r *request) isLocal() bool {
	localOnly := r.prm.common.LocalOnly()
	return localOnly
}
// isRaw reports whether the raw flag is set in the request parameters.
func (r *request) isRaw() bool {
	return r.prm.raw
}
// address returns the object address stored in the request parameters.
func (r *request) address() oid.Address {
	return r.prm.addr
}
// key returns the private key to use for the request.
//
// A signer key already present in the request parameters is returned as is.
// Otherwise the key is looked up in the key store, using the ID and issuer
// of the session token when the request carries one.
func (r *request) key() (*ecdsa.PrivateKey, error) {
	if cached := r.prm.signerKey; cached != nil {
		// The key was requested and cached by a previous operation.
		return cached, nil
	}

	// info stays nil when the request has no session token.
	var info *util.SessionInfo
	token := r.prm.common.SessionToken()
	if token != nil {
		info = new(util.SessionInfo)
		info.ID = token.ID()
		info.Owner = token.Issuer()
	}

	return r.keyStore.GetKey(info)
}
// canAssemble reports whether the request is neither raw nor head-only.
func (r *request) canAssemble() bool {
	if r.isRaw() {
		return false
	}
	return !r.headOnly()
}
// splitInfo returns the split information stored on the request; it may be nil.
func (r *request) splitInfo() *objectSDK.SplitInfo {
	return r.infoSplit
}
// containerID returns the container part of the requested object address.
func (r *request) containerID() cid.ID {
	addr := r.address()
	return addr.Container()
}
// ctxRange returns the payload range stored in the request parameters.
// Callers in this file treat a non-nil result as a range request.
func (r *request) ctxRange() *objectSDK.Range {
	return r.prm.rng
}
// headOnly reports whether the head flag is set in the request parameters.
func (r *request) headOnly() bool {
	return r.prm.head
}
// netmapEpoch returns the netmap epoch carried by the common request
// parameters.
func (r *request) netmapEpoch() uint64 {
	epoch := r.prm.common.NetmapEpoch()
	return epoch
}
// netmapLookupDepth returns the netmap lookup depth carried by the common
// request parameters.
func (r *request) netmapLookupDepth() uint64 {
	depth := r.prm.common.NetmapLookupDepth()
	return depth
}
// initEpoch selects the epoch used to process the request and reports
// whether it succeeded.
//
// A non-zero epoch from the request parameters is used directly. Otherwise
// the epoch source is queried; if that fails, the error is stored on the
// request, the status becomes statusUndefined and false is returned.
func (r *request) initEpoch() bool {
	if r.curProcEpoch = r.netmapEpoch(); r.curProcEpoch > 0 {
		return true
	}

	current, err := r.epochSource.Epoch()
	if err != nil {
		r.status = statusUndefined
		r.err = err
		r.log.Debug(logs.CouldNotGetCurrentEpochNumber, zap.Error(err))
		return false
	}

	r.curProcEpoch = current
	return true
}
// generateTraverser builds a placement traverser for addr at the epoch
// chosen by initEpoch.
//
// On failure the error is stored on the request, the status becomes
// statusUndefined and (nil, false) is returned.
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
	objID := addr.Object()
	cnrID := addr.Container()

	// The second result of GenerateTraverser is not needed here.
	traverser, _, err := r.traverserGenerator.GenerateTraverser(cnrID, &objID, r.curProcEpoch)
	if err != nil {
		r.status = statusUndefined
		r.err = err
		r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, zap.Error(err))
		return nil, false
	}

	return traverser, true
}
// getRemoteStorage constructs a remote storage for the node described by
// info.
//
// On failure the error is stored on the request, the status becomes
// statusUndefined, the failure is logged at debug level together with the
// error, and (nil, false) is returned.
func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {
	rs, err := r.remoteStorageConstructor.Get(info)
	if err != nil {
		r.status = statusUndefined
		r.err = err
		// Attach the error to the log entry, as the other failure paths of
		// request do, so the cause is visible in the debug log.
		r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient, zap.Error(err))
		return nil, false
	}

	return rs, true
}
// writeCollectedHeader writes the collected object with its payload cut off
// to the request's object writer and reports whether the request status is
// statusOK afterwards.
//
// Range requests skip the header entirely and report success. A writer
// error is stored on the request and sets the status to statusUndefined.
func (r *request) writeCollectedHeader(ctx context.Context) bool {
	if r.ctxRange() != nil {
		return true
	}

	header := r.collectedObject.CutPayload()
	if err := r.prm.objWriter.WriteHeader(ctx, header); err != nil {
		r.status = statusUndefined
		r.err = err
		r.log.Debug(logs.GetCouldNotWriteHeader, zap.Error(err))
	} else {
		r.status = statusOK
		r.err = nil
	}

	return r.status == statusOK
}
// writeObjectPayload writes the payload of obj to the request's object
// writer as a single chunk and reports whether the write succeeded.
//
// Head-only requests skip the payload and report success. A writer error is
// stored on the request and sets the status to statusUndefined; a successful
// write sets the status to statusOK and clears the stored error.
func (r *request) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
	if r.headOnly() {
		return true
	}

	if err := r.prm.objWriter.WriteChunk(ctx, obj.Payload()); err != nil {
		r.status = statusUndefined
		r.err = err
		r.log.Debug(logs.GetCouldNotWritePayloadChunk, zap.Error(err))
		return false
	}

	r.status = statusOK
	r.err = nil
	return true
}
// writeCollectedObject emits the collected object: the header first and,
// only if the header step reported success, the payload.
func (r *request) writeCollectedObject(ctx context.Context) {
	if !r.writeCollectedHeader(ctx) {
		return
	}
	r.writeObjectPayload(ctx, r.collectedObject)
}
// isForwardingEnabled reports whether the request parameters have a request
// forwarding closure set.
//
// It uses a pointer receiver, like every other method of request, so that a
// call does not copy the whole request struct.
func (r *request) isForwardingEnabled() bool {
	return r.prm.forwarder != nil
}
// disableForwarding clears the request forwarding closure in the request
// parameters, so it won't be inherited by new execution contexts.
func (r *request) disableForwarding() {
	r.prm.SetRequestForwarder(nil)
}
// mergeSplitInfo copies into dst every piece of split information that is
// present in src: the last part ID, the link ID and the split ID. Values
// that src does not carry leave the corresponding values of dst untouched.
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
	lastID, hasLast := src.LastPart()
	if hasLast {
		dst.SetLastPart(lastID)
	}

	linkID, hasLink := src.Link()
	if hasLink {
		dst.SetLink(linkID)
	}

	if id := src.SplitID(); id != nil {
		dst.SetSplitID(id)
	}
}