diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index ebae18eb5..db71df6a4 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -10,7 +10,7 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) assemble() { +func (exec *execCtx) assemble(ctx context.Context) { if !exec.canAssemble() { exec.log.Debug("can not assemble the object") return @@ -49,7 +49,7 @@ func (exec *execCtx) assemble() { zap.Uint64("range_length", exec.ctxRange().GetLength()), ) - obj, err := assembler.Assemble(exec.context(), exec.prm.objWriter) + obj, err := assembler.Assemble(ctx, exec.prm.objWriter) if err != nil { exec.log.Warn("failed to assemble splitted object", zap.Error(err), @@ -107,8 +107,7 @@ func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Obje w := NewSimpleObjectWriter() prm.SetHeaderWriter(w) - //nolint: contextcheck - err := exec.svc.Head(exec.context(), prm) + err := exec.svc.Head(ctx, prm) if err != nil { return nil, err @@ -128,8 +127,7 @@ func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Ra p.addr.SetContainer(exec.containerID()) p.addr.SetObject(id) - //nolint: contextcheck - statusError := exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng)) + statusError := exec.svc.get(ctx, p.commonPrm, withPayloadRange(rng)) if statusError.err != nil { return nil, statusError.err diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index 882861129..cfb538d38 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -7,7 +7,7 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) executeOnContainer() { +func (exec *execCtx) executeOnContainer(ctx context.Context) { if exec.isLocal() { exec.log.Debug("return result directly") return @@ -26,7 +26,7 @@ func (exec *execCtx) executeOnContainer() { } for { - if exec.processCurrentEpoch() { + if exec.processCurrentEpoch(ctx) { break } @@ -42,7 +42,7 @@ func (exec *execCtx) executeOnContainer() { } } -func (exec *execCtx) processCurrentEpoch() bool { +func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool { exec.log.Debug("process epoch", zap.Uint64("number", exec.curProcEpoch), ) @@ -52,7 +52,7 @@ func (exec *execCtx) processCurrentEpoch() bool { return true } - ctx, cancel := context.WithCancel(exec.context()) + ctx, cancel := context.WithCancel(ctx) defer cancel() exec.status = statusUndefined diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 9858b32b2..2ba014574 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -19,12 +19,9 @@ type statusError struct { err error } -// nolint: containedctx type execCtx struct { svc *Service - ctx context.Context - prm RangePrm statusError @@ -80,10 +77,6 @@ func (exec *execCtx) setLogger(l *logger.Logger) { )} } -func (exec execCtx) context() context.Context { - return exec.ctx -} - func (exec execCtx) isLocal() bool { return exec.prm.common.LocalOnly() } @@ -217,13 +210,13 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) { } } -func (exec *execCtx) writeCollectedHeader() bool { +func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool { if exec.ctxRange() != nil { return true } err := exec.prm.objWriter.WriteHeader( - exec.context(), + ctx, exec.collectedObject.CutPayload(), ) @@ -243,12 +236,12 @@ func (exec *execCtx) writeCollectedHeader() bool { return exec.status == statusOK } -func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool { +func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool { if exec.headOnly() { return true } - err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload()) + err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload()) switch { default: @@ -266,9 +259,9 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool { return err == nil } -func (exec *execCtx) writeCollectedObject() { - if ok := exec.writeCollectedHeader(); ok { - exec.writeObjectPayload(exec.collectedObject) +func (exec *execCtx) writeCollectedObject(ctx context.Context) { + if ok := exec.writeCollectedHeader(ctx); ok { + exec.writeObjectPayload(ctx, exec.collectedObject) } } diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index cdb2d96fd..0f5983e99 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -65,7 +65,6 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error { func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError { exec := &execCtx{ svc: s, - ctx: ctx, prm: RangePrm{ commonPrm: prm, }, @@ -78,22 +77,21 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st exec.setLogger(s.log) - //nolint: contextcheck - exec.execute() + exec.execute(ctx) return exec.statusError } -func (exec *execCtx) execute() { +func (exec *execCtx) execute(ctx context.Context) { exec.log.Debug("serving request...") // perform local operation - exec.executeLocal() + exec.executeLocal(ctx) - exec.analyzeStatus(true) + exec.analyzeStatus(ctx, true) } -func (exec *execCtx) analyzeStatus(execCnr bool) { +func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) { // analyze local result switch exec.status { case statusOK: @@ -102,7 +100,7 @@ func (exec *execCtx) analyzeStatus(execCnr bool) { exec.log.Debug("requested object was marked as removed") case statusVIRTUAL: exec.log.Debug("requested object is virtual") - exec.assemble() + exec.assemble(ctx) case statusOutOfRange: exec.log.Debug("requested range is out of object bounds") default: @@ -111,8 +109,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) { ) if execCnr { - exec.executeOnContainer() - exec.analyzeStatus(false) + exec.executeOnContainer(ctx) + exec.analyzeStatus(ctx, false) } } } diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 36a0e4976..3d1a95cbb 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -117,7 +117,7 @@ func newTestClient() *testClient { } } -func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { +func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { v, ok := c.results[exec.address().EncodeToString()] if !ok { var errNotFound apistatus.ObjectNotFound diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index f526af4e6..a6a77729c 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -1,6 +1,7 @@ package getsvc import ( + "context" "errors" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -8,7 +9,7 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) executeLocal() { +func (exec *execCtx) executeLocal(ctx context.Context) { var err error exec.collectedObject, err = exec.svc.localStorage.get(exec) @@ -28,7 +29,7 @@ func (exec *execCtx) executeLocal() { case err == nil: exec.status = statusOK exec.err = nil - exec.writeCollectedObject() + exec.writeCollectedObject(ctx) case errors.As(err, &errRemoved): exec.status = statusINHUMED exec.err = errRemoved diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index 88848264e..7a0f1e062 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -59,7 +59,7 @@ type RangeHashPrm struct { salt []byte } -type RequestForwarder func(coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error) +type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error) // HeadPrm groups parameters of Head service call. type HeadPrm struct { diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index fbfb01bcd..1532bade0 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -18,7 +18,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool return true } - obj, err := client.getObject(exec, info) + obj, err := client.getObject(ctx, exec, info) var errSplitInfo *objectSDK.SplitInfoError var errRemoved *apistatus.ObjectAlreadyRemoved @@ -43,8 +43,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool // has already been streamed to the requesting party if obj != nil { exec.collectedObject = obj - //nolint: contextcheck - exec.writeCollectedObject() + exec.writeCollectedObject(ctx) } case errors.As(err, &errRemoved): exec.status = statusINHUMED diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index e69ab4f0f..dfa3b48ac 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -1,6 +1,8 @@ package getsvc import ( + "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" @@ -22,7 +24,7 @@ type Service struct { type Option func(*cfg) type getClient interface { - getObject(*execCtx, client.NodeInfo) (*object.Object, error) + getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error) } type cfg struct { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index d647b07f6..7986d05c0 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -87,10 +87,9 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) { }, nil } -// nolint: funlen -func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) { +func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) { if exec.isForwardingEnabled() { - return exec.prm.forwarder(info, c.client) + return exec.prm.forwarder(ctx, info, c.client) } key, err := exec.key() @@ -99,83 +98,91 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj } if exec.headOnly() { - var prm internalclient.HeadObjectPrm - - prm.SetContext(exec.context()) - prm.SetClient(c.client) - prm.SetTTL(exec.prm.common.TTL()) - prm.SetNetmapEpoch(exec.curProcEpoch) - prm.SetAddress(exec.address()) - prm.SetPrivateKey(key) - prm.SetSessionToken(exec.prm.common.SessionToken()) - prm.SetBearerToken(exec.prm.common.BearerToken()) - prm.SetXHeaders(exec.prm.common.XHeaders()) - - if exec.isRaw() { - prm.SetRawFlag() - } - - res, err := internalclient.HeadObject(prm) - if err != nil { - return nil, err - } - - return res.Header(), nil + return c.getHeadOnly(ctx, exec, key) } // we don't specify payload writer because we accumulate // the object locally (even huge). if rng := exec.ctxRange(); rng != nil { - var prm internalclient.PayloadRangePrm - - prm.SetContext(exec.context()) - prm.SetClient(c.client) - prm.SetTTL(exec.prm.common.TTL()) - prm.SetNetmapEpoch(exec.curProcEpoch) - prm.SetAddress(exec.address()) - prm.SetPrivateKey(key) - prm.SetSessionToken(exec.prm.common.SessionToken()) - prm.SetBearerToken(exec.prm.common.BearerToken()) - prm.SetXHeaders(exec.prm.common.XHeaders()) - prm.SetRange(rng) - - if exec.isRaw() { - prm.SetRawFlag() - } - - res, err := internalclient.PayloadRange(prm) - if err != nil { - var errAccessDenied *apistatus.ObjectAccessDenied - if errors.As(err, &errAccessDenied) { - // Current spec allows other storage node to deny access, - // fallback to GET here. - obj, err := c.get(exec, key) - if err != nil { - return nil, err - } - - payload := obj.Payload() - from := rng.GetOffset() - to := from + rng.GetLength() - - if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { - return nil, new(apistatus.ObjectOutOfRange) - } - - return payloadOnlyObject(payload[from:to]), nil - } - return nil, err - } - - return payloadOnlyObject(res.PayloadRange()), nil + // Current spec allows other storage node to deny access, + // fallback to GET here. + return c.getRange(ctx, exec, key, rng) } - return c.get(exec, key) + return c.get(ctx, exec, key) } -func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { +func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) { + var prm internalclient.PayloadRangePrm + + prm.SetContext(ctx) + prm.SetClient(c.client) + prm.SetTTL(exec.prm.common.TTL()) + prm.SetNetmapEpoch(exec.curProcEpoch) + prm.SetAddress(exec.address()) + prm.SetPrivateKey(key) + prm.SetSessionToken(exec.prm.common.SessionToken()) + prm.SetBearerToken(exec.prm.common.BearerToken()) + prm.SetXHeaders(exec.prm.common.XHeaders()) + prm.SetRange(rng) + + if exec.isRaw() { + prm.SetRawFlag() + } + + res, err := internalclient.PayloadRange(prm) + if err != nil { + var errAccessDenied *apistatus.ObjectAccessDenied + if errors.As(err, &errAccessDenied) { + obj, err := c.get(ctx, exec, key) + if err != nil { + return nil, err + } + + payload := obj.Payload() + from := rng.GetOffset() + to := from + rng.GetLength() + + if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { + return nil, new(apistatus.ObjectOutOfRange) + } + + return payloadOnlyObject(payload[from:to]), nil + } + return nil, err + } + + return payloadOnlyObject(res.PayloadRange()), nil +} + +func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { + var prm internalclient.HeadObjectPrm + + prm.SetContext(ctx) + prm.SetClient(c.client) + prm.SetTTL(exec.prm.common.TTL()) + prm.SetNetmapEpoch(exec.curProcEpoch) + prm.SetAddress(exec.address()) + prm.SetPrivateKey(key) + prm.SetSessionToken(exec.prm.common.SessionToken()) + prm.SetBearerToken(exec.prm.common.BearerToken()) + prm.SetXHeaders(exec.prm.common.XHeaders()) + + if exec.isRaw() { + prm.SetRawFlag() + } + + res, err := internalclient.HeadObject(prm) + if err != nil { + return nil, err + } + + return res.Header(), nil +} + +func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { var prm internalclient.GetObjectPrm - prm.SetContext(exec.context()) + prm.SetContext(ctx) prm.SetClient(c.client) prm.SetTTL(exec.prm.common.TTL()) prm.SetNetmapEpoch(exec.curProcEpoch) diff --git a/pkg/services/object/get/v2/get_forwarder.go b/pkg/services/object/get/v2/get_forwarder.go new file mode 100644 index 000000000..330a0642f --- /dev/null +++ b/pkg/services/object/get/v2/get_forwarder.go @@ -0,0 +1,168 @@ +package getsvc + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + "sync" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" + internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" +) + +type getRequestForwarder struct { + OnceResign *sync.Once + OnceHeaderSending *sync.Once + GlobalProgress int + Key *ecdsa.PrivateKey + Request *objectV2.GetRequest + Stream *streamObjectWriter +} + +func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { + var err error + + // once compose and resign forwarding request + f.OnceResign.Do(func() { + // compose meta header of the local server + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1) + // TODO: #1165 think how to set the other fields + metaHdr.SetOrigin(f.Request.GetMetaHeader()) + writeCurrentVersion(metaHdr) + f.Request.SetMetaHeader(metaHdr) + err = signature.SignServiceMessage(f.Key, f.Request) + }) + + if err != nil { + return nil, err + } + + getStream, err := f.openStream(ctx, addr, c) + if err != nil { + return nil, err + } + return nil, f.readStream(ctx, c, getStream, pubkey) +} + +func (f *getRequestForwarder) verifyResponse(resp *objectV2.GetResponse, pubkey []byte) error { + // verify response key + if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil { + return err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return fmt.Errorf("response verification failed: %w", err) + } + + if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { + return err + } + return nil +} + +func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetObjectPartInit) error { + obj := new(objectV2.Object) + + obj.SetObjectID(v.GetObjectID()) + obj.SetSignature(v.GetSignature()) + obj.SetHeader(v.GetHeader()) + + var err error + f.OnceHeaderSending.Do(func() { + err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj)) + }) + if err != nil { + return fmt.Errorf("could not write object header in Get forwarder: %w", err) + } + return nil +} + +func (f *getRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.GetResponseReader, error) { + var getStream *rpc.GetResponseReader + err := c.RawForAddress(addr, func(cli *rpcclient.Client) error { + var e error + getStream, e = rpc.GetObject(cli, f.Request, rpcclient.WithContext(ctx)) + return e + }) + if err != nil { + return nil, fmt.Errorf("stream opening failed: %w", err) + } + return getStream, nil +} + +func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddressClient, getStream *rpc.GetResponseReader, pubkey []byte) error { + var ( + headWas bool + resp = new(objectV2.GetResponse) + localProgress int + ) + + for { + // receive message from server stream + err := getStream.Read(resp) + if err != nil { + if errors.Is(err, io.EOF) { + if !headWas { + return io.ErrUnexpectedEOF + } + + break + } + + internalclient.ReportError(c, err) + return fmt.Errorf("reading the response failed: %w", err) + } + + if err := f.verifyResponse(resp, pubkey); err != nil { + return err + } + + switch v := resp.GetBody().GetObjectPart().(type) { + default: + return fmt.Errorf("unexpected object part %T", v) + case *objectV2.GetObjectPartInit: + if headWas { + return errWrongMessageSeq + } + headWas = true + if err := f.writeHeader(ctx, v); err != nil { + return err + } + case *objectV2.GetObjectPartChunk: + if !headWas { + return errWrongMessageSeq + } + + origChunk := v.GetChunk() + + chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk) + if len(chunk) == 0 { + localProgress += len(origChunk) + continue + } + + if err = f.Stream.WriteChunk(ctx, chunk); err != nil { + return fmt.Errorf("could not write object chunk in Get forwarder: %w", err) + } + + localProgress += len(origChunk) + f.GlobalProgress += len(chunk) + case *objectV2.SplitInfo: + si := object.NewSplitInfoFromV2(v) + return object.NewSplitInfoError(si) + } + } + return nil +} diff --git a/pkg/services/object/get/v2/get_range_forwarder.go b/pkg/services/object/get/v2/get_range_forwarder.go new file mode 100644 index 000000000..5893f8de3 --- /dev/null +++ b/pkg/services/object/get/v2/get_range_forwarder.go @@ -0,0 +1,134 @@ +package getsvc + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + "sync" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" + internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" +) + +type getRangeRequestForwarder struct { + OnceResign *sync.Once + GlobalProgress int + Key *ecdsa.PrivateKey + Request *objectV2.GetRangeRequest + Stream *streamObjectRangeWriter +} + +func (f *getRangeRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { + var err error + + // once compose and resign forwarding request + f.OnceResign.Do(func() { + // compose meta header of the local server + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1) + // TODO: #1165 think how to set the other fields + metaHdr.SetOrigin(f.Request.GetMetaHeader()) + writeCurrentVersion(metaHdr) + + f.Request.SetMetaHeader(metaHdr) + + err = signature.SignServiceMessage(f.Key, f.Request) + }) + + if err != nil { + return nil, err + } + + rangeStream, err := f.openStream(ctx, addr, c) + if err != nil { + return nil, err + } + + return nil, f.readStream(ctx, rangeStream, c, pubkey) +} + +func (f *getRangeRequestForwarder) verifyResponse(resp *objectV2.GetRangeResponse, pubkey []byte) error { + // verify response key + if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil { + return err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return fmt.Errorf("could not verify %T: %w", resp, err) + } + + if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { + return err + } + return nil +} + +func (f *getRangeRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.ObjectRangeResponseReader, error) { + // open stream + var rangeStream *rpc.ObjectRangeResponseReader + err := c.RawForAddress(addr, func(cli *rpcclient.Client) error { + var e error + rangeStream, e = rpc.GetObjectRange(cli, f.Request, rpcclient.WithContext(ctx)) + return e + }) + if err != nil { + return nil, fmt.Errorf("could not create Get payload range stream: %w", err) + } + return rangeStream, nil +} + +func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *rpc.ObjectRangeResponseReader, c client.MultiAddressClient, pubkey []byte) error { + resp := new(objectV2.GetRangeResponse) + var localProgress int + + for { + // receive message from server stream + err := rangeStream.Read(resp) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + internalclient.ReportError(c, err) + return fmt.Errorf("reading the response failed: %w", err) + } + + if err := f.verifyResponse(resp, pubkey); err != nil { + return err + } + + switch v := resp.GetBody().GetRangePart().(type) { + case nil: + return fmt.Errorf("unexpected range type %T", v) + case *objectV2.GetRangePartChunk: + origChunk := v.GetChunk() + + chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk) + if len(chunk) == 0 { + localProgress += len(origChunk) + continue + } + + if err = f.Stream.WriteChunk(ctx, chunk); err != nil { + return fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err) + } + + localProgress += len(origChunk) + f.GlobalProgress += len(chunk) + case *objectV2.SplitInfo: + si := object.NewSplitInfoFromV2(v) + return object.NewSplitInfoError(si) + } + } + return nil +} diff --git a/pkg/services/object/get/v2/head_forwarder.go b/pkg/services/object/get/v2/head_forwarder.go new file mode 100644 index 000000000..45c0174fd --- /dev/null +++ b/pkg/services/object/get/v2/head_forwarder.go @@ -0,0 +1,175 @@ +package getsvc + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "sync" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" + frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type headRequestForwarder struct { + Request *objectV2.HeadRequest + Response *objectV2.HeadResponse + OnceResign *sync.Once + ObjectAddr oid.Address + Key *ecdsa.PrivateKey +} + +func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { + var err error + + // once compose and resign forwarding request + f.OnceResign.Do(func() { + // compose meta header of the local server + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1) + // TODO: #1165 think how to set the other fields + metaHdr.SetOrigin(f.Request.GetMetaHeader()) + writeCurrentVersion(metaHdr) + + f.Request.SetMetaHeader(metaHdr) + + err = signature.SignServiceMessage(f.Key, f.Request) + }) + + if err != nil { + return nil, err + } + + headResp, err := f.sendHeadRequest(ctx, addr, c) + if err != nil { + return nil, err + } + + if err := f.verifyResponse(headResp, pubkey); err != nil { + return nil, err + } + + var ( + hdr *objectV2.Header + idSig *refs.Signature + ) + + switch v := headResp.GetBody().GetHeaderPart().(type) { + case nil: + return nil, fmt.Errorf("unexpected header type %T", v) + case *objectV2.ShortHeader: + if hdr, err = f.getHeaderFromShortHeader(v); err != nil { + return nil, err + } + case *objectV2.HeaderWithSignature: + if hdr, idSig, err = f.getHeaderAndSignature(v); err != nil { + return nil, err + } + case *objectV2.SplitInfo: + si := object.NewSplitInfoFromV2(v) + return nil, object.NewSplitInfoError(si) + } + + objv2 := new(objectV2.Object) + objv2.SetHeader(hdr) + objv2.SetSignature(idSig) + + obj := object.NewFromV2(objv2) + obj.SetID(f.ObjectAddr.Object()) + + return obj, nil +} + +func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader) (*objectV2.Header, error) { + if !f.Request.GetBody().GetMainOnly() { + return nil, fmt.Errorf("wrong header part type: expected %T, received %T", + (*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil), + ) + } + + hdr := new(objectV2.Header) + hdr.SetPayloadLength(sh.GetPayloadLength()) + hdr.SetVersion(sh.GetVersion()) + hdr.SetOwnerID(sh.GetOwnerID()) + hdr.SetObjectType(sh.GetObjectType()) + hdr.SetCreationEpoch(sh.GetCreationEpoch()) + hdr.SetPayloadHash(sh.GetPayloadHash()) + hdr.SetHomomorphicHash(sh.GetHomomorphicHash()) + return hdr, nil +} + +func (f *headRequestForwarder) getHeaderAndSignature(hdrWithSig *objectV2.HeaderWithSignature) (*objectV2.Header, *refs.Signature, error) { + if f.Request.GetBody().GetMainOnly() { + return nil, nil, fmt.Errorf("wrong header part type: expected %T, received %T", + (*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil), + ) + } + + if hdrWithSig == nil { + return nil, nil, errors.New("nil object part") + } + + hdr := hdrWithSig.GetHeader() + idSig := hdrWithSig.GetSignature() + + if idSig == nil { + // TODO(@cthulhu-rider): #1387 use "const" error + return nil, nil, errors.New("missing signature") + } + + binID, err := f.ObjectAddr.Object().Marshal() + if err != nil { + return nil, nil, fmt.Errorf("marshal ID: %w", err) + } + + var sig frostfscrypto.Signature + if err := sig.ReadFromV2(*idSig); err != nil { + return nil, nil, fmt.Errorf("can't read signature: %w", err) + } + + if !sig.Verify(binID) { + return nil, nil, errors.New("invalid object ID signature") + } + + return hdr, idSig, nil +} + +func (f *headRequestForwarder) sendHeadRequest(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*objectV2.HeadResponse, error) { + var headResp *objectV2.HeadResponse + err := c.RawForAddress(addr, func(cli *rpcclient.Client) error { + var e error + headResp, e = rpc.HeadObject(cli, f.Request, rpcclient.WithContext(ctx)) + return e + }) + if err != nil { + return nil, fmt.Errorf("sending the request failed: %w", err) + } + return headResp, nil +} + +func (f *headRequestForwarder) verifyResponse(headResp *objectV2.HeadResponse, pubkey []byte) error { + // verify response key + if err := internal.VerifyResponseKeyV2(pubkey, headResp); err != nil { + return err + } + + // verify response structure + if err := signature.VerifyServiceMessage(headResp); err != nil { + return fmt.Errorf("response verification failed: %w", err) + } + + if err := checkStatus(f.Response.GetMetaHeader().GetStatus()); err != nil { + return err + } + return nil +} diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index a871714a1..3a50a6ca5 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -6,25 +6,18 @@ import ( "errors" "fmt" "hash" - "io" "sync" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" - rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/status" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" - internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" versionSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" @@ -33,7 +26,6 @@ import ( var errWrongMessageSeq = errors.New("incorrect message sequence") -// nolint: funlen, gocognit func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) { body := req.GetBody() @@ -49,8 +41,6 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre return nil, fmt.Errorf("invalid object address: %w", err) } - meta := req.GetMetaHeader() - commonPrm, err := util.CommonPrmFromV2(req) if err != nil { return nil, err @@ -66,141 +56,26 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre p.SetObjectWriter(streamWrapper) if !commonPrm.LocalOnly() { - var onceResign sync.Once + key, err := s.keyStorage.GetKey(nil) + if err != nil { + return nil, err + } - var onceHeaderSending sync.Once - var globalProgress int + forwarder := &getRequestForwarder{ + OnceResign: &sync.Once{}, + OnceHeaderSending: &sync.Once{}, + GlobalProgress: 0, + Key: key, + Request: req, + Stream: streamWrapper, + } - p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { - var err error - - key, err := s.keyStorage.GetKey(nil) - if err != nil { - return nil, err - } - - // once compose and resign forwarding request - onceResign.Do(func() { - // compose meta header of the local server - metaHdr := new(session.RequestMetaHeader) - metaHdr.SetTTL(meta.GetTTL() - 1) - // TODO: #1165 think how to set the other fields - metaHdr.SetOrigin(meta) - writeCurrentVersion(metaHdr) - - req.SetMetaHeader(metaHdr) - - err = signature.SignServiceMessage(key, req) - }) - - if err != nil { - return nil, err - } - - // code below is copy-pasted from c.GetObject implementation, - // perhaps it is worth highlighting the utility function in frostfs-api-go - - // open stream - var getStream *rpc.GetResponseReader - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(stream.Context())) - return err - }) - if err != nil { - return nil, fmt.Errorf("stream opening failed: %w", err) - } - - var ( - headWas bool - resp = new(objectV2.GetResponse) - localProgress int - ) - - for { - // receive message from server stream - err := getStream.Read(resp) - if err != nil { - if errors.Is(err, io.EOF) { - if !headWas { - return nil, io.ErrUnexpectedEOF - } - - break - } - - internalclient.ReportError(c, err) - return nil, fmt.Errorf("reading the response failed: %w", err) - } - - // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil { - return nil, err - } - - // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { - return nil, fmt.Errorf("response verification failed: %w", err) - } - - if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { - return nil, err - } - - switch v := resp.GetBody().GetObjectPart().(type) { - default: - return nil, fmt.Errorf("unexpected object part %T", v) - case *objectV2.GetObjectPartInit: - if headWas { - return nil, errWrongMessageSeq - } - - headWas = true - - obj := new(objectV2.Object) - - obj.SetObjectID(v.GetObjectID()) - obj.SetSignature(v.GetSignature()) - obj.SetHeader(v.GetHeader()) - - onceHeaderSending.Do(func() { - err = streamWrapper.WriteHeader(stream.Context(), object.NewFromV2(obj)) - }) - if err != nil { - return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err) - } - case *objectV2.GetObjectPartChunk: - if !headWas { - return nil, errWrongMessageSeq - } - - origChunk := v.GetChunk() - - chunk := chunkToSend(globalProgress, localProgress, origChunk) - if len(chunk) == 0 { - localProgress += len(origChunk) - continue - } - - if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil { - return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err) - } - - localProgress += len(origChunk) - globalProgress += len(chunk) - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) - return nil, object.NewSplitInfoError(si) - } - } - - return nil, nil - })) + p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode)) } return p, nil } -// nolint: funlen, gocognit func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) (*getsvc.RangePrm, error) { body := req.GetBody() @@ -216,8 +91,6 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get return nil, fmt.Errorf("invalid object address: %w", err) } - meta := req.GetMetaHeader() - commonPrm, err := util.CommonPrmFromV2(req) if err != nil { return nil, err @@ -239,104 +112,20 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get } if !commonPrm.LocalOnly() { - var onceResign sync.Once - var globalProgress int - key, err := s.keyStorage.GetKey(nil) if err != nil { return nil, err } - p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { - var err error + forwarder := &getRangeRequestForwarder{ + OnceResign: &sync.Once{}, + GlobalProgress: 0, + Key: key, + Request: req, + Stream: streamWrapper, + } - // once compose and resign forwarding request - onceResign.Do(func() { - // compose meta header of the local server - metaHdr := new(session.RequestMetaHeader) - metaHdr.SetTTL(meta.GetTTL() - 1) - // TODO: #1165 think how to set the other fields - metaHdr.SetOrigin(meta) - writeCurrentVersion(metaHdr) - - req.SetMetaHeader(metaHdr) - - err = signature.SignServiceMessage(key, req) - }) - - if err != nil { - return nil, err - } - - // code below is copy-pasted from c.ObjectPayloadRangeData implementation, - // perhaps it is worth highlighting the utility function in frostfs-api-go - - // open stream - var rangeStream *rpc.ObjectRangeResponseReader - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - rangeStream, err = rpc.GetObjectRange(cli, req, rpcclient.WithContext(stream.Context())) - return err - }) - if err != nil { - return nil, fmt.Errorf("could not create Get payload range stream: %w", err) - } - - resp := new(objectV2.GetRangeResponse) - var localProgress int - - for { - // receive message from server stream - err := rangeStream.Read(resp) - if err != nil { - if errors.Is(err, io.EOF) { - break - } - - internalclient.ReportError(c, err) - return nil, fmt.Errorf("reading the response failed: %w", err) - } - - // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil { - return nil, err - } - - // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { - return nil, fmt.Errorf("could not verify %T: %w", resp, err) - } - - if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { - return nil, err - } - - switch v := resp.GetBody().GetRangePart().(type) { - case nil: - return nil, fmt.Errorf("unexpected range type %T", v) - case *objectV2.GetRangePartChunk: - origChunk := v.GetChunk() - - chunk := chunkToSend(globalProgress, localProgress, origChunk) - if len(chunk) == 0 { - localProgress += len(origChunk) - continue - } - - if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil { - return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err) - } - - localProgress += len(origChunk) - globalProgress += len(chunk) - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) - - return nil, object.NewSplitInfoError(si) - } - } - - return nil, nil - })) + p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode)) } return p, nil @@ -426,7 +215,6 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object) return nil } -// nolint: funlen func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) { body := req.GetBody() @@ -442,8 +230,6 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp return nil, fmt.Errorf("invalid object address: %w", err) } - meta := req.GetMetaHeader() - commonPrm, err := util.CommonPrmFromV2(req) if err != nil { return nil, err @@ -463,135 +249,20 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp return p, nil } - var onceResign sync.Once + key, err := s.keyStorage.GetKey(nil) + if err != nil { + return nil, err + } - p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { - var err error + forwarder := &headRequestForwarder{ + Request: req, + Response: resp, + OnceResign: &sync.Once{}, + ObjectAddr: objAddr, + Key: key, + } - key, err := s.keyStorage.GetKey(nil) - if err != nil { - return nil, err - } - - // once compose and resign forwarding request - onceResign.Do(func() { - // compose meta header of the local server - metaHdr := new(session.RequestMetaHeader) - metaHdr.SetTTL(meta.GetTTL() - 1) - // TODO: #1165 think how to set the other fields - metaHdr.SetOrigin(meta) - writeCurrentVersion(metaHdr) - - req.SetMetaHeader(metaHdr) - - err = signature.SignServiceMessage(key, req) - }) - - if err != nil { - return nil, err - } - - // code below is copy-pasted from c.GetObjectHeader implementation, - // perhaps it is worth highlighting the utility function in frostfs-api-go - - // send Head request - var headResp *objectV2.HeadResponse - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx)) - return err - }) - if err != nil { - return nil, fmt.Errorf("sending the request failed: %w", err) - } - - // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil { - return nil, err - } - - // verify response structure - if err := signature.VerifyServiceMessage(headResp); err != nil { - return nil, fmt.Errorf("response verification failed: %w", err) - } - - if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { - return nil, err - } - - var ( - hdr *objectV2.Header - idSig *refs.Signature - ) - - switch v := headResp.GetBody().GetHeaderPart().(type) { - case nil: - return nil, fmt.Errorf("unexpected header type %T", v) - case *objectV2.ShortHeader: - if !body.GetMainOnly() { - return nil, fmt.Errorf("wrong header part type: expected %T, received %T", - (*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil), - ) - } - - h := v - - hdr = new(objectV2.Header) - hdr.SetPayloadLength(h.GetPayloadLength()) - hdr.SetVersion(h.GetVersion()) - hdr.SetOwnerID(h.GetOwnerID()) - hdr.SetObjectType(h.GetObjectType()) - hdr.SetCreationEpoch(h.GetCreationEpoch()) - hdr.SetPayloadHash(h.GetPayloadHash()) - hdr.SetHomomorphicHash(h.GetHomomorphicHash()) - case *objectV2.HeaderWithSignature: - if body.GetMainOnly() { - return nil, fmt.Errorf("wrong header part type: expected %T, received %T", - (*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil), - ) - } - - hdrWithSig := v - if hdrWithSig == nil { - return nil, errors.New("nil object part") - } - - hdr = hdrWithSig.GetHeader() - idSig = hdrWithSig.GetSignature() - - if idSig == nil { - // TODO(@cthulhu-rider): #1387 use "const" error - return nil, errors.New("missing signature") - } - - binID, err := objAddr.Object().Marshal() - if err != nil { - return nil, fmt.Errorf("marshal ID: %w", err) - } - - var sig frostfscrypto.Signature - if err := sig.ReadFromV2(*idSig); err != nil { - return nil, fmt.Errorf("can't read signature: %w", err) - } - - if !sig.Verify(binID) { - return nil, errors.New("invalid object ID signature") - } - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) - - return nil, object.NewSplitInfoError(si) - } - - objv2 := new(objectV2.Object) - objv2.SetHeader(hdr) - objv2.SetSignature(idSig) - - obj := object.NewFromV2(objv2) - obj.SetID(objAddr.Object()) - - // convert the object - return obj, nil - })) + p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode)) return p, nil } @@ -659,8 +330,8 @@ func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart { return sh } -func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder { - return func(info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) { +func groupAddressRequestForwarder(f func(context.Context, network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder { + return func(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) { var ( firstErr error res *object.Object @@ -681,7 +352,7 @@ func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressCli // would be nice to log otherwise }() - res, err = f(addr, c, key) + res, err = f(ctx, addr, c, key) return })