forked from TrueCloudLab/frostfs-node
253 lines
5.2 KiB
Go
253 lines
5.2 KiB
Go
|
package getsvc
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"crypto/ecdsa"
|
||
|
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||
|
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||
|
"go.uber.org/zap"
|
||
|
)
|
||
|
|
||
|
type request struct {
|
||
|
prm RequestParameters
|
||
|
|
||
|
statusError
|
||
|
|
||
|
infoSplit *objectSDK.SplitInfo
|
||
|
|
||
|
log *logger.Logger
|
||
|
|
||
|
collectedObject *objectSDK.Object
|
||
|
|
||
|
curProcEpoch uint64
|
||
|
|
||
|
keyStore keyStorage
|
||
|
epochSource epochSource
|
||
|
traverserGenerator traverserGenerator
|
||
|
remoteStorageConstructor remoteStorageConstructor
|
||
|
localStorage localStorage
|
||
|
}
|
||
|
|
||
|
func (r *request) setLogger(l *logger.Logger) {
|
||
|
req := "GET"
|
||
|
if r.headOnly() {
|
||
|
req = "HEAD"
|
||
|
} else if r.ctxRange() != nil {
|
||
|
req = "GET_RANGE"
|
||
|
}
|
||
|
|
||
|
r.log = &logger.Logger{Logger: l.With(
|
||
|
zap.String("request", req),
|
||
|
zap.Stringer("address", r.address()),
|
||
|
zap.Bool("raw", r.isRaw()),
|
||
|
zap.Bool("local", r.isLocal()),
|
||
|
zap.Bool("with session", r.prm.common.SessionToken() != nil),
|
||
|
zap.Bool("with bearer", r.prm.common.BearerToken() != nil),
|
||
|
)}
|
||
|
}
|
||
|
|
||
|
func (r *request) isLocal() bool {
|
||
|
return r.prm.common.LocalOnly()
|
||
|
}
|
||
|
|
||
|
func (r *request) isRaw() bool {
|
||
|
return r.prm.raw
|
||
|
}
|
||
|
|
||
|
func (r *request) address() oid.Address {
|
||
|
return r.prm.addr
|
||
|
}
|
||
|
|
||
|
func (r *request) key() (*ecdsa.PrivateKey, error) {
|
||
|
if r.prm.signerKey != nil {
|
||
|
// the key has already been requested and
|
||
|
// cached in the previous operations
|
||
|
return r.prm.signerKey, nil
|
||
|
}
|
||
|
|
||
|
var sessionInfo *util.SessionInfo
|
||
|
|
||
|
if tok := r.prm.common.SessionToken(); tok != nil {
|
||
|
sessionInfo = &util.SessionInfo{
|
||
|
ID: tok.ID(),
|
||
|
Owner: tok.Issuer(),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return r.keyStore.GetKey(sessionInfo)
|
||
|
}
|
||
|
|
||
|
func (r *request) canAssemble() bool {
|
||
|
return !r.isRaw() && !r.headOnly()
|
||
|
}
|
||
|
|
||
|
func (r *request) splitInfo() *objectSDK.SplitInfo {
|
||
|
return r.infoSplit
|
||
|
}
|
||
|
|
||
|
func (r *request) containerID() cid.ID {
|
||
|
return r.address().Container()
|
||
|
}
|
||
|
|
||
|
func (r *request) ctxRange() *objectSDK.Range {
|
||
|
return r.prm.rng
|
||
|
}
|
||
|
|
||
|
func (r *request) headOnly() bool {
|
||
|
return r.prm.head
|
||
|
}
|
||
|
|
||
|
func (r *request) netmapEpoch() uint64 {
|
||
|
return r.prm.common.NetmapEpoch()
|
||
|
}
|
||
|
|
||
|
func (r *request) netmapLookupDepth() uint64 {
|
||
|
return r.prm.common.NetmapLookupDepth()
|
||
|
}
|
||
|
|
||
|
func (r *request) initEpoch() bool {
|
||
|
r.curProcEpoch = r.netmapEpoch()
|
||
|
if r.curProcEpoch > 0 {
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
e, err := r.epochSource.Epoch()
|
||
|
|
||
|
switch {
|
||
|
default:
|
||
|
r.status = statusUndefined
|
||
|
r.err = err
|
||
|
|
||
|
r.log.Debug(logs.CouldNotGetCurrentEpochNumber,
|
||
|
zap.String("error", err.Error()),
|
||
|
)
|
||
|
|
||
|
return false
|
||
|
case err == nil:
|
||
|
r.curProcEpoch = e
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
|
||
|
obj := addr.Object()
|
||
|
|
||
|
t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
|
||
|
|
||
|
switch {
|
||
|
default:
|
||
|
r.status = statusUndefined
|
||
|
r.err = err
|
||
|
|
||
|
r.log.Debug(logs.GetCouldNotGenerateContainerTraverser,
|
||
|
zap.String("error", err.Error()),
|
||
|
)
|
||
|
|
||
|
return nil, false
|
||
|
case err == nil:
|
||
|
return t, true
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {
|
||
|
rs, err := r.remoteStorageConstructor.Get(info)
|
||
|
if err != nil {
|
||
|
r.status = statusUndefined
|
||
|
r.err = err
|
||
|
|
||
|
r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient)
|
||
|
|
||
|
return nil, false
|
||
|
}
|
||
|
|
||
|
return rs, true
|
||
|
}
|
||
|
|
||
|
func (r *request) writeCollectedHeader(ctx context.Context) bool {
|
||
|
if r.ctxRange() != nil {
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
err := r.prm.objWriter.WriteHeader(
|
||
|
ctx,
|
||
|
r.collectedObject.CutPayload(),
|
||
|
)
|
||
|
|
||
|
switch {
|
||
|
default:
|
||
|
r.status = statusUndefined
|
||
|
r.err = err
|
||
|
|
||
|
r.log.Debug(logs.GetCouldNotWriteHeader,
|
||
|
zap.String("error", err.Error()),
|
||
|
)
|
||
|
case err == nil:
|
||
|
r.status = statusOK
|
||
|
r.err = nil
|
||
|
}
|
||
|
|
||
|
return r.status == statusOK
|
||
|
}
|
||
|
|
||
|
func (r *request) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
|
||
|
if r.headOnly() {
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
err := r.prm.objWriter.WriteChunk(ctx, obj.Payload())
|
||
|
|
||
|
switch {
|
||
|
default:
|
||
|
r.status = statusUndefined
|
||
|
r.err = err
|
||
|
|
||
|
r.log.Debug(logs.GetCouldNotWritePayloadChunk,
|
||
|
zap.String("error", err.Error()),
|
||
|
)
|
||
|
case err == nil:
|
||
|
r.status = statusOK
|
||
|
r.err = nil
|
||
|
}
|
||
|
|
||
|
return err == nil
|
||
|
}
|
||
|
|
||
|
func (r *request) writeCollectedObject(ctx context.Context) {
|
||
|
if ok := r.writeCollectedHeader(ctx); ok {
|
||
|
r.writeObjectPayload(ctx, r.collectedObject)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// isForwardingEnabled returns true if common execution
|
||
|
// parameters has request forwarding closure set.
|
||
|
func (r request) isForwardingEnabled() bool {
|
||
|
return r.prm.forwarder != nil
|
||
|
}
|
||
|
|
||
|
// disableForwarding removes request forwarding closure from common
|
||
|
// parameters, so it won't be inherited in new execution contexts.
|
||
|
func (r *request) disableForwarding() {
|
||
|
r.prm.SetRequestForwarder(nil)
|
||
|
}
|
||
|
|
||
|
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
|
||
|
if last, ok := src.LastPart(); ok {
|
||
|
dst.SetLastPart(last)
|
||
|
}
|
||
|
|
||
|
if link, ok := src.Link(); ok {
|
||
|
dst.SetLink(link)
|
||
|
}
|
||
|
|
||
|
if splitID := src.SplitID(); splitID != nil {
|
||
|
dst.SetSplitID(splitID)
|
||
|
}
|
||
|
}
|