diff --git a/pkg/services/object/get/v2/get_forwarder.go b/pkg/services/object/get/v2/get_forwarder.go new file mode 100644 index 00000000..b0ba4752 --- /dev/null +++ b/pkg/services/object/get/v2/get_forwarder.go @@ -0,0 +1,171 @@ +package getsvc + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" + internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" +) + +type getRequestForwarder struct { + OnceResign *sync.Once + OnceHeaderSending *sync.Once + GlobalProgress int + KeyStorage *util.KeyStorage + Request *objectV2.GetRequest + Stream *streamObjectWriter +} + +func (f *getRequestForwarder) forward(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { + key, err := f.KeyStorage.GetKey(nil) + if err != nil { + return nil, err + } + + // once compose and resign forwarding request + f.OnceResign.Do(func() { + // compose meta header of the local server + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1) + // TODO: #1165 think how to set the other fields + metaHdr.SetOrigin(f.Request.GetMetaHeader()) + writeCurrentVersion(metaHdr) + f.Request.SetMetaHeader(metaHdr) + err = signature.SignServiceMessage(key, f.Request) + }) + + if err != nil { + return nil, err + } + + getStream, err := f.openStream(ctx, addr, c) + if err != nil { + return nil, err + } + return nil, f.readStream(ctx, c, getStream, pubkey) +} + +func (f *getRequestForwarder) verifyResponse(resp *objectV2.GetResponse, pubkey []byte) error { + // verify response key + if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil { + return err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return fmt.Errorf("response verification failed: %w", err) + } + + if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { + return err + } + return nil +} + +func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetObjectPartInit) error { + obj := new(objectV2.Object) + + obj.SetObjectID(v.GetObjectID()) + obj.SetSignature(v.GetSignature()) + obj.SetHeader(v.GetHeader()) + + var err error + f.OnceHeaderSending.Do(func() { + err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj)) + }) + if err != nil { + return fmt.Errorf("could not write object header in Get forwarder: %w", err) + } + return nil +} + +func (f *getRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.GetResponseReader, error) { + var getStream *rpc.GetResponseReader + err := c.RawForAddress(addr, func(cli *rpcclient.Client) error { + var e error + getStream, e = rpc.GetObject(cli, f.Request, rpcclient.WithContext(ctx)) + return e + }) + if err != nil { + return nil, fmt.Errorf("stream opening failed: %w", err) + } + return getStream, nil +} + +func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddressClient, getStream *rpc.GetResponseReader, pubkey []byte) error { + var ( + headWas bool + resp = new(objectV2.GetResponse) + localProgress int + ) + + for { + // receive message from server stream + err := getStream.Read(resp) + if err != nil { + if errors.Is(err, io.EOF) { + if !headWas { + return io.ErrUnexpectedEOF + } + + break + } + + internalclient.ReportError(c, err) + return fmt.Errorf("reading the response failed: %w", err) + } + + if err := f.verifyResponse(resp, pubkey); err != nil { + return err + } + + switch v := resp.GetBody().GetObjectPart().(type) { + default: + return fmt.Errorf("unexpected object part %T", v) + case *objectV2.GetObjectPartInit: + if headWas { + return errWrongMessageSeq + } + headWas = true + if err := f.writeHeader(ctx, v); err != nil { + return err + } + case *objectV2.GetObjectPartChunk: + if !headWas { + return errWrongMessageSeq + } + + origChunk := v.GetChunk() + + chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk) + if len(chunk) == 0 { + localProgress += len(origChunk) + continue + } + + if err = f.Stream.WriteChunk(ctx, chunk); err != nil { + return fmt.Errorf("could not write object chunk in Get forwarder: %w", err) + } + + localProgress += len(origChunk) + f.GlobalProgress += len(chunk) + case *objectV2.SplitInfo: + si := object.NewSplitInfoFromV2(v) + return object.NewSplitInfoError(si) + } + } + return nil +} diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index c659f4e7..3a9cf3b0 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -32,7 +32,6 @@ import ( var errWrongMessageSeq = errors.New("incorrect message sequence") -// nolint: funlen, gocognit func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) { body := req.GetBody() @@ -48,8 +47,6 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre return nil, fmt.Errorf("invalid object address: %w", err) } - meta := req.GetMetaHeader() - commonPrm, err := util.CommonPrmFromV2(req) if err != nil { return nil, err @@ -65,134 +62,17 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre p.SetObjectWriter(streamWrapper) if !commonPrm.LocalOnly() { - var onceResign sync.Once - - var onceHeaderSending sync.Once - var globalProgress int + forwarder := &getRequestForwarder{ + OnceResign: &sync.Once{}, + OnceHeaderSending: &sync.Once{}, + GlobalProgress: 0, + KeyStorage: s.keyStorage, + Request: req, + Stream: streamWrapper, + } p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { - var err error - - key, err := s.keyStorage.GetKey(nil) - if err != nil { - return nil, err - } - - // once compose and resign forwarding request - onceResign.Do(func() { - // compose meta header of the local server - metaHdr := new(session.RequestMetaHeader) - metaHdr.SetTTL(meta.GetTTL() - 1) - // TODO: #1165 think how to set the other fields - metaHdr.SetOrigin(meta) - writeCurrentVersion(metaHdr) - - req.SetMetaHeader(metaHdr) - - err = signature.SignServiceMessage(key, req) - }) - - if err != nil { - return nil, err - } - - // code below is copy-pasted from c.GetObject implementation, - // perhaps it is worth highlighting the utility function in frostfs-api-go - - // open stream - var getStream *rpc.GetResponseReader - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(stream.Context())) - return err - }) - if err != nil { - return nil, fmt.Errorf("stream opening failed: %w", err) - } - - var ( - headWas bool - resp = new(objectV2.GetResponse) - localProgress int - ) - - for { - // receive message from server stream - err := getStream.Read(resp) - if err != nil { - if errors.Is(err, io.EOF) { - if !headWas { - return nil, io.ErrUnexpectedEOF - } - - break - } - - internalclient.ReportError(c, err) - return nil, fmt.Errorf("reading the response failed: %w", err) - } - - // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil { - return nil, err - } - - // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { - return nil, fmt.Errorf("response verification failed: %w", err) - } - - if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { - return nil, err - } - - switch v := resp.GetBody().GetObjectPart().(type) { - default: - return nil, fmt.Errorf("unexpected object part %T", v) - case *objectV2.GetObjectPartInit: - if headWas { - return nil, errWrongMessageSeq - } - - headWas = true - - obj := new(objectV2.Object) - - obj.SetObjectID(v.GetObjectID()) - obj.SetSignature(v.GetSignature()) - obj.SetHeader(v.GetHeader()) - - onceHeaderSending.Do(func() { - err = streamWrapper.WriteHeader(stream.Context(), object.NewFromV2(obj)) - }) - if err != nil { - return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err) - } - case *objectV2.GetObjectPartChunk: - if !headWas { - return nil, errWrongMessageSeq - } - - origChunk := v.GetChunk() - - chunk := chunkToSend(globalProgress, localProgress, origChunk) - if len(chunk) == 0 { - localProgress += len(origChunk) - continue - } - - if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil { - return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err) - } - - localProgress += len(origChunk) - globalProgress += len(chunk) - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) - return nil, object.NewSplitInfoError(si) - } - } - - return nil, nil + return forwarder.forward(stream.Context(), addr, c, pubkey) })) }