forked from TrueCloudLab/frostfs-node
[#431] object/search: Re-sign original requests during forwarding
In previous implementation node's Object Search V2 service handler created a new request for each RPC. Now original requests are re-signed according to API specification. Logical handler abstracts from this version-dependent logic through `RequestForwarder` callback. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
2877a343c3
commit
e6b30aed36
3 changed files with 83 additions and 3 deletions
|
@ -13,6 +13,8 @@ type Prm struct {
|
||||||
common *util.CommonPrm
|
common *util.CommonPrm
|
||||||
|
|
||||||
client.SearchObjectParams
|
client.SearchObjectParams
|
||||||
|
|
||||||
|
forwarder RequestForwarder
|
||||||
}
|
}
|
||||||
|
|
||||||
// IDListWriter is an interface of target component
|
// IDListWriter is an interface of target component
|
||||||
|
@ -21,6 +23,10 @@ type IDListWriter interface {
|
||||||
WriteIDs([]*objectSDK.ID) error
|
WriteIDs([]*objectSDK.ID) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RequestForwarder is a callback for forwarding of the
|
||||||
|
// original Search requests.
|
||||||
|
type RequestForwarder func(client.Client) ([]*objectSDK.ID, error)
|
||||||
|
|
||||||
// SetCommonParameters sets common parameters of the operation.
|
// SetCommonParameters sets common parameters of the operation.
|
||||||
func (p *Prm) SetCommonParameters(common *util.CommonPrm) {
|
func (p *Prm) SetCommonParameters(common *util.CommonPrm) {
|
||||||
p.common = common
|
p.common = common
|
||||||
|
@ -30,3 +36,9 @@ func (p *Prm) SetCommonParameters(common *util.CommonPrm) {
|
||||||
func (p *Prm) SetWriter(w IDListWriter) {
|
func (p *Prm) SetWriter(w IDListWriter) {
|
||||||
p.writer = w
|
p.writer = w
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetRequestForwarder sets callback for forwarding
|
||||||
|
// of the original request.
|
||||||
|
func (p *Prm) SetRequestForwarder(f RequestForwarder) {
|
||||||
|
p.forwarder = f
|
||||||
|
}
|
||||||
|
|
|
@ -76,9 +76,7 @@ func (c *clientConstructorWrapper) get(addr string) (searchClient, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientWrapper) searchObjects(exec *execCtx) ([]*objectSDK.ID, error) {
|
func (c *clientWrapper) searchObjects(exec *execCtx) ([]*objectSDK.ID, error) {
|
||||||
return c.client.SearchObject(exec.context(),
|
return exec.prm.forwarder(c.client)
|
||||||
exec.remotePrm(),
|
|
||||||
exec.callOptions()...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *storageEngineWrapper) search(exec *execCtx) ([]*objectSDK.ID, error) {
|
func (e *storageEngineWrapper) search(exec *execCtx) ([]*objectSDK.ID, error) {
|
||||||
|
|
|
@ -1,13 +1,22 @@
|
||||||
package searchsvc
|
package searchsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||||
|
rpcclient "github.com/nspcc-dev/neofs-api-go/rpc/client"
|
||||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/rpc"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
||||||
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
|
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
|
||||||
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
|
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStream) (*searchsvc.Prm, error) {
|
func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStream) (*searchsvc.Prm, error) {
|
||||||
|
@ -32,6 +41,67 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
|
||||||
stream: stream,
|
stream: stream,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if !commonPrm.LocalOnly() {
|
||||||
|
var onceResign sync.Once
|
||||||
|
|
||||||
|
p.SetRequestForwarder(func(c client.Client) ([]*objectSDK.ID, error) {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// once compose and resign forwarding request
|
||||||
|
onceResign.Do(func() {
|
||||||
|
// compose meta header of the local server
|
||||||
|
metaHdr := new(session.RequestMetaHeader)
|
||||||
|
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||||
|
// TODO: think how to set the other fields
|
||||||
|
metaHdr.SetOrigin(meta)
|
||||||
|
|
||||||
|
req.SetMetaHeader(metaHdr)
|
||||||
|
|
||||||
|
err = signature.SignServiceMessage(key, req)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, err := rpc.SearchObjects(c.Raw(), req, rpcclient.WithContext(stream.Context()))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// code below is copy-pasted from c.SearchObjects implementation,
|
||||||
|
// perhaps it is worth highlighting the utility function in neofs-api-go
|
||||||
|
var (
|
||||||
|
searchResult []*objectSDK.ID
|
||||||
|
resp = new(objectV2.SearchResponse)
|
||||||
|
)
|
||||||
|
|
||||||
|
for {
|
||||||
|
// receive message from server stream
|
||||||
|
err := stream.Read(resp)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(errors.Cause(err), io.EOF) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, errors.Wrap(err, "reading the response failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify response structure
|
||||||
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
chunk := resp.GetBody().GetIDList()
|
||||||
|
for i := range chunk {
|
||||||
|
searchResult = append(searchResult, objectSDK.NewIDFromV2(chunk[i]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return searchResult, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
body := req.GetBody()
|
body := req.GetBody()
|
||||||
p.WithContainerID(container.NewIDFromV2(body.GetContainerID()))
|
p.WithContainerID(container.NewIDFromV2(body.GetContainerID()))
|
||||||
p.WithSearchFilters(objectSDK.NewSearchFiltersFromV2(body.GetFilters()))
|
p.WithSearchFilters(objectSDK.NewSearchFiltersFromV2(body.GetFilters()))
|
||||||
|
|
Loading…
Reference in a new issue