diff --git a/pkg/client/object.go b/pkg/client/object.go index 27507c22..f7f95a03 100644 --- a/pkg/client/object.go +++ b/pkg/client/object.go @@ -8,6 +8,7 @@ import ( "fmt" "io" + "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" signer "github.com/nspcc-dev/neofs-api-go/util/signature" "github.com/nspcc-dev/neofs-api-go/v2/client" @@ -62,6 +63,12 @@ type RangeChecksumParams struct { salt []byte } +type SearchObjectParams struct { + cid *container.ID + + filters object.SearchFilters +} + type putObjectV2Writer struct { key *ecdsa.PrivateKey @@ -84,6 +91,8 @@ const chunkSize = 3 * (1 << 20) const TZSize = 64 +const searchQueryVersion uint32 = 1 + func rangesToV2(rs []*Range) []*v2object.Range { r2 := make([]*v2object.Range, 0, len(rs)) @@ -885,6 +894,110 @@ func (c *Client) objectPayloadRangeHashV2(ctx context.Context, p *RangeChecksumP return res, nil } +func (p *SearchObjectParams) WithContainerID(v *container.ID) *SearchObjectParams { + if p != nil { + p.cid = v + } + + return p +} + +func (p *SearchObjectParams) WithSearchFilters(v object.SearchFilters) *SearchObjectParams { + if p != nil { + p.filters = v + } + + return p +} + +func (c *Client) SearchObject(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) { + // check remote node version + switch c.remoteNode.Version.GetMajor() { + case 2: + return c.searchObjectV2(ctx, p, opts...) + default: + return nil, unsupportedProtocolErr + } +} + +func (c *Client) searchObjectV2(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) { + // create V2 Object client + cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) + if err != nil { + return nil, errors.Wrap(err, "could not create Object V2 client") + } + + callOpts := defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i].apply(&callOpts) + } + } + + // create request + req := new(v2object.SearchRequest) + + // initialize request body + body := new(v2object.SearchRequestBody) + req.SetBody(body) + + v2Addr := new(v2refs.Address) + v2Addr.SetContainerID(p.cid.ToV2()) + + // set meta header + meta := v2MetaHeaderFromOpts(callOpts) + if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + addr: v2Addr, + verb: v2session.ObjectVerbSearch, + }); err != nil { + return nil, errors.Wrap(err, "could not sign session token") + } + req.SetMetaHeader(meta) + + // fill body fields + body.SetContainerID(v2Addr.GetContainerID()) + body.SetVersion(searchQueryVersion) + body.SetFilters(p.filters.ToV2()) + + // sign the request + if err := signature.SignServiceMessage(c.key, req); err != nil { + return nil, errors.Wrapf(err, "could not sign %T", req) + } + + // create search stream + stream, err := cli.Search(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "could not create search stream") + } + + var searchResult []*object.ID + + for { + // receive message from server stream + resp, err := stream.Recv() + if err != nil { + if errors.Is(errors.Cause(err), io.EOF) { + break + } + + return nil, errors.Wrap(err, "could not receive search response") + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, errors.Wrapf(err, "could not verify %T", resp) + } + + chunk := resp.GetBody().GetIDList() + for i := range chunk { + searchResult = append(searchResult, object.NewIDFromV2(chunk[i])) + } + } + + return searchResult, nil +} + func v2ObjectClient(proto TransportProtocol, opts *clientOptions) (*v2object.Client, error) { switch proto { case GRPC: