[#323] client: Refactor object.Search

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-08-23 21:56:55 +03:00 committed by fyrchik
parent 5d7650c3e7
commit 6a43accf96
3 changed files with 145 additions and 222 deletions

View file

@ -13,6 +13,7 @@ import (
rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
"github.com/nspcc-dev/neofs-api-go/v2/signature"
"github.com/nspcc-dev/neofs-sdk-go/bearer"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
@ -23,15 +24,9 @@ import (
// PrmObjectSearch groups parameters of ObjectSearch operation.
type PrmObjectSearch struct {
prmCommonMeta
meta v2session.RequestMetaHeader
local bool
sessionSet bool
session session.Object
bearerSet bool
bearer bearer.Token
key *ecdsa.PrivateKey
cnrSet bool
cnrID cid.ID
@ -41,7 +36,7 @@ type PrmObjectSearch struct {
// MarkLocal tells the server to execute the operation locally.
func (x *PrmObjectSearch) MarkLocal() {
x.local = true
x.meta.SetTTL(1)
}
// WithinSession specifies session within which the search query must be executed.
@ -51,8 +46,9 @@ func (x *PrmObjectSearch) MarkLocal() {
//
// Must be signed.
func (x *PrmObjectSearch) WithinSession(t session.Object) {
x.session = t
x.sessionSet = true
var tokv2 v2session.Token
t.WriteToV2(&tokv2)
x.meta.SetSessionToken(&tokv2)
}
// WithBearerToken attaches bearer token to be used for the operation.
@ -61,8 +57,27 @@ func (x *PrmObjectSearch) WithinSession(t session.Object) {
//
// Must be signed.
func (x *PrmObjectSearch) WithBearerToken(t bearer.Token) {
x.bearer = t
x.bearerSet = true
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *PrmObjectSearch) WithXHeaders(hs ...string) {
if len(hs)%2 != 0 {
panic("slice of X-Headers with odd length")
}
writeXHeadersToMeta(hs, &x.meta)
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *PrmObjectSearch) UseKey(key ecdsa.PrivateKey) {
x.key = &key
}
// InContainer specifies the container in which to look for objects.
@ -87,24 +102,16 @@ type ResObjectSearch struct {
//
// Must be initialized using Client.ObjectSearch, any other usage is unsafe.
type ObjectListReader struct {
client *Client
cancelCtxStream context.CancelFunc
ctxCall contextCall
reqWritten bool
// initially bound to contextCall
bodyResp v2object.SearchResponseBody
err error
res ResObjectSearch
stream interface {
Read(resp *v2object.SearchResponse) error
}
tail []v2refs.ObjectID
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *ObjectListReader) UseKey(key ecdsa.PrivateKey) {
x.ctxCall.key = key
}
// Read reads another list of the object identifiers. Works similar to
// io.Reader.Read but copies oid.ID and returns success flag instead of error.
//
@ -116,58 +123,33 @@ func (x *ObjectListReader) Read(buf []oid.ID) (int, bool) {
panic("empty buffer in ObjectListReader.ReadList")
}
if !x.reqWritten {
if !x.ctxCall.writeRequest() {
return 0, false
}
x.reqWritten = true
}
// read remaining tail
read := len(x.tail)
if read > len(buf) {
read = len(buf)
}
for i := 0; i < read; i++ {
_ = buf[i].ReadFromV2(x.tail[i])
}
read := copyIDBuffers(buf, x.tail)
x.tail = x.tail[read:]
if len(buf) == read {
return read, true
}
var ok bool
var ids []v2refs.ObjectID
var i, ln, rem int
for {
// receive next message
ok = x.ctxCall.readResponse()
if !ok {
var resp v2object.SearchResponse
x.err = x.stream.Read(&resp)
if x.err != nil {
return read, false
}
x.res.st, x.err = x.client.processResponse(&resp)
if x.err != nil || !apistatus.IsSuccessful(x.res.st) {
return read, false
}
// read new chunk of objects
ids = x.bodyResp.GetIDList()
ln = len(ids)
if ln == 0 {
ids := resp.GetBody().GetIDList()
if len(ids) == 0 {
// just skip empty lists since they are not prohibited by protocol
continue
}
if rem = len(buf) - read; ln > rem {
ln = rem
}
for i = 0; i < ln; i++ {
_ = buf[read+i].ReadFromV2(ids[i])
}
ln := copyIDBuffers(buf[read:], ids)
read += ln
if read == len(buf) {
@ -179,6 +161,14 @@ func (x *ObjectListReader) Read(buf []oid.ID) (int, bool) {
}
}
func copyIDBuffers(dst []oid.ID, src []v2refs.ObjectID) int {
var i int
for ; i < len(dst) && i < len(src); i++ {
_ = dst[i].ReadFromV2(src[i])
}
return i
}
// Iterate iterates over the list of found object identifiers.
// f can return true to stop iteration earlier.
//
@ -219,11 +209,11 @@ func (x *ObjectListReader) Iterate(f func(oid.ID) bool) error {
func (x *ObjectListReader) Close() (*ResObjectSearch, error) {
defer x.cancelCtxStream()
if x.ctxCall.err != nil && !errors.Is(x.ctxCall.err, io.EOF) {
return nil, x.ctxCall.err
if x.err != nil && !errors.Is(x.err, io.EOF) {
return nil, x.err
}
return x.ctxCall.statusRes.(*ResObjectSearch), nil
return &x.res, nil
}
// ObjectSearchInit initiates object selection through a remote server using NeoFS API protocol.
@ -243,75 +233,37 @@ func (c *Client) ObjectSearchInit(ctx context.Context, prm PrmObjectSearch) (*Ob
panic(panicMsgMissingContainer)
}
// form request body
var (
body v2object.SearchRequestBody
cidV2 v2refs.ContainerID
)
var cidV2 v2refs.ContainerID
prm.cnrID.WriteToV2(&cidV2)
var body v2object.SearchRequestBody
body.SetVersion(1)
body.SetContainerID(&cidV2)
body.SetFilters(prm.filters.ToV2())
// form meta header
var meta v2session.RequestMetaHeader
if prm.local {
meta.SetTTL(1)
}
if prm.bearerSet {
var v2token acl.BearerToken
prm.bearer.WriteToV2(&v2token)
meta.SetBearerToken(&v2token)
}
if prm.sessionSet {
var tokv2 v2session.Token
prm.session.WriteToV2(&tokv2)
meta.SetSessionToken(&tokv2)
}
writeXHeadersToMeta(prm.prmCommonMeta.xHeaders, &meta)
// form request
var req v2object.SearchRequest
req.SetBody(&body)
req.SetMetaHeader(&meta)
// init reader
var (
r ObjectListReader
resp v2object.SearchResponse
stream *rpcapi.SearchResponseReader
)
var req v2object.SearchRequest
req.SetBody(&body)
c.prepareRequest(&req, &prm.meta)
key := prm.key
if key == nil {
key = &c.prm.key
}
err := signature.SignServiceMessage(key, &req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
var r ObjectListReader
ctx, r.cancelCtxStream = context.WithCancel(ctx)
resp.SetBody(&r.bodyResp)
// init call context
c.initCallContext(&r.ctxCall)
r.ctxCall.req = &req
r.ctxCall.statusRes = new(ResObjectSearch)
r.ctxCall.resp = &resp
r.ctxCall.wReq = func() error {
var err error
stream, err = rpcapi.SearchObjects(&c.c, &req, client.WithContext(ctx))
if err != nil {
return fmt.Errorf("open stream: %w", err)
}
return nil
}
r.ctxCall.rResp = func() error {
return stream.Read(&resp)
r.stream, err = rpcapi.SearchObjects(&c.c, &req, client.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("open stream: %w", err)
}
r.client = c
return &r, nil
}

View file

@ -1,12 +1,14 @@
package client
import (
"crypto/ecdsa"
"errors"
"fmt"
"io"
"testing"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neofs-api-go/v2/object"
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
signatureV2 "github.com/nspcc-dev/neofs-api-go/v2/signature"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@ -20,7 +22,7 @@ func TestObjectSearch(t *testing.T) {
ids[i] = oidtest.ID()
}
resp, setID := testListReaderResponse(t)
p, resp := testListReaderResponse(t)
buf := make([]oid.ID, 2)
checkRead := func(t *testing.T, expected []oid.ID) {
@ -34,38 +36,23 @@ func TestObjectSearch(t *testing.T) {
require.Panics(t, func() { resp.Read(nil) })
// both ID fetched
setID(ids[:3])
resp.stream = newSearchStream(p, nil, ids[:3])
checkRead(t, ids[:2])
// one ID cached, second fetched
setID(ids[3:6])
resp.stream = newSearchStream(p, nil, ids[3:6])
checkRead(t, ids[2:4])
// both ID cached
resp.ctxCall.resp = nil
resp.stream = nil // shouldn't be called, panic if so
checkRead(t, ids[4:6])
// both ID fetched in 2 requests, with empty one in the middle
var n int
resp.ctxCall.rResp = func() error {
switch n {
case 0:
setID(ids[6:7])
case 1:
setID(nil)
case 2:
setID(ids[7:8])
default:
t.FailNow()
}
n++
return nil
}
resp.stream = newSearchStream(p, nil, ids[6:7], nil, ids[7:8])
checkRead(t, ids[6:8])
// read from tail multiple times
resp.ctxCall.rResp = nil
setID(ids[8:11])
resp.stream = newSearchStream(p, nil, ids[8:11])
buf = buf[:1]
checkRead(t, ids[8:9])
checkRead(t, ids[9:10])
@ -73,15 +60,7 @@ func TestObjectSearch(t *testing.T) {
// handle EOF
buf = buf[:2]
n = 0
resp.ctxCall.rResp = func() error {
if n > 0 {
return io.EOF
}
n++
setID(ids[11:12])
return nil
}
resp.stream = newSearchStream(p, io.EOF, ids[11:12])
checkRead(t, ids[11:12])
}
@ -92,24 +71,9 @@ func TestObjectIterate(t *testing.T) {
}
t.Run("iterate all sequence", func(t *testing.T) {
resp, setID := testListReaderResponse(t)
p, resp := testListReaderResponse(t)
// Iterate over all sequence
var n int
resp.ctxCall.rResp = func() error {
switch n {
case 0:
setID(ids[0:2])
case 1:
setID(nil)
case 2:
setID(ids[2:3])
default:
return io.EOF
}
n++
return nil
}
resp.stream = newSearchStream(p, io.EOF, ids[0:2], nil, ids[2:3])
var actual []oid.ID
require.NoError(t, resp.Iterate(func(id oid.ID) bool {
@ -119,10 +83,10 @@ func TestObjectIterate(t *testing.T) {
require.Equal(t, ids[:3], actual)
})
t.Run("stop by return value", func(t *testing.T) {
resp, setID := testListReaderResponse(t)
p, resp := testListReaderResponse(t)
var actual []oid.ID
setID(ids)
resp.stream = &singleStreamResponder{key: p, idList: [][]oid.ID{ids}}
require.NoError(t, resp.Iterate(func(id oid.ID) bool {
actual = append(actual, id)
return len(actual) == 2
@ -130,22 +94,12 @@ func TestObjectIterate(t *testing.T) {
require.Equal(t, ids[:2], actual)
})
t.Run("stop after error", func(t *testing.T) {
resp, setID := testListReaderResponse(t)
p, resp := testListReaderResponse(t)
expectedErr := errors.New("test error")
var actual []oid.ID
var n int
resp.ctxCall.rResp = func() error {
switch n {
case 0:
setID(ids[:2])
default:
return expectedErr
}
n++
return nil
}
resp.stream = newSearchStream(p, expectedErr, ids[:2])
var actual []oid.ID
err := resp.Iterate(func(id oid.ID) bool {
actual = append(actual, id)
return false
@ -155,40 +109,56 @@ func TestObjectIterate(t *testing.T) {
})
}
func testListReaderResponse(t *testing.T) (*ObjectListReader, func(id []oid.ID) *object.SearchResponse) {
func testListReaderResponse(t *testing.T) (*ecdsa.PrivateKey, *ObjectListReader) {
p, err := keys.NewPrivateKey()
require.NoError(t, err)
obj := &ObjectListReader{
return &p.PrivateKey, &ObjectListReader{
cancelCtxStream: func() {},
ctxCall: contextCall{
closer: func() error { return nil },
result: func(v2 responseV2) {},
statusRes: new(ResObjectSearch),
},
reqWritten: true,
bodyResp: object.SearchResponseBody{},
tail: nil,
}
return obj, func(id []oid.ID) *object.SearchResponse {
resp := new(object.SearchResponse)
resp.SetBody(new(object.SearchResponseBody))
v2id := make([]refs.ObjectID, len(id))
var oidV2 refs.ObjectID
for i := range id {
id[i].WriteToV2(&oidV2)
v2id[i] = oidV2
}
resp.GetBody().SetIDList(v2id)
err := signatureV2.SignServiceMessage(&p.PrivateKey, resp)
if err != nil {
t.Fatalf("error: %v", err)
}
obj.ctxCall.resp = resp
obj.bodyResp = *resp.GetBody()
return resp
client: &Client{},
tail: nil,
}
}
func newSearchStream(key *ecdsa.PrivateKey, endError error, idList ...[]oid.ID) *singleStreamResponder {
return &singleStreamResponder{
key: key,
endError: endError,
idList: idList,
}
}
type singleStreamResponder struct {
key *ecdsa.PrivateKey
n int
endError error
idList [][]oid.ID
}
func (s *singleStreamResponder) Read(resp *v2object.SearchResponse) error {
if s.n >= len(s.idList) {
if s.endError != nil {
return s.endError
}
panic("unexpected call to `Read`")
}
var body v2object.SearchResponseBody
if s.idList[s.n] != nil {
ids := make([]refs.ObjectID, len(s.idList[s.n]))
for i := range s.idList[s.n] {
s.idList[s.n][i].WriteToV2(&ids[i])
}
body.SetIDList(ids)
}
resp.SetBody(&body)
err := signatureV2.SignServiceMessage(s.key, resp)
if err != nil {
panic(fmt.Errorf("error: %w", err))
}
s.n++
return nil
}

View file

@ -717,13 +717,14 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (
cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
}
res, err := c.client.ObjectSearchInit(ctx, cliPrm)
if err = c.handleError(nil, err); err != nil {
return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err)
}
if prm.key != nil {
res.UseKey(*prm.key)
}
return ResObjectSearch{r: res}, nil
}