diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index 0c323a6f3e..777822ac36 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -11,9 +11,9 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) assemble(ctx context.Context) { - if !exec.canAssemble() { - exec.log.Debug(logs.GetCanNotAssembleTheObject) +func (r *request) assemble(ctx context.Context) { + if !r.canAssemble() { + r.log.Debug(logs.GetCanNotAssembleTheObject) return } @@ -28,35 +28,35 @@ func (exec *execCtx) assemble(ctx context.Context) { // - the assembly process is expected to be handled on a container node // only since the requests forwarding mechanism presentation; such the // node should have enough rights for getting any child object by design. - exec.prm.common.ForgetTokens() + r.prm.common.ForgetTokens() // Do not use forwarding during assembly stage. // Request forwarding closure inherited in produced // `execCtx` so it should be disabled there. - exec.disableForwarding() + r.disableForwarding() - exec.log.Debug(logs.GetTryingToAssembleTheObject) + r.log.Debug(logs.GetTryingToAssembleTheObject) - assembler := newAssembler(exec.address(), exec.splitInfo(), exec.ctxRange(), exec) + assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r) - exec.log.Debug(logs.GetAssemblingSplittedObject, - zap.Stringer("address", exec.address()), - zap.Uint64("range_offset", exec.ctxRange().GetOffset()), - zap.Uint64("range_length", exec.ctxRange().GetLength()), + r.log.Debug(logs.GetAssemblingSplittedObject, + zap.Stringer("address", r.address()), + zap.Uint64("range_offset", r.ctxRange().GetOffset()), + zap.Uint64("range_length", r.ctxRange().GetLength()), ) - defer exec.log.Debug(logs.GetAssemblingSplittedObjectCompleted, - zap.Stringer("address", exec.address()), - zap.Uint64("range_offset", exec.ctxRange().GetOffset()), - zap.Uint64("range_length", exec.ctxRange().GetLength()), + defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted, + zap.Stringer("address", r.address()), + zap.Uint64("range_offset", r.ctxRange().GetOffset()), + zap.Uint64("range_length", r.ctxRange().GetLength()), ) - obj, err := assembler.Assemble(ctx, exec.prm.objWriter) + obj, err := assembler.Assemble(ctx, r.prm.objWriter) if err != nil { - exec.log.Warn(logs.GetFailedToAssembleSplittedObject, + r.log.Warn(logs.GetFailedToAssembleSplittedObject, zap.Error(err), - zap.Stringer("address", exec.address()), - zap.Uint64("range_offset", exec.ctxRange().GetOffset()), - zap.Uint64("range_length", exec.ctxRange().GetLength()), + zap.Stringer("address", r.address()), + zap.Uint64("range_offset", r.ctxRange().GetOffset()), + zap.Uint64("range_length", r.ctxRange().GetLength()), ) } @@ -68,27 +68,27 @@ func (exec *execCtx) assemble(ctx context.Context) { switch { default: - exec.status = statusUndefined - exec.err = err + r.status = statusUndefined + r.err = err case err == nil: - exec.status = statusOK - exec.err = nil - exec.collectedObject = obj + r.status = statusOK + r.err = nil + r.collectedObject = obj case errors.As(err, &errRemovedRemote): - exec.status = statusINHUMED - exec.err = errRemovedRemote + r.status = statusINHUMED + r.err = errRemovedRemote case errors.As(err, &errRemovedLocal): - exec.status = statusINHUMED - exec.err = errRemovedLocal + r.status = statusINHUMED + r.err = errRemovedLocal case errors.As(err, &errSplitInfo): - exec.status = statusVIRTUAL - exec.err = errSplitInfo + r.status = statusVIRTUAL + r.err = errSplitInfo case errors.As(err, &errOutOfRangeRemote): - exec.status = statusOutOfRange - exec.err = errOutOfRangeRemote + r.status = statusOutOfRange + r.err = errOutOfRangeRemote case errors.As(err, &errOutOfRangeLocal): - exec.status = statusOutOfRange - exec.err = errOutOfRangeLocal + r.status = statusOutOfRange + r.err = errOutOfRangeLocal } } @@ -96,53 +96,51 @@ func equalAddresses(a, b oid.Address) bool { return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object()) } -func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) { +func (r *request) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) { w := NewSimpleObjectWriter() p := RequestParameters{} p.common = p.common.WithLocalOnly(false) - p.addr.SetContainer(exec.containerID()) + p.addr.SetContainer(r.containerID()) p.addr.SetObject(id) p.head = true p.SetHeaderWriter(w) - err := exec.getDetached(ctx, p) - - if err != nil { + if err := r.getObjectWithIndependentRequest(ctx, p); err != nil { return nil, err } return w.Object(), nil } -func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) { +func (r *request) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) { w := NewSimpleObjectWriter() - p := exec.prm + p := r.prm p.common = p.common.WithLocalOnly(false) p.objWriter = w p.rng = rng - p.addr.SetContainer(exec.containerID()) + p.addr.SetContainer(r.containerID()) p.addr.SetObject(id) - if err := exec.getDetached(ctx, p); err != nil { + if err := r.getObjectWithIndependentRequest(ctx, p); err != nil { return nil, err } return w.Object(), nil } -func (exec *execCtx) getDetached(ctx context.Context, prm RequestParameters) error { - detachedExecutor := &execCtx{ - keyStore: exec.keyStore, - traverserGenerator: exec.traverserGenerator, - remoteStorageConstructor: exec.remoteStorageConstructor, - epochSource: exec.epochSource, - localStorage: exec.localStorage, +func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm RequestParameters) error { + detachedExecutor := &request{ + keyStore: r.keyStore, + traverserGenerator: r.traverserGenerator, + remoteStorageConstructor: r.remoteStorageConstructor, + epochSource: r.epochSource, + localStorage: r.localStorage, prm: prm, infoSplit: objectSDK.NewSplitInfo(), - log: exec.log, + log: r.log, } detachedExecutor.execute(ctx) diff --git a/pkg/services/object/get/assembler.go b/pkg/services/object/get/assembler.go index 7a3023c74d..321d4b16d3 100644 --- a/pkg/services/object/get/assembler.go +++ b/pkg/services/object/get/assembler.go @@ -2,7 +2,6 @@ package getsvc import ( "context" - "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -15,10 +14,6 @@ type objectGetter interface { HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) } -var ( - errParentAddressDiffers = errors.New("parent address in child object differs") -) - type assembler struct { addr oid.Address splitInfo *objectSDK.SplitInfo @@ -89,7 +84,7 @@ func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID) parentObject := sourceObject.Parent() if parentObject == nil { - return nil, nil, errors.New("received child with empty parent") + return nil, nil, errChildWithEmptyParent } a.parentObject = parentObject diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index 17628e5779..689d1a16d1 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -8,26 +8,26 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) executeOnContainer(ctx context.Context) { - if exec.isLocal() { - exec.log.Debug(logs.GetReturnResultDirectly) +func (r *request) executeOnContainer(ctx context.Context) { + if r.isLocal() { + r.log.Debug(logs.GetReturnResultDirectly) return } - lookupDepth := exec.netmapLookupDepth() + lookupDepth := r.netmapLookupDepth() - exec.log.Debug(logs.TryingToExecuteInContainer, + r.log.Debug(logs.TryingToExecuteInContainer, zap.Uint64("netmap lookup depth", lookupDepth), ) // initialize epoch number - ok := exec.initEpoch() + ok := r.initEpoch() if !ok { return } for { - if exec.processCurrentEpoch(ctx) { + if r.processCurrentEpoch(ctx) { break } @@ -39,16 +39,16 @@ func (exec *execCtx) executeOnContainer(ctx context.Context) { lookupDepth-- // go to the previous epoch - exec.curProcEpoch-- + r.curProcEpoch-- } } -func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool { - exec.log.Debug(logs.ProcessEpoch, - zap.Uint64("number", exec.curProcEpoch), +func (r *request) processCurrentEpoch(ctx context.Context) bool { + r.log.Debug(logs.ProcessEpoch, + zap.Uint64("number", r.curProcEpoch), ) - traverser, ok := exec.generateTraverser(exec.address()) + traverser, ok := r.generateTraverser(r.address()) if !ok { return true } @@ -56,12 +56,12 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool { ctx, cancel := context.WithCancel(ctx) defer cancel() - exec.status = statusUndefined + r.status = statusUndefined for { addrs := traverser.Next() if len(addrs) == 0 { - exec.log.Debug(logs.NoMoreNodesAbortPlacementIteration) + r.log.Debug(logs.NoMoreNodesAbortPlacementIteration) return false } @@ -69,7 +69,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool { for i := range addrs { select { case <-ctx.Done(): - exec.log.Debug(logs.InterruptPlacementIterationByContext, + r.log.Debug(logs.InterruptPlacementIterationByContext, zap.String("error", ctx.Err().Error()), ) @@ -84,8 +84,8 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool { client.NodeInfoFromNetmapElement(&info, addrs[i]) - if exec.processNode(ctx, info) { - exec.log.Debug(logs.GetCompletingTheOperation) + if r.processNode(ctx, info) { + r.log.Debug(logs.GetCompletingTheOperation) return true } } diff --git a/pkg/services/object/get/errors.go b/pkg/services/object/get/errors.go new file mode 100644 index 0000000000..6ea16a144f --- /dev/null +++ b/pkg/services/object/get/errors.go @@ -0,0 +1,10 @@ +package getsvc + +import "errors" + +var ( + errRangeZeroLength = errors.New("zero range length") + errRangeOverflow = errors.New("range overflow") + errChildWithEmptyParent = errors.New("received child with empty parent") + errParentAddressDiffers = errors.New("parent address in child object differs") +) diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go deleted file mode 100644 index 45bd601e16..0000000000 --- a/pkg/services/object/get/exec.go +++ /dev/null @@ -1,271 +0,0 @@ -package getsvc - -import ( - "context" - "crypto/ecdsa" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.uber.org/zap" -) - -type statusError struct { - status int - err error -} - -type RequestParameters struct { - commonPrm - head bool - rng *objectSDK.Range -} - -type execCtx struct { - prm RequestParameters - - statusError - - infoSplit *objectSDK.SplitInfo - - log *logger.Logger - - collectedObject *objectSDK.Object - - curProcEpoch uint64 - - keyStore keyStorage - epochSource epochSource - traverserGenerator traverserGenerator - remoteStorageConstructor remoteStorageConstructor - localStorage localStorage -} - -const ( - statusUndefined int = iota - statusOK - statusINHUMED - statusVIRTUAL - statusOutOfRange -) - -func (exec *execCtx) setLogger(l *logger.Logger) { - req := "GET" - if exec.headOnly() { - req = "HEAD" - } else if exec.ctxRange() != nil { - req = "GET_RANGE" - } - - exec.log = &logger.Logger{Logger: l.With( - zap.String("request", req), - zap.Stringer("address", exec.address()), - zap.Bool("raw", exec.isRaw()), - zap.Bool("local", exec.isLocal()), - zap.Bool("with session", exec.prm.common.SessionToken() != nil), - zap.Bool("with bearer", exec.prm.common.BearerToken() != nil), - )} -} - -func (exec execCtx) isLocal() bool { - return exec.prm.common.LocalOnly() -} - -func (exec execCtx) isRaw() bool { - return exec.prm.raw -} - -func (exec execCtx) address() oid.Address { - return exec.prm.addr -} - -func (exec execCtx) key() (*ecdsa.PrivateKey, error) { - if exec.prm.signerKey != nil { - // the key has already been requested and - // cached in the previous operations - return exec.prm.signerKey, nil - } - - var sessionInfo *util.SessionInfo - - if tok := exec.prm.common.SessionToken(); tok != nil { - sessionInfo = &util.SessionInfo{ - ID: tok.ID(), - Owner: tok.Issuer(), - } - } - - return exec.keyStore.GetKey(sessionInfo) -} - -func (exec *execCtx) canAssemble() bool { - return !exec.isRaw() && !exec.headOnly() -} - -func (exec *execCtx) splitInfo() *objectSDK.SplitInfo { - return exec.infoSplit -} - -func (exec *execCtx) containerID() cid.ID { - return exec.address().Container() -} - -func (exec *execCtx) ctxRange() *objectSDK.Range { - return exec.prm.rng -} - -func (exec *execCtx) headOnly() bool { - return exec.prm.head -} - -func (exec *execCtx) netmapEpoch() uint64 { - return exec.prm.common.NetmapEpoch() -} - -func (exec *execCtx) netmapLookupDepth() uint64 { - return exec.prm.common.NetmapLookupDepth() -} - -func (exec *execCtx) initEpoch() bool { - exec.curProcEpoch = exec.netmapEpoch() - if exec.curProcEpoch > 0 { - return true - } - - e, err := exec.epochSource.Epoch() - - switch { - default: - exec.status = statusUndefined - exec.err = err - - exec.log.Debug(logs.CouldNotGetCurrentEpochNumber, - zap.String("error", err.Error()), - ) - - return false - case err == nil: - exec.curProcEpoch = e - return true - } -} - -func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, bool) { - obj := addr.Object() - - t, err := exec.traverserGenerator.GenerateTraverser(addr.Container(), &obj, exec.curProcEpoch) - - switch { - default: - exec.status = statusUndefined - exec.err = err - - exec.log.Debug(logs.GetCouldNotGenerateContainerTraverser, - zap.String("error", err.Error()), - ) - - return nil, false - case err == nil: - return t, true - } -} - -func (exec execCtx) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) { - rs, err := exec.remoteStorageConstructor.Get(info) - if err != nil { - exec.status = statusUndefined - exec.err = err - - exec.log.Debug(logs.GetCouldNotConstructRemoteNodeClient) - - return nil, false - } - - return rs, true -} - -func mergeSplitInfo(dst, src *objectSDK.SplitInfo) { - if last, ok := src.LastPart(); ok { - dst.SetLastPart(last) - } - - if link, ok := src.Link(); ok { - dst.SetLink(link) - } - - if splitID := src.SplitID(); splitID != nil { - dst.SetSplitID(splitID) - } -} - -func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool { - if exec.ctxRange() != nil { - return true - } - - err := exec.prm.objWriter.WriteHeader( - ctx, - exec.collectedObject.CutPayload(), - ) - - switch { - default: - exec.status = statusUndefined - exec.err = err - - exec.log.Debug(logs.GetCouldNotWriteHeader, - zap.String("error", err.Error()), - ) - case err == nil: - exec.status = statusOK - exec.err = nil - } - - return exec.status == statusOK -} - -func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool { - if exec.headOnly() { - return true - } - - err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload()) - - switch { - default: - exec.status = statusUndefined - exec.err = err - - exec.log.Debug(logs.GetCouldNotWritePayloadChunk, - zap.String("error", err.Error()), - ) - case err == nil: - exec.status = statusOK - exec.err = nil - } - - return err == nil -} - -func (exec *execCtx) writeCollectedObject(ctx context.Context) { - if ok := exec.writeCollectedHeader(ctx); ok { - exec.writeObjectPayload(ctx, exec.collectedObject) - } -} - -// isForwardingEnabled returns true if common execution -// parameters has request forwarding closure set. -func (exec execCtx) isForwardingEnabled() bool { - return exec.prm.forwarder != nil -} - -// disableForwarding removes request forwarding closure from common -// parameters, so it won't be inherited in new execution contexts. -func (exec *execCtx) disableForwarding() { - exec.prm.SetRequestForwarder(nil) -} diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index a562d9a1dd..2d24456d75 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -66,7 +66,7 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error { } func (s *Service) get(ctx context.Context, prm RequestParameters) error { - exec := &execCtx{ + exec := &request{ keyStore: s.keyStore, traverserGenerator: s.traverserGenerator, remoteStorageConstructor: s.remoteStorageConstructor, @@ -84,7 +84,7 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error { return exec.statusError.err } -func (exec *execCtx) execute(ctx context.Context) { +func (exec *request) execute(ctx context.Context) { exec.log.Debug(logs.ServingRequest) // perform local operation @@ -93,7 +93,7 @@ func (exec *execCtx) execute(ctx context.Context) { exec.analyzeStatus(ctx, true) } -func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) { +func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) { // analyze local result switch exec.status { case statusOK: diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index d6ff00880e..9c55060645 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -1206,7 +1206,7 @@ func TestGetRemoteSmall(t *testing.T) { err := svc.Get(ctx, p) require.Error(t, err) - require.Equal(t, err.Error(), "received child with empty parent") + require.ErrorIs(t, err, errChildWithEmptyParent) w = NewSimpleObjectWriter() payloadSz := srcObj.PayloadSize() @@ -1219,7 +1219,7 @@ func TestGetRemoteSmall(t *testing.T) { err = svc.GetRange(ctx, rngPrm) require.Error(t, err) - require.Equal(t, err.Error(), "received child with empty parent") + require.ErrorIs(t, err, errChildWithEmptyParent) }) t.Run("out of range", func(t *testing.T) { diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index 135fc3cd59..8affa71c2e 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -11,7 +11,7 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) executeLocal(ctx context.Context) { +func (r *request) executeLocal(ctx context.Context) { ctx, span := tracing.StartSpanFromContext(ctx, "getService.executeLocal") defer func() { span.End() @@ -19,7 +19,7 @@ func (exec *execCtx) executeLocal(ctx context.Context) { var err error - exec.collectedObject, err = exec.get(ctx) + r.collectedObject, err = r.get(ctx) var errSplitInfo *objectSDK.SplitInfoError var errRemoved apistatus.ObjectAlreadyRemoved @@ -27,35 +27,35 @@ func (exec *execCtx) executeLocal(ctx context.Context) { switch { default: - exec.status = statusUndefined - exec.err = err + r.status = statusUndefined + r.err = err - exec.log.Debug(logs.GetLocalGetFailed, + r.log.Debug(logs.GetLocalGetFailed, zap.String("error", err.Error()), ) case err == nil: - exec.status = statusOK - exec.err = nil - exec.writeCollectedObject(ctx) + r.status = statusOK + r.err = nil + r.writeCollectedObject(ctx) case errors.As(err, &errRemoved): - exec.status = statusINHUMED - exec.err = errRemoved + r.status = statusINHUMED + r.err = errRemoved case errors.As(err, &errSplitInfo): - exec.status = statusVIRTUAL - mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo()) - exec.err = objectSDK.NewSplitInfoError(exec.infoSplit) + r.status = statusVIRTUAL + mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) + r.err = objectSDK.NewSplitInfoError(r.infoSplit) case errors.As(err, &errOutOfRange): - exec.status = statusOutOfRange - exec.err = errOutOfRange + r.status = statusOutOfRange + r.err = errOutOfRange } } -func (exec *execCtx) get(ctx context.Context) (*objectSDK.Object, error) { - if exec.headOnly() { - return exec.localStorage.Head(ctx, exec.address(), exec.isRaw()) +func (r *request) get(ctx context.Context) (*objectSDK.Object, error) { + if r.headOnly() { + return r.localStorage.Head(ctx, r.address(), r.isRaw()) } - if rng := exec.ctxRange(); rng != nil { - return exec.localStorage.Range(ctx, exec.address(), rng) + if rng := r.ctxRange(); rng != nil { + return r.localStorage.Range(ctx, r.address(), rng) } - return exec.localStorage.Get(ctx, exec.address()) + return r.localStorage.Get(ctx, r.address()) } diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index 7278eb2e9e..cbdb7a3e24 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -3,12 +3,11 @@ package getsvc import ( "context" "crypto/ecdsa" - "errors" "hash" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) @@ -21,14 +20,9 @@ type Prm struct { type RangePrm struct { commonPrm - rng *object.Range + rng *objectSDK.Range } -var ( - errRangeZeroLength = errors.New("zero range length") - errRangeOverflow = errors.New("range overflow") -) - // Validate pre-validates `OBJECTRANGE` request's parameters content // without access to the requested object's payload. func (p RangePrm) Validate() error { @@ -54,12 +48,18 @@ type RangeHashPrm struct { hashGen func() hash.Hash - rngs []object.Range + rngs []objectSDK.Range salt []byte } -type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error) +type RequestParameters struct { + commonPrm + head bool + rng *objectSDK.Range +} + +type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*objectSDK.Object, error) // HeadPrm groups parameters of Head service call. type HeadPrm struct { @@ -83,24 +83,6 @@ type commonPrm struct { signerKey *ecdsa.PrivateKey } -// ChunkWriter is an interface of target component -// to write payload chunk. -type ChunkWriter interface { - WriteChunk(context.Context, []byte) error -} - -// HeaderWriter is an interface of target component -// to write object header. -type HeaderWriter interface { - WriteHeader(context.Context, *object.Object) error -} - -// ObjectWriter is an interface of target component to write object. -type ObjectWriter interface { - HeaderWriter - ChunkWriter -} - // SetObjectWriter sets target component to write the object. func (p *Prm) SetObjectWriter(w ObjectWriter) { p.objWriter = w @@ -114,12 +96,12 @@ func (p *commonPrm) SetChunkWriter(w ChunkWriter) { } // SetRange sets range of the requested payload data. -func (p *RangePrm) SetRange(rng *object.Range) { +func (p *RangePrm) SetRange(rng *objectSDK.Range) { p.rng = rng } // SetRangeList sets a list of object payload ranges. -func (p *RangeHashPrm) SetRangeList(rngs []object.Range) { +func (p *RangeHashPrm) SetRangeList(rngs []objectSDK.Range) { p.rngs = rngs } diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index d4c5d7a6cb..ac662b3bb1 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -12,18 +12,18 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool { +func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode") defer span.End() - exec.log.Debug(logs.ProcessingNode) + r.log.Debug(logs.ProcessingNode) - rs, ok := exec.getRemoteStorage(info) + rs, ok := r.getRemoteStorage(info) if !ok { return true } - obj, err := exec.getRemote(ctx, rs, info) + obj, err := r.getRemote(ctx, rs, info) var errSplitInfo *objectSDK.SplitInfoError var errRemoved *apistatus.ObjectAlreadyRemoved @@ -33,68 +33,68 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool default: var errNotFound apistatus.ObjectNotFound - exec.status = statusUndefined - exec.err = errNotFound + r.status = statusUndefined + r.err = errNotFound - exec.log.Debug(logs.GetRemoteCallFailed, + r.log.Debug(logs.GetRemoteCallFailed, zap.String("error", err.Error()), ) case err == nil: - exec.status = statusOK - exec.err = nil + r.status = statusOK + r.err = nil // both object and err are nil only if the original // request was forwarded to another node and the object // has already been streamed to the requesting party if obj != nil { - exec.collectedObject = obj - exec.writeCollectedObject(ctx) + r.collectedObject = obj + r.writeCollectedObject(ctx) } case errors.As(err, &errRemoved): - exec.status = statusINHUMED - exec.err = errRemoved + r.status = statusINHUMED + r.err = errRemoved case errors.As(err, &errOutOfRange): - exec.status = statusOutOfRange - exec.err = errOutOfRange + r.status = statusOutOfRange + r.err = errOutOfRange case errors.As(err, &errSplitInfo): - exec.status = statusVIRTUAL - mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo()) - exec.err = objectSDK.NewSplitInfoError(exec.infoSplit) + r.status = statusVIRTUAL + mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) + r.err = objectSDK.NewSplitInfoError(r.infoSplit) } - return exec.status != statusUndefined + return r.status != statusUndefined } -func (exec *execCtx) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) { - if exec.isForwardingEnabled() { - return rs.ForwardRequest(ctx, info, exec.prm.forwarder) +func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) { + if r.isForwardingEnabled() { + return rs.ForwardRequest(ctx, info, r.prm.forwarder) } - key, err := exec.key() + key, err := r.key() if err != nil { return nil, err } prm := RemoteRequestParams{ - Epoch: exec.curProcEpoch, - TTL: exec.prm.common.TTL(), + Epoch: r.curProcEpoch, + TTL: r.prm.common.TTL(), PrivateKey: key, - SessionToken: exec.prm.common.SessionToken(), - BearerToken: exec.prm.common.BearerToken(), - XHeaders: exec.prm.common.XHeaders(), - IsRaw: exec.isRaw(), + SessionToken: r.prm.common.SessionToken(), + BearerToken: r.prm.common.BearerToken(), + XHeaders: r.prm.common.XHeaders(), + IsRaw: r.isRaw(), } - if exec.headOnly() { - return rs.Head(ctx, exec.address(), prm) + if r.headOnly() { + return rs.Head(ctx, r.address(), prm) } // we don't specify payload writer because we accumulate // the object locally (even huge). - if rng := exec.ctxRange(); rng != nil { + if rng := r.ctxRange(); rng != nil { // Current spec allows other storage node to deny access, // fallback to GET here. - return rs.Range(ctx, exec.address(), rng, prm) + return rs.Range(ctx, r.address(), rng, prm) } - return rs.Get(ctx, exec.address(), prm) + return rs.Get(ctx, r.address(), prm) } diff --git a/pkg/services/object/get/request.go b/pkg/services/object/get/request.go new file mode 100644 index 0000000000..db77d619a2 --- /dev/null +++ b/pkg/services/object/get/request.go @@ -0,0 +1,252 @@ +package getsvc + +import ( + "context" + "crypto/ecdsa" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" +) + +type request struct { + prm RequestParameters + + statusError + + infoSplit *objectSDK.SplitInfo + + log *logger.Logger + + collectedObject *objectSDK.Object + + curProcEpoch uint64 + + keyStore keyStorage + epochSource epochSource + traverserGenerator traverserGenerator + remoteStorageConstructor remoteStorageConstructor + localStorage localStorage +} + +func (r *request) setLogger(l *logger.Logger) { + req := "GET" + if r.headOnly() { + req = "HEAD" + } else if r.ctxRange() != nil { + req = "GET_RANGE" + } + + r.log = &logger.Logger{Logger: l.With( + zap.String("request", req), + zap.Stringer("address", r.address()), + zap.Bool("raw", r.isRaw()), + zap.Bool("local", r.isLocal()), + zap.Bool("with session", r.prm.common.SessionToken() != nil), + zap.Bool("with bearer", r.prm.common.BearerToken() != nil), + )} +} + +func (r *request) isLocal() bool { + return r.prm.common.LocalOnly() +} + +func (r *request) isRaw() bool { + return r.prm.raw +} + +func (r *request) address() oid.Address { + return r.prm.addr +} + +func (r *request) key() (*ecdsa.PrivateKey, error) { + if r.prm.signerKey != nil { + // the key has already been requested and + // cached in the previous operations + return r.prm.signerKey, nil + } + + var sessionInfo *util.SessionInfo + + if tok := r.prm.common.SessionToken(); tok != nil { + sessionInfo = &util.SessionInfo{ + ID: tok.ID(), + Owner: tok.Issuer(), + } + } + + return r.keyStore.GetKey(sessionInfo) +} + +func (r *request) canAssemble() bool { + return !r.isRaw() && !r.headOnly() +} + +func (r *request) splitInfo() *objectSDK.SplitInfo { + return r.infoSplit +} + +func (r *request) containerID() cid.ID { + return r.address().Container() +} + +func (r *request) ctxRange() *objectSDK.Range { + return r.prm.rng +} + +func (r *request) headOnly() bool { + return r.prm.head +} + +func (r *request) netmapEpoch() uint64 { + return r.prm.common.NetmapEpoch() +} + +func (r *request) netmapLookupDepth() uint64 { + return r.prm.common.NetmapLookupDepth() +} + +func (r *request) initEpoch() bool { + r.curProcEpoch = r.netmapEpoch() + if r.curProcEpoch > 0 { + return true + } + + e, err := r.epochSource.Epoch() + + switch { + default: + r.status = statusUndefined + r.err = err + + r.log.Debug(logs.CouldNotGetCurrentEpochNumber, + zap.String("error", err.Error()), + ) + + return false + case err == nil: + r.curProcEpoch = e + return true + } +} + +func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) { + obj := addr.Object() + + t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch) + + switch { + default: + r.status = statusUndefined + r.err = err + + r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, + zap.String("error", err.Error()), + ) + + return nil, false + case err == nil: + return t, true + } +} + +func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) { + rs, err := r.remoteStorageConstructor.Get(info) + if err != nil { + r.status = statusUndefined + r.err = err + + r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient) + + return nil, false + } + + return rs, true +} + +func (r *request) writeCollectedHeader(ctx context.Context) bool { + if r.ctxRange() != nil { + return true + } + + err := r.prm.objWriter.WriteHeader( + ctx, + r.collectedObject.CutPayload(), + ) + + switch { + default: + r.status = statusUndefined + r.err = err + + r.log.Debug(logs.GetCouldNotWriteHeader, + zap.String("error", err.Error()), + ) + case err == nil: + r.status = statusOK + r.err = nil + } + + return r.status == statusOK +} + +func (r *request) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool { + if r.headOnly() { + return true + } + + err := r.prm.objWriter.WriteChunk(ctx, obj.Payload()) + + switch { + default: + r.status = statusUndefined + r.err = err + + r.log.Debug(logs.GetCouldNotWritePayloadChunk, + zap.String("error", err.Error()), + ) + case err == nil: + r.status = statusOK + r.err = nil + } + + return err == nil +} + +func (r *request) writeCollectedObject(ctx context.Context) { + if ok := r.writeCollectedHeader(ctx); ok { + r.writeObjectPayload(ctx, r.collectedObject) + } +} + +// isForwardingEnabled returns true if common execution +// parameters has request forwarding closure set. +func (r request) isForwardingEnabled() bool { + return r.prm.forwarder != nil +} + +// disableForwarding removes request forwarding closure from common +// parameters, so it won't be inherited in new execution contexts. +func (r *request) disableForwarding() { + r.prm.SetRequestForwarder(nil) +} + +func mergeSplitInfo(dst, src *objectSDK.SplitInfo) { + if last, ok := src.LastPart(); ok { + dst.SetLastPart(last) + } + + if link, ok := src.Link(); ok { + dst.SetLink(link) + } + + if splitID := src.SplitID(); splitID != nil { + dst.SetSplitID(splitID) + } +} diff --git a/pkg/services/object/get/res.go b/pkg/services/object/get/res.go deleted file mode 100644 index 75a5aaedde..0000000000 --- a/pkg/services/object/get/res.go +++ /dev/null @@ -1,9 +0,0 @@ -package getsvc - -type RangeHashRes struct { - hashes [][]byte -} - -func (r *RangeHashRes) Hashes() [][]byte { - return r.hashes -} diff --git a/pkg/services/object/get/status.go b/pkg/services/object/get/status.go new file mode 100644 index 0000000000..3a5eebe329 --- /dev/null +++ b/pkg/services/object/get/status.go @@ -0,0 +1,14 @@ +package getsvc + +const ( + statusUndefined int = iota + statusOK + statusINHUMED + statusVIRTUAL + statusOutOfRange +) + +type statusError struct { + status int + err error +} diff --git a/pkg/services/object/get/types.go b/pkg/services/object/get/types.go index 800d37996e..a866132ccd 100644 --- a/pkg/services/object/get/types.go +++ b/pkg/services/object/get/types.go @@ -228,3 +228,11 @@ func (s *multiaddressRemoteStorage) payloadOnlyObject(payload []byte) *objectSDK return obj } + +type RangeHashRes struct { + hashes [][]byte +} + +func (r *RangeHashRes) Hashes() [][]byte { + return r.hashes +} diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/writer.go similarity index 72% rename from pkg/services/object/get/util.go rename to pkg/services/object/get/writer.go index 2e01809d3d..78af5db415 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/writer.go @@ -7,6 +7,24 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" ) +// ChunkWriter is an interface of target component +// to write payload chunk. +type ChunkWriter interface { + WriteChunk(context.Context, []byte) error +} + +// HeaderWriter is an interface of target component +// to write object header. +type HeaderWriter interface { + WriteHeader(context.Context, *object.Object) error +} + +// ObjectWriter is an interface of target component to write object. +type ObjectWriter interface { + HeaderWriter + ChunkWriter +} + type SimpleObjectWriter struct { obj *object.Object