2020-12-02 23:45:25 +00:00
|
|
|
package getsvc
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"crypto/ecdsa"
|
|
|
|
|
2023-03-07 13:38:26 +00:00
|
|
|
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
2020-12-02 23:45:25 +00:00
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
// statusError pairs the numeric outcome of an execution step with the
// error that was recorded together with it.
type statusError struct {
	// status holds one of the status* codes declared in this file.
	status int

	// err is the error stored alongside status; it is nil when the step
	// recorded no failure.
	err error
}
|
|
|
|
|
|
|
|
type execCtx struct {
|
|
|
|
svc *Service
|
|
|
|
|
|
|
|
ctx context.Context
|
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
prm RangePrm
|
2020-12-02 23:45:25 +00:00
|
|
|
|
|
|
|
statusError
|
|
|
|
|
|
|
|
infoSplit *objectSDK.SplitInfo
|
|
|
|
|
|
|
|
log *logger.Logger
|
|
|
|
|
2022-03-03 14:19:05 +00:00
|
|
|
collectedObject *objectSDK.Object
|
2020-12-07 17:49:47 +00:00
|
|
|
|
2021-05-20 08:04:20 +00:00
|
|
|
head bool
|
2021-01-12 14:55:02 +00:00
|
|
|
|
|
|
|
curProcEpoch uint64
|
2020-12-02 23:45:25 +00:00
|
|
|
}
|
|
|
|
|
2020-12-09 10:32:33 +00:00
|
|
|
// execOption is a functional option that adjusts an execCtx in place.
type execOption func(*execCtx)
|
|
|
|
|
2020-12-02 23:45:25 +00:00
|
|
|
// Status codes stored in statusError.status. The values are sequential,
// starting from zero, so the zero value of a status is statusUndefined.
const (
	// statusUndefined is set when a step fails with an error.
	statusUndefined int = iota
	// statusOK is set when a step completes without an error.
	statusOK
	statusINHUMED
	statusVIRTUAL
	statusOutOfRange
)
|
|
|
|
|
2020-12-09 10:32:33 +00:00
|
|
|
func headOnly() execOption {
|
|
|
|
return func(c *execCtx) {
|
|
|
|
c.head = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func withPayloadRange(r *objectSDK.Range) execOption {
|
|
|
|
return func(c *execCtx) {
|
|
|
|
c.prm.rng = r
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-02 23:45:25 +00:00
|
|
|
func (exec *execCtx) setLogger(l *logger.Logger) {
|
2020-12-07 17:49:47 +00:00
|
|
|
req := "GET"
|
2020-12-09 10:32:33 +00:00
|
|
|
if exec.headOnly() {
|
|
|
|
req = "HEAD"
|
|
|
|
} else if exec.ctxRange() != nil {
|
2020-12-07 17:49:47 +00:00
|
|
|
req = "GET_RANGE"
|
|
|
|
}
|
|
|
|
|
2022-09-28 07:41:01 +00:00
|
|
|
exec.log = &logger.Logger{Logger: l.With(
|
2020-12-07 17:49:47 +00:00
|
|
|
zap.String("request", req),
|
2020-12-02 23:45:25 +00:00
|
|
|
zap.Stringer("address", exec.address()),
|
|
|
|
zap.Bool("raw", exec.isRaw()),
|
|
|
|
zap.Bool("local", exec.isLocal()),
|
|
|
|
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
|
|
|
|
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
|
2022-09-28 07:41:01 +00:00
|
|
|
)}
|
2020-12-02 23:45:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (exec execCtx) context() context.Context {
|
|
|
|
return exec.ctx
|
|
|
|
}
|
|
|
|
|
|
|
|
func (exec execCtx) isLocal() bool {
|
|
|
|
return exec.prm.common.LocalOnly()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (exec execCtx) isRaw() bool {
|
2021-11-01 08:35:33 +00:00
|
|
|
return exec.prm.raw
|
2020-12-02 23:45:25 +00:00
|
|
|
}
|
|
|
|
|
2022-05-31 17:00:41 +00:00
|
|
|
func (exec execCtx) address() oid.Address {
|
2021-11-01 08:35:33 +00:00
|
|
|
return exec.prm.addr
|
2020-12-02 23:45:25 +00:00
|
|
|
}
|
|
|
|
|
2021-10-26 12:07:28 +00:00
|
|
|
func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
|
2022-12-21 16:32:08 +00:00
|
|
|
if exec.prm.signerKey != nil {
|
|
|
|
// the key has already been requested and
|
|
|
|
// cached in the previous operations
|
|
|
|
return exec.prm.signerKey, nil
|
|
|
|
}
|
|
|
|
|
2022-05-18 15:20:08 +00:00
|
|
|
var sessionInfo *util.SessionInfo
|
|
|
|
|
|
|
|
if tok := exec.prm.common.SessionToken(); tok != nil {
|
|
|
|
sessionInfo = &util.SessionInfo{
|
|
|
|
ID: tok.ID(),
|
2022-05-25 16:09:12 +00:00
|
|
|
Owner: tok.Issuer(),
|
2022-05-18 15:20:08 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return exec.svc.keyStore.GetKey(sessionInfo)
|
2020-12-02 23:45:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (exec *execCtx) canAssemble() bool {
|
2023-03-13 21:19:19 +00:00
|
|
|
return !exec.isRaw() && !exec.headOnly()
|
2020-12-02 23:45:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
|
|
|
|
return exec.infoSplit
|
|
|
|
}
|
|
|
|
|
2022-05-31 17:00:41 +00:00
|
|
|
func (exec *execCtx) containerID() cid.ID {
|
|
|
|
return exec.address().Container()
|
2020-12-02 23:45:25 +00:00
|
|
|
}
|
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
func (exec *execCtx) ctxRange() *objectSDK.Range {
|
|
|
|
return exec.prm.rng
|
|
|
|
}
|
|
|
|
|
2020-12-09 10:32:33 +00:00
|
|
|
func (exec *execCtx) headOnly() bool {
|
|
|
|
return exec.head
|
|
|
|
}
|
|
|
|
|
2021-01-12 14:55:02 +00:00
|
|
|
func (exec *execCtx) netmapEpoch() uint64 {
|
|
|
|
return exec.prm.common.NetmapEpoch()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (exec *execCtx) netmapLookupDepth() uint64 {
|
|
|
|
return exec.prm.common.NetmapLookupDepth()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (exec *execCtx) initEpoch() bool {
|
|
|
|
exec.curProcEpoch = exec.netmapEpoch()
|
|
|
|
if exec.curProcEpoch > 0 {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
e, err := exec.svc.currentEpochReceiver.currentEpoch()
|
|
|
|
|
|
|
|
switch {
|
|
|
|
default:
|
|
|
|
exec.status = statusUndefined
|
|
|
|
exec.err = err
|
|
|
|
|
|
|
|
exec.log.Debug("could not get current epoch number",
|
|
|
|
zap.String("error", err.Error()),
|
|
|
|
)
|
|
|
|
|
|
|
|
return false
|
|
|
|
case err == nil:
|
|
|
|
exec.curProcEpoch = e
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-31 17:00:41 +00:00
|
|
|
func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
|
|
|
|
obj := addr.Object()
|
|
|
|
|
|
|
|
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, exec.curProcEpoch)
|
2020-12-02 23:45:25 +00:00
|
|
|
|
|
|
|
switch {
|
|
|
|
default:
|
|
|
|
exec.status = statusUndefined
|
|
|
|
exec.err = err
|
|
|
|
|
|
|
|
exec.log.Debug("could not generate container traverser",
|
|
|
|
zap.String("error", err.Error()),
|
|
|
|
)
|
|
|
|
|
|
|
|
return nil, false
|
|
|
|
case err == nil:
|
|
|
|
return t, true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-28 04:46:10 +00:00
|
|
|
func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
|
|
|
|
c, err := exec.svc.clientCache.get(info)
|
2021-05-20 15:17:16 +00:00
|
|
|
|
2020-12-02 23:45:25 +00:00
|
|
|
switch {
|
|
|
|
default:
|
|
|
|
exec.status = statusUndefined
|
|
|
|
exec.err = err
|
|
|
|
|
2021-06-22 12:08:17 +00:00
|
|
|
exec.log.Debug("could not construct remote node client")
|
2020-12-02 23:45:25 +00:00
|
|
|
case err == nil:
|
2021-05-20 15:17:16 +00:00
|
|
|
return c, true
|
2020-12-02 23:45:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
|
|
|
|
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
|
2022-05-12 16:37:46 +00:00
|
|
|
if last, ok := src.LastPart(); ok {
|
2020-12-02 23:45:25 +00:00
|
|
|
dst.SetLastPart(last)
|
|
|
|
}
|
|
|
|
|
2022-05-12 16:37:46 +00:00
|
|
|
if link, ok := src.Link(); ok {
|
2020-12-02 23:45:25 +00:00
|
|
|
dst.SetLink(link)
|
|
|
|
}
|
|
|
|
|
|
|
|
if splitID := src.SplitID(); splitID != nil {
|
|
|
|
dst.SetSplitID(splitID)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (exec *execCtx) writeCollectedHeader() bool {
|
2020-12-07 17:49:47 +00:00
|
|
|
if exec.ctxRange() != nil {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2020-12-02 23:45:25 +00:00
|
|
|
err := exec.prm.objWriter.WriteHeader(
|
2023-03-09 08:02:27 +00:00
|
|
|
exec.context(),
|
2022-03-03 14:19:05 +00:00
|
|
|
exec.collectedObject.CutPayload(),
|
2020-12-02 23:45:25 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
switch {
|
|
|
|
default:
|
|
|
|
exec.status = statusUndefined
|
|
|
|
exec.err = err
|
|
|
|
|
|
|
|
exec.log.Debug("could not write header",
|
|
|
|
zap.String("error", err.Error()),
|
|
|
|
)
|
|
|
|
case err == nil:
|
|
|
|
exec.status = statusOK
|
|
|
|
exec.err = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return exec.status == statusOK
|
|
|
|
}
|
|
|
|
|
2022-03-03 14:19:05 +00:00
|
|
|
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
|
2020-12-09 10:32:33 +00:00
|
|
|
if exec.headOnly() {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2023-03-09 08:02:27 +00:00
|
|
|
err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload())
|
2020-12-02 23:45:25 +00:00
|
|
|
|
|
|
|
switch {
|
|
|
|
default:
|
|
|
|
exec.status = statusUndefined
|
|
|
|
exec.err = err
|
|
|
|
|
|
|
|
exec.log.Debug("could not write payload chunk",
|
|
|
|
zap.String("error", err.Error()),
|
|
|
|
)
|
|
|
|
case err == nil:
|
|
|
|
exec.status = statusOK
|
|
|
|
exec.err = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return err == nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (exec *execCtx) writeCollectedObject() {
|
|
|
|
if ok := exec.writeCollectedHeader(); ok {
|
|
|
|
exec.writeObjectPayload(exec.collectedObject)
|
|
|
|
}
|
|
|
|
}
|
2021-09-27 06:52:03 +00:00
|
|
|
|
|
|
|
// isForwardingEnabled returns true if common execution
|
|
|
|
// parameters has request forwarding closure set.
|
|
|
|
func (exec execCtx) isForwardingEnabled() bool {
|
|
|
|
return exec.prm.forwarder != nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// disableForwarding removes request forwarding closure from common
|
|
|
|
// parameters, so it won't be inherited in new execution contexts.
|
|
|
|
func (exec *execCtx) disableForwarding() {
|
|
|
|
exec.prm.SetRequestForwarder(nil)
|
|
|
|
}
|