From 6c7b708a98a7e95fbe8777a1abda75eedf589254 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 31 Mar 2023 17:41:42 +0300 Subject: [PATCH] [#193] getsvc: Refactor get range params creation Resolve funlen linter for toRangePrm function. Signed-off-by: Dmitrii Stepanov --- .../object/get/v2/get_range_forwarder.go | 137 ++++++++++++++++++ pkg/services/object/get/v2/util.go | 110 +------------- 2 files changed, 144 insertions(+), 103 deletions(-) create mode 100644 pkg/services/object/get/v2/get_range_forwarder.go diff --git a/pkg/services/object/get/v2/get_range_forwarder.go b/pkg/services/object/get/v2/get_range_forwarder.go new file mode 100644 index 000000000..a9526f714 --- /dev/null +++ b/pkg/services/object/get/v2/get_range_forwarder.go @@ -0,0 +1,137 @@ +package getsvc + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" + internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" +) + +type getRangeRequestForwarder struct { + OnceResign *sync.Once + GlobalProgress int + KeyStorage *util.KeyStorage + Request *objectV2.GetRangeRequest + Stream *streamObjectRangeWriter +} + +func (f *getRangeRequestForwarder) forward(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { + key, err := f.KeyStorage.GetKey(nil) + if err != nil { + return nil, err + } + + // once compose and resign forwarding request + f.OnceResign.Do(func() { + // compose meta header of the local server + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1) + // TODO: #1165 think how to set the other fields + metaHdr.SetOrigin(f.Request.GetMetaHeader()) + writeCurrentVersion(metaHdr) + + f.Request.SetMetaHeader(metaHdr) + + err = signature.SignServiceMessage(key, f.Request) + }) + + if err != nil { + return nil, err + } + + rangeStream, err := f.openStream(ctx, addr, c) + if err != nil { + return nil, err + } + + return nil, f.readStream(ctx, rangeStream, c, pubkey) +} + +func (f *getRangeRequestForwarder) verifyResponse(resp *objectV2.GetRangeResponse, pubkey []byte) error { + // verify response key + if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil { + return err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return fmt.Errorf("could not verify %T: %w", resp, err) + } + + if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { + return err + } + return nil +} + +func (f *getRangeRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.ObjectRangeResponseReader, error) { + // open stream + var rangeStream *rpc.ObjectRangeResponseReader + err := c.RawForAddress(addr, func(cli *rpcclient.Client) error { + var e error + rangeStream, e = rpc.GetObjectRange(cli, f.Request, rpcclient.WithContext(ctx)) + return e + }) + if err != nil { + return nil, fmt.Errorf("could not create Get payload range stream: %w", err) + } + return rangeStream, nil +} + +func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *rpc.ObjectRangeResponseReader, c client.MultiAddressClient, pubkey []byte) error { + resp := new(objectV2.GetRangeResponse) + var localProgress int + + for { + // receive message from server stream + err := rangeStream.Read(resp) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + internalclient.ReportError(c, err) + return fmt.Errorf("reading the response failed: %w", err) + } + + if err := f.verifyResponse(resp, pubkey); err != nil { + return err + } + + switch v := resp.GetBody().GetRangePart().(type) { + case nil: + return fmt.Errorf("unexpected range type %T", v) + case *objectV2.GetRangePartChunk: + origChunk := v.GetChunk() + + chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk) + if len(chunk) == 0 { + localProgress += len(origChunk) + continue + } + + if err = f.Stream.WriteChunk(ctx, chunk); err != nil { + return fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err) + } + + localProgress += len(origChunk) + f.GlobalProgress += len(chunk) + case *objectV2.SplitInfo: + si := object.NewSplitInfoFromV2(v) + return object.NewSplitInfoError(si) + } + } + return nil +} diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 3a9cf3b07..dffa0d9b1 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -6,22 +6,16 @@ import ( "errors" "fmt" "hash" - "io" "sync" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" - rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/status" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" - internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -79,7 +73,6 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre return p, nil } -// nolint: funlen, gocognit func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) (*getsvc.RangePrm, error) { body := req.GetBody() @@ -95,8 +88,6 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get return nil, fmt.Errorf("invalid object address: %w", err) } - meta := req.GetMetaHeader() - commonPrm, err := util.CommonPrmFromV2(req) if err != nil { return nil, err @@ -118,103 +109,16 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get } if !commonPrm.LocalOnly() { - var onceResign sync.Once - var globalProgress int - - key, err := s.keyStorage.GetKey(nil) - if err != nil { - return nil, err + forwarder := &getRangeRequestForwarder{ + OnceResign: &sync.Once{}, + GlobalProgress: 0, + KeyStorage: s.keyStorage, + Request: req, + Stream: streamWrapper, } p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { - var err error - - // once compose and resign forwarding request - onceResign.Do(func() { - // compose meta header of the local server - metaHdr := new(session.RequestMetaHeader) - metaHdr.SetTTL(meta.GetTTL() - 1) - // TODO: #1165 think how to set the other fields - metaHdr.SetOrigin(meta) - writeCurrentVersion(metaHdr) - - req.SetMetaHeader(metaHdr) - - err = signature.SignServiceMessage(key, req) - }) - - if err != nil { - return nil, err - } - - // code below is copy-pasted from c.ObjectPayloadRangeData implementation, - // perhaps it is worth highlighting the utility function in frostfs-api-go - - // open stream - var rangeStream *rpc.ObjectRangeResponseReader - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - rangeStream, err = rpc.GetObjectRange(cli, req, rpcclient.WithContext(stream.Context())) - return err - }) - if err != nil { - return nil, fmt.Errorf("could not create Get payload range stream: %w", err) - } - - resp := new(objectV2.GetRangeResponse) - var localProgress int - - for { - // receive message from server stream - err := rangeStream.Read(resp) - if err != nil { - if errors.Is(err, io.EOF) { - break - } - - internalclient.ReportError(c, err) - return nil, fmt.Errorf("reading the response failed: %w", err) - } - - // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil { - return nil, err - } - - // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { - return nil, fmt.Errorf("could not verify %T: %w", resp, err) - } - - if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { - return nil, err - } - - switch v := resp.GetBody().GetRangePart().(type) { - case nil: - return nil, fmt.Errorf("unexpected range type %T", v) - case *objectV2.GetRangePartChunk: - origChunk := v.GetChunk() - - chunk := chunkToSend(globalProgress, localProgress, origChunk) - if len(chunk) == 0 { - localProgress += len(origChunk) - continue - } - - if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil { - return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err) - } - - localProgress += len(origChunk) - globalProgress += len(chunk) - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) - - return nil, object.NewSplitInfoError(si) - } - } - - return nil, nil + return forwarder.forward(stream.Context(), addr, c, pubkey) })) }