frostfs-node/pkg/services/object/get/exec.go
Pavel Karpy 6a4e5e6f0a [] node: Try node's private key if dynamic token fetching failed
`GETRANGEHASH` request spawns `GETRANGE` requests if an object could not be
found locally. If the original request contains session, it can be static
and, therefore, fetching session key can not be performed successfully.
As the best effort a node could request object's range with its own key.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2022-12-30 11:07:35 +03:00

364 lines
7.5 KiB
Go

package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
clientcore "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
"github.com/TrueCloudLab/frostfs-node/pkg/core/object"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object/util"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "github.com/TrueCloudLab/frostfs-sdk-go/object"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type statusError struct {
status int
err error
}
type execCtx struct {
svc *Service
ctx context.Context
prm RangePrm
statusError
infoSplit *objectSDK.SplitInfo
log *logger.Logger
collectedObject *objectSDK.Object
curOff uint64
head bool
curProcEpoch uint64
}
type execOption func(*execCtx)
const (
statusUndefined int = iota
statusOK
statusINHUMED
statusVIRTUAL
statusOutOfRange
)
func headOnly() execOption {
return func(c *execCtx) {
c.head = true
}
}
func withPayloadRange(r *objectSDK.Range) execOption {
return func(c *execCtx) {
c.prm.rng = r
}
}
func (exec *execCtx) setLogger(l *logger.Logger) {
req := "GET"
if exec.headOnly() {
req = "HEAD"
} else if exec.ctxRange() != nil {
req = "GET_RANGE"
}
exec.log = &logger.Logger{Logger: l.With(
zap.String("request", req),
zap.Stringer("address", exec.address()),
zap.Bool("raw", exec.isRaw()),
zap.Bool("local", exec.isLocal()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)}
}
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec execCtx) isRaw() bool {
return exec.prm.raw
}
func (exec execCtx) address() oid.Address {
return exec.prm.addr
}
// isChild checks if reading object is a parent of the given object.
// Object without reference to the parent (only children with the parent header
// have it) is automatically considered as child: this should be guaranteed by
// upper level logic.
func (exec execCtx) isChild(obj *objectSDK.Object) bool {
par := obj.Parent()
return par == nil || equalAddresses(exec.address(), object.AddressOf(par))
}
func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
if exec.prm.signerKey != nil {
// the key has already been requested and
// cached in the previous operations
return exec.prm.signerKey, nil
}
var sessionInfo *util.SessionInfo
if tok := exec.prm.common.SessionToken(); tok != nil {
sessionInfo = &util.SessionInfo{
ID: tok.ID(),
Owner: tok.Issuer(),
}
}
return exec.svc.keyStore.GetKey(sessionInfo)
}
func (exec *execCtx) canAssemble() bool {
return exec.svc.assembly && !exec.isRaw() && !exec.headOnly()
}
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
return exec.infoSplit
}
func (exec *execCtx) containerID() cid.ID {
return exec.address().Container()
}
func (exec *execCtx) ctxRange() *objectSDK.Range {
return exec.prm.rng
}
func (exec *execCtx) headOnly() bool {
return exec.head
}
func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}
func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}
func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}
e, err := exec.svc.currentEpochReceiver.currentEpoch()
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not get current epoch number",
zap.String("error", err.Error()),
)
return false
case err == nil:
exec.curProcEpoch = e
return true
}
}
func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
obj := addr.Object()
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, exec.curProcEpoch)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not generate container traverser",
zap.String("error", err.Error()),
)
return nil, false
case err == nil:
return t, true
}
}
func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*objectSDK.Object, bool) {
w := NewSimpleObjectWriter()
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.objWriter = w
p.SetRange(rng)
p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id)
exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
child := w.Object()
ok := exec.status == statusOK
if ok && withHdr && !exec.isChild(child) {
exec.status = statusUndefined
exec.err = errors.New("wrong child header")
exec.log.Debug("parent address in child object differs")
}
return child, ok
}
func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) {
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id)
prm := HeadPrm{
commonPrm: p.commonPrm,
}
w := NewSimpleObjectWriter()
prm.SetHeaderWriter(w)
err := exec.svc.Head(exec.context(), prm)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not get child object header",
zap.Stringer("child ID", id),
zap.String("error", err.Error()),
)
return nil, false
case err == nil:
child := w.Object()
if !exec.isChild(child) {
exec.status = statusUndefined
exec.log.Debug("parent address in child object differs")
} else {
exec.status = statusOK
exec.err = nil
}
return child, true
}
}
func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
c, err := exec.svc.clientCache.get(info)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not construct remote node client")
case err == nil:
return c, true
}
return nil, false
}
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
if last, ok := src.LastPart(); ok {
dst.SetLastPart(last)
}
if link, ok := src.Link(); ok {
dst.SetLink(link)
}
if splitID := src.SplitID(); splitID != nil {
dst.SetSplitID(splitID)
}
}
func (exec *execCtx) writeCollectedHeader() bool {
if exec.ctxRange() != nil {
return true
}
err := exec.prm.objWriter.WriteHeader(
exec.collectedObject.CutPayload(),
)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not write header",
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
}
return exec.status == statusOK
}
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
if exec.headOnly() {
return true
}
err := exec.prm.objWriter.WriteChunk(obj.Payload())
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not write payload chunk",
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
}
return err == nil
}
func (exec *execCtx) writeCollectedObject() {
if ok := exec.writeCollectedHeader(); ok {
exec.writeObjectPayload(exec.collectedObject)
}
}
// isForwardingEnabled returns true if common execution
// parameters has request forwarding closure set.
func (exec execCtx) isForwardingEnabled() bool {
return exec.prm.forwarder != nil
}
// disableForwarding removes request forwarding closure from common
// parameters, so it won't be inherited in new execution contexts.
func (exec *execCtx) disableForwarding() {
exec.prm.SetRequestForwarder(nil)
}