[#148] sdk/client: Support object search rpc

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-09-17 13:12:48 +03:00 committed by Stanislav Bogatyrev
parent 549fcad76e
commit 73220620c5

View file

@ -8,6 +8,7 @@ import (
"fmt"
"io"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
signer "github.com/nspcc-dev/neofs-api-go/util/signature"
"github.com/nspcc-dev/neofs-api-go/v2/client"
@ -62,6 +63,12 @@ type RangeChecksumParams struct {
salt []byte
}
type SearchObjectParams struct {
cid *container.ID
filters object.SearchFilters
}
type putObjectV2Writer struct {
key *ecdsa.PrivateKey
@ -84,6 +91,8 @@ const chunkSize = 3 * (1 << 20)
const TZSize = 64
const searchQueryVersion uint32 = 1
func rangesToV2(rs []*Range) []*v2object.Range {
r2 := make([]*v2object.Range, 0, len(rs))
@ -885,6 +894,110 @@ func (c *Client) objectPayloadRangeHashV2(ctx context.Context, p *RangeChecksumP
return res, nil
}
func (p *SearchObjectParams) WithContainerID(v *container.ID) *SearchObjectParams {
if p != nil {
p.cid = v
}
return p
}
func (p *SearchObjectParams) WithSearchFilters(v object.SearchFilters) *SearchObjectParams {
if p != nil {
p.filters = v
}
return p
}
func (c *Client) SearchObject(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) {
// check remote node version
switch c.remoteNode.Version.GetMajor() {
case 2:
return c.searchObjectV2(ctx, p, opts...)
default:
return nil, unsupportedProtocolErr
}
}
func (c *Client) searchObjectV2(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) {
// create V2 Object client
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
if err != nil {
return nil, errors.Wrap(err, "could not create Object V2 client")
}
callOpts := defaultCallOptions()
for i := range opts {
if opts[i] != nil {
opts[i].apply(&callOpts)
}
}
// create request
req := new(v2object.SearchRequest)
// initialize request body
body := new(v2object.SearchRequestBody)
req.SetBody(body)
v2Addr := new(v2refs.Address)
v2Addr.SetContainerID(p.cid.ToV2())
// set meta header
meta := v2MetaHeaderFromOpts(callOpts)
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
addr: v2Addr,
verb: v2session.ObjectVerbSearch,
}); err != nil {
return nil, errors.Wrap(err, "could not sign session token")
}
req.SetMetaHeader(meta)
// fill body fields
body.SetContainerID(v2Addr.GetContainerID())
body.SetVersion(searchQueryVersion)
body.SetFilters(p.filters.ToV2())
// sign the request
if err := signature.SignServiceMessage(c.key, req); err != nil {
return nil, errors.Wrapf(err, "could not sign %T", req)
}
// create search stream
stream, err := cli.Search(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "could not create search stream")
}
var searchResult []*object.ID
for {
// receive message from server stream
resp, err := stream.Recv()
if err != nil {
if errors.Is(errors.Cause(err), io.EOF) {
break
}
return nil, errors.Wrap(err, "could not receive search response")
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, errors.Wrapf(err, "could not verify %T", resp)
}
chunk := resp.GetBody().GetIDList()
for i := range chunk {
searchResult = append(searchResult, object.NewIDFromV2(chunk[i]))
}
}
return searchResult, nil
}
func v2ObjectClient(proto TransportProtocol, opts *clientOptions) (*v2object.Client, error) {
switch proto {
case GRPC: