Refactor getsvc #277

Merged
fyrchik merged 7 commits from dstepanov-yadro/frostfs-node:object-3606 into master 2023-04-28 14:03:13 +00:00
15 changed files with 442 additions and 445 deletions
Showing only changes of commit 183b0a09af - Show all commits

View file

@ -11,9 +11,9 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) assemble(ctx context.Context) { func (r *request) assemble(ctx context.Context) {
if !exec.canAssemble() { if !r.canAssemble() {
exec.log.Debug(logs.GetCanNotAssembleTheObject) r.log.Debug(logs.GetCanNotAssembleTheObject)
return return
} }
@ -28,35 +28,35 @@ func (exec *execCtx) assemble(ctx context.Context) {
// - the assembly process is expected to be handled on a container node // - the assembly process is expected to be handled on a container node
// only since the requests forwarding mechanism presentation; such the // only since the requests forwarding mechanism presentation; such the
// node should have enough rights for getting any child object by design. // node should have enough rights for getting any child object by design.
exec.prm.common.ForgetTokens() r.prm.common.ForgetTokens()
// Do not use forwarding during assembly stage. // Do not use forwarding during assembly stage.
// Request forwarding closure inherited in produced // Request forwarding closure inherited in produced
// `execCtx` so it should be disabled there. // `execCtx` so it should be disabled there.
exec.disableForwarding() r.disableForwarding()
exec.log.Debug(logs.GetTryingToAssembleTheObject) r.log.Debug(logs.GetTryingToAssembleTheObject)
assembler := newAssembler(exec.address(), exec.splitInfo(), exec.ctxRange(), exec) assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
exec.log.Debug(logs.GetAssemblingSplittedObject, r.log.Debug(logs.GetAssemblingSplittedObject,
zap.Stringer("address", exec.address()), zap.Stringer("address", r.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()), zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()), zap.Uint64("range_length", r.ctxRange().GetLength()),
) )
defer exec.log.Debug(logs.GetAssemblingSplittedObjectCompleted, defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
zap.Stringer("address", exec.address()), zap.Stringer("address", r.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()), zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()), zap.Uint64("range_length", r.ctxRange().GetLength()),
) )
obj, err := assembler.Assemble(ctx, exec.prm.objWriter) obj, err := assembler.Assemble(ctx, r.prm.objWriter)
if err != nil { if err != nil {
exec.log.Warn(logs.GetFailedToAssembleSplittedObject, r.log.Warn(logs.GetFailedToAssembleSplittedObject,
zap.Error(err), zap.Error(err),
zap.Stringer("address", exec.address()), zap.Stringer("address", r.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()), zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()), zap.Uint64("range_length", r.ctxRange().GetLength()),
) )
} }
@ -68,27 +68,27 @@ func (exec *execCtx) assemble(ctx context.Context) {
switch { switch {
default: default:
exec.status = statusUndefined r.status = statusUndefined
exec.err = err r.err = err
case err == nil: case err == nil:
exec.status = statusOK r.status = statusOK
exec.err = nil r.err = nil
exec.collectedObject = obj r.collectedObject = obj
case errors.As(err, &errRemovedRemote): case errors.As(err, &errRemovedRemote):
exec.status = statusINHUMED r.status = statusINHUMED
exec.err = errRemovedRemote r.err = errRemovedRemote
case errors.As(err, &errRemovedLocal): case errors.As(err, &errRemovedLocal):
exec.status = statusINHUMED r.status = statusINHUMED
exec.err = errRemovedLocal r.err = errRemovedLocal
case errors.As(err, &errSplitInfo): case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL r.status = statusVIRTUAL
exec.err = errSplitInfo r.err = errSplitInfo
case errors.As(err, &errOutOfRangeRemote): case errors.As(err, &errOutOfRangeRemote):
exec.status = statusOutOfRange r.status = statusOutOfRange
exec.err = errOutOfRangeRemote r.err = errOutOfRangeRemote
case errors.As(err, &errOutOfRangeLocal): case errors.As(err, &errOutOfRangeLocal):
exec.status = statusOutOfRange r.status = statusOutOfRange
exec.err = errOutOfRangeLocal r.err = errOutOfRangeLocal
} }
} }
@ -96,53 +96,51 @@ func equalAddresses(a, b oid.Address) bool {
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object()) return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
} }
func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) { func (r *request) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
w := NewSimpleObjectWriter() w := NewSimpleObjectWriter()
p := RequestParameters{} p := RequestParameters{}
p.common = p.common.WithLocalOnly(false) p.common = p.common.WithLocalOnly(false)
p.addr.SetContainer(exec.containerID()) p.addr.SetContainer(r.containerID())
p.addr.SetObject(id) p.addr.SetObject(id)
p.head = true p.head = true
p.SetHeaderWriter(w) p.SetHeaderWriter(w)
err := exec.getDetached(ctx, p) if err := r.getObjectWithIndependentRequest(ctx, p); err != nil {

what does it mean? 1m was not enough for me to understand "detached" here

what does it mean? 1m was not enough for me to understand "detached" here

renamed

renamed
if err != nil {
return nil, err return nil, err
} }
return w.Object(), nil return w.Object(), nil
} }
func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) { func (r *request) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) {
w := NewSimpleObjectWriter() w := NewSimpleObjectWriter()
p := exec.prm p := r.prm
p.common = p.common.WithLocalOnly(false) p.common = p.common.WithLocalOnly(false)
p.objWriter = w p.objWriter = w
p.rng = rng p.rng = rng
p.addr.SetContainer(exec.containerID()) p.addr.SetContainer(r.containerID())
p.addr.SetObject(id) p.addr.SetObject(id)
if err := exec.getDetached(ctx, p); err != nil { if err := r.getObjectWithIndependentRequest(ctx, p); err != nil {
return nil, err return nil, err
} }
return w.Object(), nil return w.Object(), nil
} }
func (exec *execCtx) getDetached(ctx context.Context, prm RequestParameters) error { func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm RequestParameters) error {
detachedExecutor := &execCtx{ detachedExecutor := &request{
keyStore: exec.keyStore, keyStore: r.keyStore,
traverserGenerator: exec.traverserGenerator, traverserGenerator: r.traverserGenerator,
remoteStorageConstructor: exec.remoteStorageConstructor, remoteStorageConstructor: r.remoteStorageConstructor,
epochSource: exec.epochSource, epochSource: r.epochSource,
localStorage: exec.localStorage, localStorage: r.localStorage,
prm: prm, prm: prm,
infoSplit: objectSDK.NewSplitInfo(), infoSplit: objectSDK.NewSplitInfo(),
log: exec.log, log: r.log,
} }
detachedExecutor.execute(ctx) detachedExecutor.execute(ctx)

View file

@ -2,7 +2,6 @@ package getsvc
import ( import (
"context" "context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -15,10 +14,6 @@ type objectGetter interface {
HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error)
} }
var (
errParentAddressDiffers = errors.New("parent address in child object differs")
)
type assembler struct { type assembler struct {
addr oid.Address addr oid.Address
splitInfo *objectSDK.SplitInfo splitInfo *objectSDK.SplitInfo
@ -89,7 +84,7 @@ func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID)
parentObject := sourceObject.Parent() parentObject := sourceObject.Parent()
if parentObject == nil { if parentObject == nil {
return nil, nil, errors.New("received child with empty parent") return nil, nil, errChildWithEmptyParent
} }
a.parentObject = parentObject a.parentObject = parentObject

View file

@ -8,26 +8,26 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) executeOnContainer(ctx context.Context) { func (r *request) executeOnContainer(ctx context.Context) {
if exec.isLocal() { if r.isLocal() {
exec.log.Debug(logs.GetReturnResultDirectly) r.log.Debug(logs.GetReturnResultDirectly)
return return
} }
lookupDepth := exec.netmapLookupDepth() lookupDepth := r.netmapLookupDepth()
exec.log.Debug(logs.TryingToExecuteInContainer, r.log.Debug(logs.TryingToExecuteInContainer,
zap.Uint64("netmap lookup depth", lookupDepth), zap.Uint64("netmap lookup depth", lookupDepth),
) )
// initialize epoch number // initialize epoch number
ok := exec.initEpoch() ok := r.initEpoch()
if !ok { if !ok {
return return
} }
for { for {
if exec.processCurrentEpoch(ctx) { if r.processCurrentEpoch(ctx) {
break break
} }
@ -39,16 +39,16 @@ func (exec *execCtx) executeOnContainer(ctx context.Context) {
lookupDepth-- lookupDepth--
// go to the previous epoch // go to the previous epoch
exec.curProcEpoch-- r.curProcEpoch--
} }
} }
func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool { func (r *request) processCurrentEpoch(ctx context.Context) bool {
exec.log.Debug(logs.ProcessEpoch, r.log.Debug(logs.ProcessEpoch,
zap.Uint64("number", exec.curProcEpoch), zap.Uint64("number", r.curProcEpoch),
) )
traverser, ok := exec.generateTraverser(exec.address()) traverser, ok := r.generateTraverser(r.address())
if !ok { if !ok {
return true return true
} }
@ -56,12 +56,12 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
exec.status = statusUndefined r.status = statusUndefined
for { for {
addrs := traverser.Next() addrs := traverser.Next()
if len(addrs) == 0 { if len(addrs) == 0 {
exec.log.Debug(logs.NoMoreNodesAbortPlacementIteration) r.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
return false return false
} }
@ -69,7 +69,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
for i := range addrs { for i := range addrs {
select { select {
case <-ctx.Done(): case <-ctx.Done():
exec.log.Debug(logs.InterruptPlacementIterationByContext, r.log.Debug(logs.InterruptPlacementIterationByContext,
zap.String("error", ctx.Err().Error()), zap.String("error", ctx.Err().Error()),
) )
@ -84,8 +84,8 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
client.NodeInfoFromNetmapElement(&info, addrs[i]) client.NodeInfoFromNetmapElement(&info, addrs[i])
if exec.processNode(ctx, info) { if r.processNode(ctx, info) {
exec.log.Debug(logs.GetCompletingTheOperation) r.log.Debug(logs.GetCompletingTheOperation)
return true return true
} }
} }

View file

@ -0,0 +1,10 @@
package getsvc
import "errors"
var (
errRangeZeroLength = errors.New("zero range length")
errRangeOverflow = errors.New("range overflow")
errChildWithEmptyParent = errors.New("received child with empty parent")
errParentAddressDiffers = errors.New("parent address in child object differs")
)

View file

@ -1,271 +0,0 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type statusError struct {
status int
err error
}
type RequestParameters struct {
commonPrm
head bool
rng *objectSDK.Range
}
type execCtx struct {
prm RequestParameters
statusError
infoSplit *objectSDK.SplitInfo
log *logger.Logger
collectedObject *objectSDK.Object
curProcEpoch uint64
keyStore keyStorage
epochSource epochSource
traverserGenerator traverserGenerator
remoteStorageConstructor remoteStorageConstructor
localStorage localStorage
}
const (
statusUndefined int = iota
statusOK
statusINHUMED
statusVIRTUAL
statusOutOfRange
)
func (exec *execCtx) setLogger(l *logger.Logger) {
req := "GET"
if exec.headOnly() {
req = "HEAD"
} else if exec.ctxRange() != nil {
req = "GET_RANGE"
}
exec.log = &logger.Logger{Logger: l.With(
zap.String("request", req),
zap.Stringer("address", exec.address()),
zap.Bool("raw", exec.isRaw()),
zap.Bool("local", exec.isLocal()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)}
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec execCtx) isRaw() bool {
return exec.prm.raw
}
func (exec execCtx) address() oid.Address {
return exec.prm.addr
}
func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
if exec.prm.signerKey != nil {
// the key has already been requested and
// cached in the previous operations
return exec.prm.signerKey, nil
}
var sessionInfo *util.SessionInfo
if tok := exec.prm.common.SessionToken(); tok != nil {
sessionInfo = &util.SessionInfo{
ID: tok.ID(),
Owner: tok.Issuer(),
}
}
return exec.keyStore.GetKey(sessionInfo)
}
func (exec *execCtx) canAssemble() bool {
return !exec.isRaw() && !exec.headOnly()
}
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
return exec.infoSplit
}
func (exec *execCtx) containerID() cid.ID {
return exec.address().Container()
}
func (exec *execCtx) ctxRange() *objectSDK.Range {
return exec.prm.rng
}
func (exec *execCtx) headOnly() bool {
return exec.prm.head
}
func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}
func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}
func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}
e, err := exec.epochSource.Epoch()
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.CouldNotGetCurrentEpochNumber,
zap.String("error", err.Error()),
)
return false
case err == nil:
exec.curProcEpoch = e
return true
}
}
func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
obj := addr.Object()
t, err := exec.traverserGenerator.GenerateTraverser(addr.Container(), &obj, exec.curProcEpoch)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.GetCouldNotGenerateContainerTraverser,
zap.String("error", err.Error()),
)
return nil, false
case err == nil:
return t, true
}
}
func (exec execCtx) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {
rs, err := exec.remoteStorageConstructor.Get(info)
if err != nil {
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.GetCouldNotConstructRemoteNodeClient)
return nil, false
}
return rs, true
}
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
if last, ok := src.LastPart(); ok {
dst.SetLastPart(last)
}
if link, ok := src.Link(); ok {
dst.SetLink(link)
}
if splitID := src.SplitID(); splitID != nil {
dst.SetSplitID(splitID)
}
}
func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool {
if exec.ctxRange() != nil {
return true
}
err := exec.prm.objWriter.WriteHeader(
ctx,
exec.collectedObject.CutPayload(),
)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.GetCouldNotWriteHeader,
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
}
return exec.status == statusOK
}
func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
if exec.headOnly() {
return true
}
err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload())
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.GetCouldNotWritePayloadChunk,
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
}
return err == nil
}
func (exec *execCtx) writeCollectedObject(ctx context.Context) {
if ok := exec.writeCollectedHeader(ctx); ok {
exec.writeObjectPayload(ctx, exec.collectedObject)
}
}
// isForwardingEnabled returns true if common execution
// parameters has request forwarding closure set.
func (exec execCtx) isForwardingEnabled() bool {
return exec.prm.forwarder != nil
}
// disableForwarding removes request forwarding closure from common
// parameters, so it won't be inherited in new execution contexts.
func (exec *execCtx) disableForwarding() {
exec.prm.SetRequestForwarder(nil)
}

View file

@ -66,7 +66,7 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
} }
func (s *Service) get(ctx context.Context, prm RequestParameters) error { func (s *Service) get(ctx context.Context, prm RequestParameters) error {
exec := &execCtx{ exec := &request{
keyStore: s.keyStore, keyStore: s.keyStore,
traverserGenerator: s.traverserGenerator, traverserGenerator: s.traverserGenerator,
remoteStorageConstructor: s.remoteStorageConstructor, remoteStorageConstructor: s.remoteStorageConstructor,
@ -84,7 +84,7 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
return exec.statusError.err return exec.statusError.err
} }
func (exec *execCtx) execute(ctx context.Context) { func (exec *request) execute(ctx context.Context) {
exec.log.Debug(logs.ServingRequest) exec.log.Debug(logs.ServingRequest)
// perform local operation // perform local operation
@ -93,7 +93,7 @@ func (exec *execCtx) execute(ctx context.Context) {
exec.analyzeStatus(ctx, true) exec.analyzeStatus(ctx, true)
} }
func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) { func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
// analyze local result // analyze local result
switch exec.status { switch exec.status {
case statusOK: case statusOK:

View file

@ -1206,7 +1206,7 @@ func TestGetRemoteSmall(t *testing.T) {
err := svc.Get(ctx, p) err := svc.Get(ctx, p)
require.Error(t, err) require.Error(t, err)
require.Equal(t, err.Error(), "received child with empty parent") require.ErrorIs(t, err, errChildWithEmptyParent)
w = NewSimpleObjectWriter() w = NewSimpleObjectWriter()
payloadSz := srcObj.PayloadSize() payloadSz := srcObj.PayloadSize()
@ -1219,7 +1219,7 @@ func TestGetRemoteSmall(t *testing.T) {
err = svc.GetRange(ctx, rngPrm) err = svc.GetRange(ctx, rngPrm)
require.Error(t, err) require.Error(t, err)
require.Equal(t, err.Error(), "received child with empty parent") require.ErrorIs(t, err, errChildWithEmptyParent)
}) })
t.Run("out of range", func(t *testing.T) { t.Run("out of range", func(t *testing.T) {

View file

@ -11,7 +11,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) executeLocal(ctx context.Context) { func (r *request) executeLocal(ctx context.Context) {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.executeLocal") ctx, span := tracing.StartSpanFromContext(ctx, "getService.executeLocal")
defer func() { defer func() {
span.End() span.End()
@ -19,7 +19,7 @@ func (exec *execCtx) executeLocal(ctx context.Context) {
var err error var err error
exec.collectedObject, err = exec.get(ctx) r.collectedObject, err = r.get(ctx)
var errSplitInfo *objectSDK.SplitInfoError var errSplitInfo *objectSDK.SplitInfoError
var errRemoved apistatus.ObjectAlreadyRemoved var errRemoved apistatus.ObjectAlreadyRemoved
@ -27,35 +27,35 @@ func (exec *execCtx) executeLocal(ctx context.Context) {
switch { switch {
default: default:
exec.status = statusUndefined r.status = statusUndefined
exec.err = err r.err = err
exec.log.Debug(logs.GetLocalGetFailed, r.log.Debug(logs.GetLocalGetFailed,
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
case err == nil: case err == nil:
exec.status = statusOK r.status = statusOK
exec.err = nil r.err = nil
exec.writeCollectedObject(ctx) r.writeCollectedObject(ctx)
case errors.As(err, &errRemoved): case errors.As(err, &errRemoved):
exec.status = statusINHUMED r.status = statusINHUMED
exec.err = errRemoved r.err = errRemoved
case errors.As(err, &errSplitInfo): case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL r.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo()) mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit) r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errOutOfRange): case errors.As(err, &errOutOfRange):
exec.status = statusOutOfRange r.status = statusOutOfRange
exec.err = errOutOfRange r.err = errOutOfRange
} }
} }
func (exec *execCtx) get(ctx context.Context) (*objectSDK.Object, error) { func (r *request) get(ctx context.Context) (*objectSDK.Object, error) {
if exec.headOnly() { if r.headOnly() {
return exec.localStorage.Head(ctx, exec.address(), exec.isRaw()) return r.localStorage.Head(ctx, r.address(), r.isRaw())
} }
if rng := exec.ctxRange(); rng != nil { if rng := r.ctxRange(); rng != nil {
return exec.localStorage.Range(ctx, exec.address(), rng) return r.localStorage.Range(ctx, r.address(), rng)
} }
return exec.localStorage.Get(ctx, exec.address()) return r.localStorage.Get(ctx, r.address())
} }

View file

@ -3,12 +3,11 @@ package getsvc
import ( import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"errors"
"hash" "hash"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
) )
@ -21,14 +20,9 @@ type Prm struct {
type RangePrm struct { type RangePrm struct {
commonPrm commonPrm
rng *object.Range rng *objectSDK.Range
} }
var (
errRangeZeroLength = errors.New("zero range length")
errRangeOverflow = errors.New("range overflow")
)
// Validate pre-validates `OBJECTRANGE` request's parameters content // Validate pre-validates `OBJECTRANGE` request's parameters content
// without access to the requested object's payload. // without access to the requested object's payload.
func (p RangePrm) Validate() error { func (p RangePrm) Validate() error {
@ -54,12 +48,18 @@ type RangeHashPrm struct {
hashGen func() hash.Hash hashGen func() hash.Hash
rngs []object.Range rngs []objectSDK.Range
salt []byte salt []byte
} }
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error) type RequestParameters struct {
commonPrm
head bool
rng *objectSDK.Range
}
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*objectSDK.Object, error)
// HeadPrm groups parameters of Head service call. // HeadPrm groups parameters of Head service call.
type HeadPrm struct { type HeadPrm struct {
@ -83,24 +83,6 @@ type commonPrm struct {
signerKey *ecdsa.PrivateKey signerKey *ecdsa.PrivateKey
} }
// ChunkWriter is an interface of target component
// to write payload chunk.
type ChunkWriter interface {
WriteChunk(context.Context, []byte) error
}
// HeaderWriter is an interface of target component
// to write object header.
type HeaderWriter interface {
WriteHeader(context.Context, *object.Object) error
}
// ObjectWriter is an interface of target component to write object.
type ObjectWriter interface {
HeaderWriter
ChunkWriter
}
// SetObjectWriter sets target component to write the object. // SetObjectWriter sets target component to write the object.
func (p *Prm) SetObjectWriter(w ObjectWriter) { func (p *Prm) SetObjectWriter(w ObjectWriter) {
p.objWriter = w p.objWriter = w
@ -114,12 +96,12 @@ func (p *commonPrm) SetChunkWriter(w ChunkWriter) {
} }
// SetRange sets range of the requested payload data. // SetRange sets range of the requested payload data.
func (p *RangePrm) SetRange(rng *object.Range) { func (p *RangePrm) SetRange(rng *objectSDK.Range) {
p.rng = rng p.rng = rng
} }
// SetRangeList sets a list of object payload ranges. // SetRangeList sets a list of object payload ranges.
func (p *RangeHashPrm) SetRangeList(rngs []object.Range) { func (p *RangeHashPrm) SetRangeList(rngs []objectSDK.Range) {
p.rngs = rngs p.rngs = rngs
} }

View file

@ -12,18 +12,18 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool { func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode") ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode")
defer span.End() defer span.End()
exec.log.Debug(logs.ProcessingNode) r.log.Debug(logs.ProcessingNode)
rs, ok := exec.getRemoteStorage(info) rs, ok := r.getRemoteStorage(info)
if !ok { if !ok {
return true return true
} }
obj, err := exec.getRemote(ctx, rs, info) obj, err := r.getRemote(ctx, rs, info)
var errSplitInfo *objectSDK.SplitInfoError var errSplitInfo *objectSDK.SplitInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved var errRemoved *apistatus.ObjectAlreadyRemoved
@ -33,68 +33,68 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
default: default:
var errNotFound apistatus.ObjectNotFound var errNotFound apistatus.ObjectNotFound
exec.status = statusUndefined r.status = statusUndefined
exec.err = errNotFound r.err = errNotFound
exec.log.Debug(logs.GetRemoteCallFailed, r.log.Debug(logs.GetRemoteCallFailed,
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
case err == nil: case err == nil:
exec.status = statusOK r.status = statusOK
exec.err = nil r.err = nil
// both object and err are nil only if the original // both object and err are nil only if the original
// request was forwarded to another node and the object // request was forwarded to another node and the object
// has already been streamed to the requesting party // has already been streamed to the requesting party
if obj != nil { if obj != nil {
exec.collectedObject = obj r.collectedObject = obj
exec.writeCollectedObject(ctx) r.writeCollectedObject(ctx)
} }
case errors.As(err, &errRemoved): case errors.As(err, &errRemoved):
exec.status = statusINHUMED r.status = statusINHUMED
exec.err = errRemoved r.err = errRemoved
case errors.As(err, &errOutOfRange): case errors.As(err, &errOutOfRange):
exec.status = statusOutOfRange r.status = statusOutOfRange
exec.err = errOutOfRange r.err = errOutOfRange
case errors.As(err, &errSplitInfo): case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL r.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo()) mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit) r.err = objectSDK.NewSplitInfoError(r.infoSplit)
} }
return exec.status != statusUndefined return r.status != statusUndefined
} }
func (exec *execCtx) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) { func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
if exec.isForwardingEnabled() { if r.isForwardingEnabled() {
return rs.ForwardRequest(ctx, info, exec.prm.forwarder) return rs.ForwardRequest(ctx, info, r.prm.forwarder)
} }
key, err := exec.key() key, err := r.key()
if err != nil { if err != nil {
return nil, err return nil, err
} }
prm := RemoteRequestParams{ prm := RemoteRequestParams{
Epoch: exec.curProcEpoch, Epoch: r.curProcEpoch,
TTL: exec.prm.common.TTL(), TTL: r.prm.common.TTL(),
PrivateKey: key, PrivateKey: key,
SessionToken: exec.prm.common.SessionToken(), SessionToken: r.prm.common.SessionToken(),
BearerToken: exec.prm.common.BearerToken(), BearerToken: r.prm.common.BearerToken(),
XHeaders: exec.prm.common.XHeaders(), XHeaders: r.prm.common.XHeaders(),
IsRaw: exec.isRaw(), IsRaw: r.isRaw(),
} }
if exec.headOnly() { if r.headOnly() {
return rs.Head(ctx, exec.address(), prm) return rs.Head(ctx, r.address(), prm)
} }
// we don't specify payload writer because we accumulate // we don't specify payload writer because we accumulate
// the object locally (even huge). // the object locally (even huge).
if rng := exec.ctxRange(); rng != nil { if rng := r.ctxRange(); rng != nil {
// Current spec allows other storage node to deny access, // Current spec allows other storage node to deny access,
// fallback to GET here. // fallback to GET here.
return rs.Range(ctx, exec.address(), rng, prm) return rs.Range(ctx, r.address(), rng, prm)
} }
return rs.Get(ctx, exec.address(), prm) return rs.Get(ctx, r.address(), prm)
} }

View file

@ -0,0 +1,252 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type request struct {
prm RequestParameters
statusError
infoSplit *objectSDK.SplitInfo
log *logger.Logger
collectedObject *objectSDK.Object
curProcEpoch uint64
keyStore keyStorage
epochSource epochSource
traverserGenerator traverserGenerator
remoteStorageConstructor remoteStorageConstructor
localStorage localStorage
}
func (r *request) setLogger(l *logger.Logger) {
req := "GET"
if r.headOnly() {
req = "HEAD"
} else if r.ctxRange() != nil {
req = "GET_RANGE"
}
r.log = &logger.Logger{Logger: l.With(
zap.String("request", req),
zap.Stringer("address", r.address()),
zap.Bool("raw", r.isRaw()),
zap.Bool("local", r.isLocal()),
zap.Bool("with session", r.prm.common.SessionToken() != nil),
zap.Bool("with bearer", r.prm.common.BearerToken() != nil),
)}
}
func (r *request) isLocal() bool {
return r.prm.common.LocalOnly()
}
func (r *request) isRaw() bool {
return r.prm.raw
}
func (r *request) address() oid.Address {
return r.prm.addr
}
func (r *request) key() (*ecdsa.PrivateKey, error) {
if r.prm.signerKey != nil {
// the key has already been requested and
// cached in the previous operations
return r.prm.signerKey, nil
}
var sessionInfo *util.SessionInfo
if tok := r.prm.common.SessionToken(); tok != nil {
sessionInfo = &util.SessionInfo{
ID: tok.ID(),
Owner: tok.Issuer(),
}
}
return r.keyStore.GetKey(sessionInfo)
}
func (r *request) canAssemble() bool {
return !r.isRaw() && !r.headOnly()
}
func (r *request) splitInfo() *objectSDK.SplitInfo {
return r.infoSplit
}
func (r *request) containerID() cid.ID {
return r.address().Container()
}
func (r *request) ctxRange() *objectSDK.Range {
return r.prm.rng
}
func (r *request) headOnly() bool {
return r.prm.head
}
func (r *request) netmapEpoch() uint64 {
return r.prm.common.NetmapEpoch()
}
func (r *request) netmapLookupDepth() uint64 {
return r.prm.common.NetmapLookupDepth()
}
func (r *request) initEpoch() bool {
r.curProcEpoch = r.netmapEpoch()
if r.curProcEpoch > 0 {
return true
}
e, err := r.epochSource.Epoch()
switch {
default:
r.status = statusUndefined
r.err = err
r.log.Debug(logs.CouldNotGetCurrentEpochNumber,
zap.String("error", err.Error()),
)
return false
case err == nil:
r.curProcEpoch = e
return true
}
}
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
obj := addr.Object()
t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
switch {
default:
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotGenerateContainerTraverser,
zap.String("error", err.Error()),
)
return nil, false
case err == nil:
return t, true
}
}
func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient)
return nil, false
}
return rs, true
}
func (r *request) writeCollectedHeader(ctx context.Context) bool {
if r.ctxRange() != nil {
return true
}
err := r.prm.objWriter.WriteHeader(
ctx,
r.collectedObject.CutPayload(),
)
switch {
default:
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotWriteHeader,
zap.String("error", err.Error()),
)
case err == nil:
r.status = statusOK
r.err = nil
}
return r.status == statusOK
}
func (r *request) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
if r.headOnly() {
return true
}
err := r.prm.objWriter.WriteChunk(ctx, obj.Payload())
switch {
default:
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotWritePayloadChunk,
zap.String("error", err.Error()),
)
case err == nil:
r.status = statusOK
r.err = nil
}
return err == nil
}
func (r *request) writeCollectedObject(ctx context.Context) {
if ok := r.writeCollectedHeader(ctx); ok {
r.writeObjectPayload(ctx, r.collectedObject)
}
}
// isForwardingEnabled returns true if common execution
// parameters has request forwarding closure set.
func (r request) isForwardingEnabled() bool {
return r.prm.forwarder != nil
}
// disableForwarding removes request forwarding closure from common
// parameters, so it won't be inherited in new execution contexts.
func (r *request) disableForwarding() {
r.prm.SetRequestForwarder(nil)
}
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
if last, ok := src.LastPart(); ok {
dst.SetLastPart(last)
}
if link, ok := src.Link(); ok {
dst.SetLink(link)
}
if splitID := src.SplitID(); splitID != nil {
dst.SetSplitID(splitID)
}
}

View file

@ -1,9 +0,0 @@
package getsvc
type RangeHashRes struct {
hashes [][]byte
}
func (r *RangeHashRes) Hashes() [][]byte {
return r.hashes
}

View file

@ -0,0 +1,14 @@
package getsvc
const (
statusUndefined int = iota
statusOK
statusINHUMED
statusVIRTUAL
statusOutOfRange
)
type statusError struct {
status int
err error
}

View file

@ -228,3 +228,11 @@ func (s *multiaddressRemoteStorage) payloadOnlyObject(payload []byte) *objectSDK
return obj return obj
} }
type RangeHashRes struct {
hashes [][]byte
}
func (r *RangeHashRes) Hashes() [][]byte {
return r.hashes
}

View file

@ -7,6 +7,24 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
) )
// ChunkWriter is an interface of target component
// to write payload chunk.
type ChunkWriter interface {
WriteChunk(context.Context, []byte) error
}
// HeaderWriter is an interface of target component
// to write object header.
type HeaderWriter interface {
WriteHeader(context.Context, *object.Object) error
}
// ObjectWriter is an interface of target component to write object.
type ObjectWriter interface {
HeaderWriter
ChunkWriter
}
type SimpleObjectWriter struct { type SimpleObjectWriter struct {
obj *object.Object obj *object.Object