[#208] searchsvc: Refactor request forwarding

Resolve funlen & gocognit linters for toPrm method.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-04-04 13:05:40 +03:00
parent 0b38419fbf
commit d85703a963
4 changed files with 113 additions and 87 deletions

View file

@ -1,6 +1,8 @@
package searchsvc package searchsvc
import ( import (
"context"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -29,7 +31,7 @@ type IDListWriter interface {
// RequestForwarder is a callback for forwarding of the // RequestForwarder is a callback for forwarding of the
// original Search requests. // original Search requests.
type RequestForwarder func(coreclient.NodeInfo, coreclient.MultiAddressClient) ([]oid.ID, error) type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) ([]oid.ID, error)
// SetCommonParameters sets common parameters of the operation. // SetCommonParameters sets common parameters of the operation.
func (p *Prm) SetCommonParameters(common *util.CommonPrm) { func (p *Prm) SetCommonParameters(common *util.CommonPrm) {

View file

@ -80,7 +80,7 @@ func (c *clientConstructorWrapper) get(info client.NodeInfo) (searchClient, erro
func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info client.NodeInfo) ([]oid.ID, error) { func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info client.NodeInfo) ([]oid.ID, error) {
if exec.prm.forwarder != nil { if exec.prm.forwarder != nil {
return exec.prm.forwarder(info, c.client) return exec.prm.forwarder(ctx, info, c.client)
} }
var sessionInfo *util.SessionInfo var sessionInfo *util.SessionInfo

View file

@ -0,0 +1,99 @@
package searchsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type requestForwarder struct {
OnceResign *sync.Once
Request *objectV2.SearchRequest
Key *ecdsa.PrivateKey
}
func (f *requestForwarder) forwardRequest(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) ([]oid.ID, error) {
var err error
// once compose and resign forwarding request
f.OnceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(f.Request.GetMetaHeader())
f.Request.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(f.Key, f.Request)
})
if err != nil {
return nil, err
}
var searchStream *rpc.SearchResponseReader
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
searchStream, err = rpc.SearchObjects(cli, f.Request, rpcclient.WithContext(ctx))
return err
})
if err != nil {
return nil, err
}
// code below is copy-pasted from c.SearchObjects implementation,
// perhaps it is worth highlighting the utility function in frostfs-api-go
var (
searchResult []oid.ID
resp = new(objectV2.SearchResponse)
)
for {
// receive message from server stream
err := searchStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, fmt.Errorf("reading the response failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
}
chunk := resp.GetBody().GetIDList()
var id oid.ID
for i := range chunk {
err = id.ReadFromV2(chunk[i])
if err != nil {
return nil, fmt.Errorf("invalid object ID: %w", err)
}
searchResult = append(searchResult, id)
}
}
return searchResult, nil
}

View file

@ -1,20 +1,15 @@
package searchsvc package searchsvc
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io"
"sync" "sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search" searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -22,7 +17,6 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
) )
// nolint: funlen, gocognit
func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStream) (*searchsvc.Prm, error) { func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStream) (*searchsvc.Prm, error) {
body := req.GetBody() body := req.GetBody()
@ -38,8 +32,6 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
return nil, fmt.Errorf("invalid container ID: %w", err) return nil, fmt.Errorf("invalid container ID: %w", err)
} }
meta := req.GetMetaHeader()
commonPrm, err := util.CommonPrmFromV2(req) commonPrm, err := util.CommonPrmFromV2(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -53,85 +45,18 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
}) })
if !commonPrm.LocalOnly() { if !commonPrm.LocalOnly() {
var onceResign sync.Once
key, err := s.keyStorage.GetKey(nil) key, err := s.keyStorage.GetKey(nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) ([]oid.ID, error) { forwarder := &requestForwarder{
var err error OnceResign: &sync.Once{},
Request: req,
Key: key,
}
// once compose and resign forwarding request p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequest))
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
}
var searchStream *rpc.SearchResponseReader
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
searchStream, err = rpc.SearchObjects(cli, req, rpcclient.WithContext(stream.Context()))
return err
})
if err != nil {
return nil, err
}
// code below is copy-pasted from c.SearchObjects implementation,
// perhaps it is worth highlighting the utility function in frostfs-api-go
var (
searchResult []oid.ID
resp = new(objectV2.SearchResponse)
)
for {
// receive message from server stream
err := searchStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, fmt.Errorf("reading the response failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
}
chunk := resp.GetBody().GetIDList()
var id oid.ID
for i := range chunk {
err = id.ReadFromV2(chunk[i])
if err != nil {
return nil, fmt.Errorf("invalid object ID: %w", err)
}
searchResult = append(searchResult, id)
}
}
return searchResult, nil
}))
} }
p.WithContainerID(id) p.WithContainerID(id)
@ -140,8 +65,8 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
return p, nil return p, nil
} }
func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressClient, []byte) ([]oid.ID, error)) searchsvc.RequestForwarder { func groupAddressRequestForwarder(f func(context.Context, network.Address, client.MultiAddressClient, []byte) ([]oid.ID, error)) searchsvc.RequestForwarder {
return func(info client.NodeInfo, c client.MultiAddressClient) ([]oid.ID, error) { return func(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) ([]oid.ID, error) {
var ( var (
firstErr error firstErr error
res []oid.ID res []oid.ID
@ -162,7 +87,7 @@ func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressCli
// would be nice to log otherwise // would be nice to log otherwise
}() }()
res, err = f(addr, c, key) res, err = f(ctx, addr, c, key)
return return
}) })