diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index ebae18eb..db71df6a 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -10,7 +10,7 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) assemble() { +func (exec *execCtx) assemble(ctx context.Context) { if !exec.canAssemble() { exec.log.Debug("can not assemble the object") return @@ -49,7 +49,7 @@ func (exec *execCtx) assemble() { zap.Uint64("range_length", exec.ctxRange().GetLength()), ) - obj, err := assembler.Assemble(exec.context(), exec.prm.objWriter) + obj, err := assembler.Assemble(ctx, exec.prm.objWriter) if err != nil { exec.log.Warn("failed to assemble splitted object", zap.Error(err), @@ -107,8 +107,7 @@ func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Obje w := NewSimpleObjectWriter() prm.SetHeaderWriter(w) - //nolint: contextcheck - err := exec.svc.Head(exec.context(), prm) + err := exec.svc.Head(ctx, prm) if err != nil { return nil, err @@ -128,8 +127,7 @@ func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Ra p.addr.SetContainer(exec.containerID()) p.addr.SetObject(id) - //nolint: contextcheck - statusError := exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng)) + statusError := exec.svc.get(ctx, p.commonPrm, withPayloadRange(rng)) if statusError.err != nil { return nil, statusError.err diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index 88286112..cfb538d3 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -7,7 +7,7 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) executeOnContainer() { +func (exec *execCtx) executeOnContainer(ctx context.Context) { if exec.isLocal() { exec.log.Debug("return result directly") return @@ -26,7 +26,7 @@ func (exec *execCtx) executeOnContainer() { } for { - if exec.processCurrentEpoch() { + if exec.processCurrentEpoch(ctx) { break } @@ -42,7 +42,7 @@ func (exec *execCtx) executeOnContainer() { } } -func (exec *execCtx) processCurrentEpoch() bool { +func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool { exec.log.Debug("process epoch", zap.Uint64("number", exec.curProcEpoch), ) @@ -52,7 +52,7 @@ func (exec *execCtx) processCurrentEpoch() bool { return true } - ctx, cancel := context.WithCancel(exec.context()) + ctx, cancel := context.WithCancel(ctx) defer cancel() exec.status = statusUndefined diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 9858b32b..2ba01457 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -19,12 +19,9 @@ type statusError struct { err error } -// nolint: containedctx type execCtx struct { svc *Service - ctx context.Context - prm RangePrm statusError @@ -80,10 +77,6 @@ func (exec *execCtx) setLogger(l *logger.Logger) { )} } -func (exec execCtx) context() context.Context { - return exec.ctx -} - func (exec execCtx) isLocal() bool { return exec.prm.common.LocalOnly() } @@ -217,13 +210,13 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) { } } -func (exec *execCtx) writeCollectedHeader() bool { +func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool { if exec.ctxRange() != nil { return true } err := exec.prm.objWriter.WriteHeader( - exec.context(), + ctx, exec.collectedObject.CutPayload(), ) @@ -243,12 +236,12 @@ func (exec *execCtx) writeCollectedHeader() bool { return exec.status == statusOK } -func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool { +func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool { if exec.headOnly() { return true } - err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload()) + err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload()) switch { default: @@ -266,9 +259,9 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool { return err == nil } -func (exec *execCtx) writeCollectedObject() { - if ok := exec.writeCollectedHeader(); ok { - exec.writeObjectPayload(exec.collectedObject) +func (exec *execCtx) writeCollectedObject(ctx context.Context) { + if ok := exec.writeCollectedHeader(ctx); ok { + exec.writeObjectPayload(ctx, exec.collectedObject) } } diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index cdb2d96f..0f5983e9 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -65,7 +65,6 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error { func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError { exec := &execCtx{ svc: s, - ctx: ctx, prm: RangePrm{ commonPrm: prm, }, @@ -78,22 +77,21 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st exec.setLogger(s.log) - //nolint: contextcheck - exec.execute() + exec.execute(ctx) return exec.statusError } -func (exec *execCtx) execute() { +func (exec *execCtx) execute(ctx context.Context) { exec.log.Debug("serving request...") // perform local operation - exec.executeLocal() + exec.executeLocal(ctx) - exec.analyzeStatus(true) + exec.analyzeStatus(ctx, true) } -func (exec *execCtx) analyzeStatus(execCnr bool) { +func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) { // analyze local result switch exec.status { case statusOK: @@ -102,7 +100,7 @@ func (exec *execCtx) analyzeStatus(execCnr bool) { exec.log.Debug("requested object was marked as removed") case statusVIRTUAL: exec.log.Debug("requested object is virtual") - exec.assemble() + exec.assemble(ctx) case statusOutOfRange: exec.log.Debug("requested range is out of object bounds") default: @@ -111,8 +109,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) { ) if execCnr { - exec.executeOnContainer() - exec.analyzeStatus(false) + exec.executeOnContainer(ctx) + exec.analyzeStatus(ctx, false) } } } diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 36a0e497..3d1a95cb 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -117,7 +117,7 @@ func newTestClient() *testClient { } } -func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { +func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { v, ok := c.results[exec.address().EncodeToString()] if !ok { var errNotFound apistatus.ObjectNotFound diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index f526af4e..a6a77729 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -1,6 +1,7 @@ package getsvc import ( + "context" "errors" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -8,7 +9,7 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) executeLocal() { +func (exec *execCtx) executeLocal(ctx context.Context) { var err error exec.collectedObject, err = exec.svc.localStorage.get(exec) @@ -28,7 +29,7 @@ func (exec *execCtx) executeLocal() { case err == nil: exec.status = statusOK exec.err = nil - exec.writeCollectedObject() + exec.writeCollectedObject(ctx) case errors.As(err, &errRemoved): exec.status = statusINHUMED exec.err = errRemoved diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index fbfb01bc..1532bade 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -18,7 +18,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool return true } - obj, err := client.getObject(exec, info) + obj, err := client.getObject(ctx, exec, info) var errSplitInfo *objectSDK.SplitInfoError var errRemoved *apistatus.ObjectAlreadyRemoved @@ -43,8 +43,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool // has already been streamed to the requesting party if obj != nil { exec.collectedObject = obj - //nolint: contextcheck - exec.writeCollectedObject() + exec.writeCollectedObject(ctx) } case errors.As(err, &errRemoved): exec.status = statusINHUMED diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index e69ab4f0..dfa3b48a 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -1,6 +1,8 @@ package getsvc import ( + "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" @@ -22,7 +24,7 @@ type Service struct { type Option func(*cfg) type getClient interface { - getObject(*execCtx, client.NodeInfo) (*object.Object, error) + getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error) } type cfg struct { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index d647b07f..08c73828 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -88,7 +88,7 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) { } // nolint: funlen -func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) { +func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) { if exec.isForwardingEnabled() { return exec.prm.forwarder(info, c.client) } @@ -101,7 +101,7 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj if exec.headOnly() { var prm internalclient.HeadObjectPrm - prm.SetContext(exec.context()) + prm.SetContext(ctx) prm.SetClient(c.client) prm.SetTTL(exec.prm.common.TTL()) prm.SetNetmapEpoch(exec.curProcEpoch) @@ -127,7 +127,7 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj if rng := exec.ctxRange(); rng != nil { var prm internalclient.PayloadRangePrm - prm.SetContext(exec.context()) + prm.SetContext(ctx) prm.SetClient(c.client) prm.SetTTL(exec.prm.common.TTL()) prm.SetNetmapEpoch(exec.curProcEpoch) @@ -148,7 +148,7 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj if errors.As(err, &errAccessDenied) { // Current spec allows other storage node to deny access, // fallback to GET here. - obj, err := c.get(exec, key) + obj, err := c.get(ctx, exec, key) if err != nil { return nil, err } @@ -169,13 +169,13 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj return payloadOnlyObject(res.PayloadRange()), nil } - return c.get(exec, key) + return c.get(ctx, exec, key) } -func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { +func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { var prm internalclient.GetObjectPrm - prm.SetContext(exec.context()) + prm.SetContext(ctx) prm.SetClient(c.client) prm.SetTTL(exec.prm.common.TTL()) prm.SetNetmapEpoch(exec.curProcEpoch)