diff --git a/pkg/services/object/search/prm.go b/pkg/services/object/search/prm.go index c80257bd..da46dfeb 100644 --- a/pkg/services/object/search/prm.go +++ b/pkg/services/object/search/prm.go @@ -1,6 +1,8 @@ package searchsvc import ( + "context" + coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" @@ -29,7 +31,7 @@ type IDListWriter interface { // RequestForwarder is a callback for forwarding of the // original Search requests. -type RequestForwarder func(coreclient.NodeInfo, coreclient.MultiAddressClient) ([]oid.ID, error) +type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) ([]oid.ID, error) // SetCommonParameters sets common parameters of the operation. func (p *Prm) SetCommonParameters(common *util.CommonPrm) { diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index 610dd77f..c12ed2c9 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -80,7 +80,7 @@ func (c *clientConstructorWrapper) get(info client.NodeInfo) (searchClient, erro func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info client.NodeInfo) ([]oid.ID, error) { if exec.prm.forwarder != nil { - return exec.prm.forwarder(info, c.client) + return exec.prm.forwarder(ctx, info, c.client) } var sessionInfo *util.SessionInfo diff --git a/pkg/services/object/search/v2/request_forwarder.go b/pkg/services/object/search/v2/request_forwarder.go new file mode 100644 index 00000000..8023f2f0 --- /dev/null +++ b/pkg/services/object/search/v2/request_forwarder.go @@ -0,0 +1,99 @@ +package searchsvc + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + "sync" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type requestForwarder struct { + OnceResign *sync.Once + Request *objectV2.SearchRequest + Key *ecdsa.PrivateKey +} + +func (f *requestForwarder) forwardRequest(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) ([]oid.ID, error) { + var err error + + // once compose and resign forwarding request + f.OnceResign.Do(func() { + // compose meta header of the local server + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1) + // TODO: #1165 think how to set the other fields + metaHdr.SetOrigin(f.Request.GetMetaHeader()) + + f.Request.SetMetaHeader(metaHdr) + + err = signature.SignServiceMessage(f.Key, f.Request) + }) + + if err != nil { + return nil, err + } + + var searchStream *rpc.SearchResponseReader + err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { + searchStream, err = rpc.SearchObjects(cli, f.Request, rpcclient.WithContext(ctx)) + return err + }) + if err != nil { + return nil, err + } + + // code below is copy-pasted from c.SearchObjects implementation, + // perhaps it is worth highlighting the utility function in frostfs-api-go + var ( + searchResult []oid.ID + resp = new(objectV2.SearchResponse) + ) + + for { + // receive message from server stream + err := searchStream.Read(resp) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return nil, fmt.Errorf("reading the response failed: %w", err) + } + + // verify response key + if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil { + return nil, err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, fmt.Errorf("could not verify %T: %w", resp, err) + } + + chunk := resp.GetBody().GetIDList() + var id oid.ID + + for i := range chunk { + err = id.ReadFromV2(chunk[i]) + if err != nil { + return nil, fmt.Errorf("invalid object ID: %w", err) + } + + searchResult = append(searchResult, id) + } + } + + return searchResult, nil +} diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go index 2bde6b2f..12158a82 100644 --- a/pkg/services/object/search/v2/util.go +++ b/pkg/services/object/search/v2/util.go @@ -1,20 +1,15 @@ package searchsvc import ( + "context" "errors" "fmt" - "io" "sync" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" - rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" @@ -22,7 +17,6 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) -// nolint: funlen, gocognit func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStream) (*searchsvc.Prm, error) { body := req.GetBody() @@ -38,8 +32,6 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return nil, fmt.Errorf("invalid container ID: %w", err) } - meta := req.GetMetaHeader() - commonPrm, err := util.CommonPrmFromV2(req) if err != nil { return nil, err @@ -53,85 +45,18 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre }) if !commonPrm.LocalOnly() { - var onceResign sync.Once - key, err := s.keyStorage.GetKey(nil) if err != nil { return nil, err } - p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) ([]oid.ID, error) { - var err error + forwarder := &requestForwarder{ + OnceResign: &sync.Once{}, + Request: req, + Key: key, + } - // once compose and resign forwarding request - onceResign.Do(func() { - // compose meta header of the local server - metaHdr := new(session.RequestMetaHeader) - metaHdr.SetTTL(meta.GetTTL() - 1) - // TODO: #1165 think how to set the other fields - metaHdr.SetOrigin(meta) - - req.SetMetaHeader(metaHdr) - - err = signature.SignServiceMessage(key, req) - }) - - if err != nil { - return nil, err - } - - var searchStream *rpc.SearchResponseReader - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - searchStream, err = rpc.SearchObjects(cli, req, rpcclient.WithContext(stream.Context())) - return err - }) - if err != nil { - return nil, err - } - - // code below is copy-pasted from c.SearchObjects implementation, - // perhaps it is worth highlighting the utility function in frostfs-api-go - var ( - searchResult []oid.ID - resp = new(objectV2.SearchResponse) - ) - - for { - // receive message from server stream - err := searchStream.Read(resp) - if err != nil { - if errors.Is(err, io.EOF) { - break - } - - return nil, fmt.Errorf("reading the response failed: %w", err) - } - - // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil { - return nil, err - } - - // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { - return nil, fmt.Errorf("could not verify %T: %w", resp, err) - } - - chunk := resp.GetBody().GetIDList() - var id oid.ID - - for i := range chunk { - err = id.ReadFromV2(chunk[i]) - if err != nil { - return nil, fmt.Errorf("invalid object ID: %w", err) - } - - searchResult = append(searchResult, id) - } - } - - return searchResult, nil - })) + p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequest)) } p.WithContainerID(id) @@ -140,8 +65,8 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return p, nil } -func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressClient, []byte) ([]oid.ID, error)) searchsvc.RequestForwarder { - return func(info client.NodeInfo, c client.MultiAddressClient) ([]oid.ID, error) { +func groupAddressRequestForwarder(f func(context.Context, network.Address, client.MultiAddressClient, []byte) ([]oid.ID, error)) searchsvc.RequestForwarder { + return func(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) ([]oid.ID, error) { var ( firstErr error res []oid.ID @@ -162,7 +87,7 @@ func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressCli // would be nice to log otherwise }() - res, err = f(addr, c, key) + res, err = f(ctx, addr, c, key) return })