package getsvc import ( "context" "crypto/ecdsa" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) type statusError struct { status int err error } type execCtx struct { svc *Service prm RangePrm statusError infoSplit *objectSDK.SplitInfo log *logger.Logger collectedObject *objectSDK.Object head bool curProcEpoch uint64 } type execOption func(*execCtx) const ( statusUndefined int = iota statusOK statusINHUMED statusVIRTUAL statusOutOfRange ) func headOnly() execOption { return func(c *execCtx) { c.head = true } } func withPayloadRange(r *objectSDK.Range) execOption { return func(c *execCtx) { c.prm.rng = r } } func (exec *execCtx) setLogger(l *logger.Logger) { req := "GET" if exec.headOnly() { req = "HEAD" } else if exec.ctxRange() != nil { req = "GET_RANGE" } exec.log = &logger.Logger{Logger: l.With( zap.String("request", req), zap.Stringer("address", exec.address()), zap.Bool("raw", exec.isRaw()), zap.Bool("local", exec.isLocal()), zap.Bool("with session", exec.prm.common.SessionToken() != nil), zap.Bool("with bearer", exec.prm.common.BearerToken() != nil), )} } func (exec execCtx) isLocal() bool { return exec.prm.common.LocalOnly() } func (exec execCtx) isRaw() bool { return exec.prm.raw } func (exec execCtx) address() oid.Address { return exec.prm.addr } func (exec execCtx) key() (*ecdsa.PrivateKey, error) { if exec.prm.signerKey != nil { // the key has already been requested and // cached in the previous operations return exec.prm.signerKey, nil } var sessionInfo *util.SessionInfo if tok := exec.prm.common.SessionToken(); tok != nil { sessionInfo = &util.SessionInfo{ ID: tok.ID(), Owner: tok.Issuer(), } } return exec.svc.keyStore.GetKey(sessionInfo) } func (exec *execCtx) canAssemble() bool { return !exec.isRaw() && !exec.headOnly() } func (exec *execCtx) splitInfo() *objectSDK.SplitInfo { return exec.infoSplit } func (exec *execCtx) containerID() cid.ID { return exec.address().Container() } func (exec *execCtx) ctxRange() *objectSDK.Range { return exec.prm.rng } func (exec *execCtx) headOnly() bool { return exec.head } func (exec *execCtx) netmapEpoch() uint64 { return exec.prm.common.NetmapEpoch() } func (exec *execCtx) netmapLookupDepth() uint64 { return exec.prm.common.NetmapLookupDepth() } func (exec *execCtx) initEpoch() bool { exec.curProcEpoch = exec.netmapEpoch() if exec.curProcEpoch > 0 { return true } e, err := exec.svc.currentEpochReceiver.currentEpoch() switch { default: exec.status = statusUndefined exec.err = err exec.log.Debug("could not get current epoch number", zap.String("error", err.Error()), ) return false case err == nil: exec.curProcEpoch = e return true } } func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, bool) { obj := addr.Object() t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, exec.curProcEpoch) switch { default: exec.status = statusUndefined exec.err = err exec.log.Debug("could not generate container traverser", zap.String("error", err.Error()), ) return nil, false case err == nil: return t, true } } func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) { c, err := exec.svc.clientCache.get(info) switch { default: exec.status = statusUndefined exec.err = err exec.log.Debug("could not construct remote node client") case err == nil: return c, true } return nil, false } func mergeSplitInfo(dst, src *objectSDK.SplitInfo) { if last, ok := src.LastPart(); ok { dst.SetLastPart(last) } if link, ok := src.Link(); ok { dst.SetLink(link) } if splitID := src.SplitID(); splitID != nil { dst.SetSplitID(splitID) } } func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool { if exec.ctxRange() != nil { return true } err := exec.prm.objWriter.WriteHeader( ctx, exec.collectedObject.CutPayload(), ) switch { default: exec.status = statusUndefined exec.err = err exec.log.Debug("could not write header", zap.String("error", err.Error()), ) case err == nil: exec.status = statusOK exec.err = nil } return exec.status == statusOK } func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool { if exec.headOnly() { return true } err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload()) switch { default: exec.status = statusUndefined exec.err = err exec.log.Debug("could not write payload chunk", zap.String("error", err.Error()), ) case err == nil: exec.status = statusOK exec.err = nil } return err == nil } func (exec *execCtx) writeCollectedObject(ctx context.Context) { if ok := exec.writeCollectedHeader(ctx); ok { exec.writeObjectPayload(ctx, exec.collectedObject) } } // isForwardingEnabled returns true if common execution // parameters has request forwarding closure set. func (exec execCtx) isForwardingEnabled() bool { return exec.prm.forwarder != nil } // disableForwarding removes request forwarding closure from common // parameters, so it won't be inherited in new execution contexts. func (exec *execCtx) disableForwarding() { exec.prm.SetRequestForwarder(nil) }