Refactor get service #193

Merged
fyrchik merged 7 commits from dstepanov-yadro/frostfs-node:refactoring/OBJECT-3610_getsvc into master 2023-04-05 14:38:49 +00:00
2 changed files with 180 additions and 129 deletions
Showing only changes of commit 7f7d7555f1 - Show all commits

View file

@ -0,0 +1,171 @@
package getsvc
import (
"context"
"errors"
"fmt"
"io"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
type getRequestForwarder struct {
OnceResign *sync.Once
OnceHeaderSending *sync.Once
GlobalProgress int
KeyStorage *util.KeyStorage
Request *objectV2.GetRequest
Stream *streamObjectWriter
}
func (f *getRequestForwarder) forward(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {

so not used second return value?

so not used second return value?

forwardRequestToNode is used as an argument for func groupAddressRequestForwarder(...) function. groupAddressRequestForwarder requires two values to be returned. It is possible to use a lambda function, but I don't like it.

Do you think that with a lambda and one return value it will be easier to understand?

```forwardRequestToNode``` is used as an argument for ```func groupAddressRequestForwarder(...)``` function. ```groupAddressRequestForwarder``` requires two values to be returned. It is possible to use a lambda function, but I don't like it. Do you think that with a lambda and one return value it will be easier to understand?

Do you think that with a lambda and one return value it will be easier to understand?

no, do not like lambdas usually as more complex thing for a reader.

is used as an argument for func groupAddressRequestForwarder(...) function

originally, comment was about that: seems like anything that is ...forwarder... should not return objects. if groupAddressRequestForwarder is used somewhere else AND that is not a complete refactor and we plan to reorganize it later, im ok with that

> Do you think that with a lambda and one return value it will be easier to understand? no, do not like lambdas usually as more complex thing for a reader. > is used as an argument for func groupAddressRequestForwarder(...) function originally, comment was about that: seems like anything that is `...forwarder...` should not return objects. if `groupAddressRequestForwarder` is used somewhere else AND that is not a complete refactor and we plan to reorganize it later, im ok with that

HEAD request requires return result from forwarder

HEAD request requires return result from forwarder
key, err := f.KeyStorage.GetKey(nil)
if err != nil {
return nil, err
}
// once compose and resign forwarding request
f.OnceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(f.Request.GetMetaHeader())
writeCurrentVersion(metaHdr)
f.Request.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, f.Request)
})
if err != nil {
return nil, err
}
getStream, err := f.openStream(ctx, addr, c)
if err != nil {
return nil, err
}
return nil, f.readStream(ctx, c, getStream, pubkey)
}
carpawell marked this conversation as resolved Outdated

can we just return f.readStream err if no details are added? (personally, i would add some details via fmt.Errorf)

related to more than just that line

can we just return `f.readStream` err if no details are added? (personally, i would add some details via `fmt.Errorf`) related to more than just that line

Fixed.

Fixed.
func (f *getRequestForwarder) verifyResponse(resp *objectV2.GetResponse, pubkey []byte) error {
// verify response key
if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return fmt.Errorf("response verification failed: %w", err)
}
if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return err
}
return nil
}
func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetObjectPartInit) error {
obj := new(objectV2.Object)
obj.SetObjectID(v.GetObjectID())
obj.SetSignature(v.GetSignature())
obj.SetHeader(v.GetHeader())
var err error
f.OnceHeaderSending.Do(func() {
err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj))
})
if err != nil {
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
}
return nil
}
func (f *getRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.GetResponseReader, error) {
var getStream *rpc.GetResponseReader
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
var e error
getStream, e = rpc.GetObject(cli, f.Request, rpcclient.WithContext(ctx))
return e
})
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
}
return getStream, nil
}
func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddressClient, getStream *rpc.GetResponseReader, pubkey []byte) error {
var (
headWas bool
resp = new(objectV2.GetResponse)
localProgress int
)
for {
// receive message from server stream
err := getStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
if !headWas {
return io.ErrUnexpectedEOF
}
break
}
internalclient.ReportError(c, err)
return fmt.Errorf("reading the response failed: %w", err)
}
if err := f.verifyResponse(resp, pubkey); err != nil {
return err
}
switch v := resp.GetBody().GetObjectPart().(type) {
default:
return fmt.Errorf("unexpected object part %T", v)
case *objectV2.GetObjectPartInit:
if headWas {
return errWrongMessageSeq
}
headWas = true
if err := f.writeHeader(ctx, v); err != nil {
return err
}
case *objectV2.GetObjectPartChunk:
if !headWas {
return errWrongMessageSeq
}
origChunk := v.GetChunk()
chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
return fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
}
localProgress += len(origChunk)
f.GlobalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return object.NewSplitInfoError(si)
}
}
return nil
}

View file

@ -32,7 +32,6 @@ import (
var errWrongMessageSeq = errors.New("incorrect message sequence")
// nolint: funlen, gocognit
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
body := req.GetBody()
@ -48,8 +47,6 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
return nil, fmt.Errorf("invalid object address: %w", err)
}
meta := req.GetMetaHeader()
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
@ -65,134 +62,17 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
p.SetObjectWriter(streamWrapper)
if !commonPrm.LocalOnly() {
var onceResign sync.Once
var onceHeaderSending sync.Once
var globalProgress int
forwarder := &getRequestForwarder{
OnceResign: &sync.Once{},
OnceHeaderSending: &sync.Once{},
GlobalProgress: 0,
KeyStorage: s.keyStorage,
Request: req,
Stream: streamWrapper,
}
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
key, err := s.keyStorage.GetKey(nil)
if err != nil {
return nil, err
}
// once compose and resign forwarding request
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
}
// code below is copy-pasted from c.GetObject implementation,
// perhaps it is worth highlighting the utility function in frostfs-api-go
// open stream
var getStream *rpc.GetResponseReader
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(stream.Context()))
return err
})
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
}
var (
headWas bool
resp = new(objectV2.GetResponse)
localProgress int
)
for {
// receive message from server stream
err := getStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
if !headWas {
return nil, io.ErrUnexpectedEOF
}
break
}
internalclient.ReportError(c, err)
return nil, fmt.Errorf("reading the response failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, fmt.Errorf("response verification failed: %w", err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
switch v := resp.GetBody().GetObjectPart().(type) {
default:
return nil, fmt.Errorf("unexpected object part %T", v)
case *objectV2.GetObjectPartInit:
if headWas {
return nil, errWrongMessageSeq
}
headWas = true
obj := new(objectV2.Object)
obj.SetObjectID(v.GetObjectID())
obj.SetSignature(v.GetSignature())
obj.SetHeader(v.GetHeader())
onceHeaderSending.Do(func() {
err = streamWrapper.WriteHeader(stream.Context(), object.NewFromV2(obj))
})
if err != nil {
return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err)
}
case *objectV2.GetObjectPartChunk:
if !headWas {
return nil, errWrongMessageSeq
}
origChunk := v.GetChunk()
chunk := chunkToSend(globalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil {
return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
}
localProgress += len(origChunk)
globalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
}
return nil, nil
return forwarder.forward(stream.Context(), addr, c, pubkey)
}))
}