From e6b30aed36cb7562d70e02eacd7ea046746226e7 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 29 Apr 2021 11:54:40 +0300 Subject: [PATCH] [#431] object/search: Re-sign original requests during forwarding In previous implementation node's Object Search V2 service handler created a new request for each RPC. Now original requests are re-signed according to API specification. Logical handler abstracts from this version-dependent logic through `RequestForwarder` callback. Signed-off-by: Leonard Lyubich --- pkg/services/object/search/prm.go | 12 +++++ pkg/services/object/search/util.go | 4 +- pkg/services/object/search/v2/util.go | 70 +++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 3 deletions(-) diff --git a/pkg/services/object/search/prm.go b/pkg/services/object/search/prm.go index d81cf4fba..11144cc11 100644 --- a/pkg/services/object/search/prm.go +++ b/pkg/services/object/search/prm.go @@ -13,6 +13,8 @@ type Prm struct { common *util.CommonPrm client.SearchObjectParams + + forwarder RequestForwarder } // IDListWriter is an interface of target component @@ -21,6 +23,10 @@ type IDListWriter interface { WriteIDs([]*objectSDK.ID) error } +// RequestForwarder is a callback for forwarding of the +// original Search requests. +type RequestForwarder func(client.Client) ([]*objectSDK.ID, error) + // SetCommonParameters sets common parameters of the operation. func (p *Prm) SetCommonParameters(common *util.CommonPrm) { p.common = common @@ -30,3 +36,9 @@ func (p *Prm) SetCommonParameters(common *util.CommonPrm) { func (p *Prm) SetWriter(w IDListWriter) { p.writer = w } + +// SetRequestForwarder sets callback for forwarding +// of the original request. +func (p *Prm) SetRequestForwarder(f RequestForwarder) { + p.forwarder = f +} diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index bb25e8703..7a2073e3b 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -76,9 +76,7 @@ func (c *clientConstructorWrapper) get(addr string) (searchClient, error) { } func (c *clientWrapper) searchObjects(exec *execCtx) ([]*objectSDK.ID, error) { - return c.client.SearchObject(exec.context(), - exec.remotePrm(), - exec.callOptions()...) + return exec.prm.forwarder(c.client) } func (e *storageEngineWrapper) search(exec *execCtx) ([]*objectSDK.ID, error) { diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go index 6e59921fc..31d82071a 100644 --- a/pkg/services/object/search/v2/util.go +++ b/pkg/services/object/search/v2/util.go @@ -1,13 +1,22 @@ package searchsvc import ( + "io" + "sync" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/token" + rpcclient "github.com/nspcc-dev/neofs-api-go/rpc/client" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/rpc" + "github.com/nspcc-dev/neofs-api-go/v2/session" + "github.com/nspcc-dev/neofs-api-go/v2/signature" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/pkg/errors" ) func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStream) (*searchsvc.Prm, error) { @@ -32,6 +41,67 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre stream: stream, }) + if !commonPrm.LocalOnly() { + var onceResign sync.Once + + p.SetRequestForwarder(func(c client.Client) ([]*objectSDK.ID, error) { + var err error + + // once compose and resign forwarding request + onceResign.Do(func() { + // compose meta header of the local server + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(meta.GetTTL() - 1) + // TODO: think how to set the other fields + metaHdr.SetOrigin(meta) + + req.SetMetaHeader(metaHdr) + + err = signature.SignServiceMessage(key, req) + }) + + if err != nil { + return nil, err + } + + stream, err := rpc.SearchObjects(c.Raw(), req, rpcclient.WithContext(stream.Context())) + if err != nil { + return nil, err + } + + // code below is copy-pasted from c.SearchObjects implementation, + // perhaps it is worth highlighting the utility function in neofs-api-go + var ( + searchResult []*objectSDK.ID + resp = new(objectV2.SearchResponse) + ) + + for { + // receive message from server stream + err := stream.Read(resp) + if err != nil { + if errors.Is(errors.Cause(err), io.EOF) { + break + } + + return nil, errors.Wrap(err, "reading the response failed") + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, errors.Wrapf(err, "could not verify %T", resp) + } + + chunk := resp.GetBody().GetIDList() + for i := range chunk { + searchResult = append(searchResult, objectSDK.NewIDFromV2(chunk[i])) + } + } + + return searchResult, nil + }) + } + body := req.GetBody() p.WithContainerID(container.NewIDFromV2(body.GetContainerID())) p.WithSearchFilters(objectSDK.NewSearchFiltersFromV2(body.GetFilters()))