From 6a43accf9637177aeb5c87e77ac55817afa15ec1 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 23 Aug 2022 21:56:55 +0300 Subject: [PATCH] [#323] client: Refactor `object.Search` Signed-off-by: Evgenii Stratonikov --- client/object_search.go | 202 +++++++++++++---------------------- client/object_search_test.go | 158 +++++++++++---------------- pool/pool.go | 7 +- 3 files changed, 145 insertions(+), 222 deletions(-) diff --git a/client/object_search.go b/client/object_search.go index 5931433..1ab9396 100644 --- a/client/object_search.go +++ b/client/object_search.go @@ -13,6 +13,7 @@ import ( rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" v2session "github.com/nspcc-dev/neofs-api-go/v2/session" + "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-sdk-go/bearer" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -23,15 +24,9 @@ import ( // PrmObjectSearch groups parameters of ObjectSearch operation. type PrmObjectSearch struct { - prmCommonMeta + meta v2session.RequestMetaHeader - local bool - - sessionSet bool - session session.Object - - bearerSet bool - bearer bearer.Token + key *ecdsa.PrivateKey cnrSet bool cnrID cid.ID @@ -41,7 +36,7 @@ type PrmObjectSearch struct { // MarkLocal tells the server to execute the operation locally. func (x *PrmObjectSearch) MarkLocal() { - x.local = true + x.meta.SetTTL(1) } // WithinSession specifies session within which the search query must be executed. @@ -51,8 +46,9 @@ func (x *PrmObjectSearch) MarkLocal() { // // Must be signed. func (x *PrmObjectSearch) WithinSession(t session.Object) { - x.session = t - x.sessionSet = true + var tokv2 v2session.Token + t.WriteToV2(&tokv2) + x.meta.SetSessionToken(&tokv2) } // WithBearerToken attaches bearer token to be used for the operation. @@ -61,8 +57,27 @@ func (x *PrmObjectSearch) WithinSession(t session.Object) { // // Must be signed. func (x *PrmObjectSearch) WithBearerToken(t bearer.Token) { - x.bearer = t - x.bearerSet = true + var v2token acl.BearerToken + t.WriteToV2(&v2token) + x.meta.SetBearerToken(&v2token) +} + +// WithXHeaders specifies list of extended headers (string key-value pairs) +// to be attached to the request. Must have an even length. +// +// Slice must not be mutated until the operation completes. +func (x *PrmObjectSearch) WithXHeaders(hs ...string) { + if len(hs)%2 != 0 { + panic("slice of X-Headers with odd length") + } + + writeXHeadersToMeta(hs, &x.meta) +} + +// UseKey specifies private key to sign the requests. +// If key is not provided, then Client default key is used. +func (x *PrmObjectSearch) UseKey(key ecdsa.PrivateKey) { + x.key = &key } // InContainer specifies the container in which to look for objects. @@ -87,24 +102,16 @@ type ResObjectSearch struct { // // Must be initialized using Client.ObjectSearch, any other usage is unsafe. type ObjectListReader struct { + client *Client cancelCtxStream context.CancelFunc - - ctxCall contextCall - - reqWritten bool - - // initially bound to contextCall - bodyResp v2object.SearchResponseBody - + err error + res ResObjectSearch + stream interface { + Read(resp *v2object.SearchResponse) error + } tail []v2refs.ObjectID } -// UseKey specifies private key to sign the requests. -// If key is not provided, then Client default key is used. -func (x *ObjectListReader) UseKey(key ecdsa.PrivateKey) { - x.ctxCall.key = key -} - // Read reads another list of the object identifiers. Works similar to // io.Reader.Read but copies oid.ID and returns success flag instead of error. // @@ -116,58 +123,33 @@ func (x *ObjectListReader) Read(buf []oid.ID) (int, bool) { panic("empty buffer in ObjectListReader.ReadList") } - if !x.reqWritten { - if !x.ctxCall.writeRequest() { - return 0, false - } - - x.reqWritten = true - } - - // read remaining tail - read := len(x.tail) - if read > len(buf) { - read = len(buf) - } - - for i := 0; i < read; i++ { - _ = buf[i].ReadFromV2(x.tail[i]) - } - + read := copyIDBuffers(buf, x.tail) x.tail = x.tail[read:] if len(buf) == read { return read, true } - var ok bool - var ids []v2refs.ObjectID - var i, ln, rem int - for { - // receive next message - ok = x.ctxCall.readResponse() - if !ok { + var resp v2object.SearchResponse + x.err = x.stream.Read(&resp) + if x.err != nil { + return read, false + } + + x.res.st, x.err = x.client.processResponse(&resp) + if x.err != nil || !apistatus.IsSuccessful(x.res.st) { return read, false } // read new chunk of objects - ids = x.bodyResp.GetIDList() - - ln = len(ids) - if ln == 0 { + ids := resp.GetBody().GetIDList() + if len(ids) == 0 { // just skip empty lists since they are not prohibited by protocol continue } - if rem = len(buf) - read; ln > rem { - ln = rem - } - - for i = 0; i < ln; i++ { - _ = buf[read+i].ReadFromV2(ids[i]) - } - + ln := copyIDBuffers(buf[read:], ids) read += ln if read == len(buf) { @@ -179,6 +161,14 @@ func (x *ObjectListReader) Read(buf []oid.ID) (int, bool) { } } +func copyIDBuffers(dst []oid.ID, src []v2refs.ObjectID) int { + var i int + for ; i < len(dst) && i < len(src); i++ { + _ = dst[i].ReadFromV2(src[i]) + } + return i +} + // Iterate iterates over the list of found object identifiers. // f can return true to stop iteration earlier. // @@ -219,11 +209,11 @@ func (x *ObjectListReader) Iterate(f func(oid.ID) bool) error { func (x *ObjectListReader) Close() (*ResObjectSearch, error) { defer x.cancelCtxStream() - if x.ctxCall.err != nil && !errors.Is(x.ctxCall.err, io.EOF) { - return nil, x.ctxCall.err + if x.err != nil && !errors.Is(x.err, io.EOF) { + return nil, x.err } - return x.ctxCall.statusRes.(*ResObjectSearch), nil + return &x.res, nil } // ObjectSearchInit initiates object selection through a remote server using NeoFS API protocol. @@ -243,75 +233,37 @@ func (c *Client) ObjectSearchInit(ctx context.Context, prm PrmObjectSearch) (*Ob panic(panicMsgMissingContainer) } - // form request body - var ( - body v2object.SearchRequestBody - cidV2 v2refs.ContainerID - ) - + var cidV2 v2refs.ContainerID prm.cnrID.WriteToV2(&cidV2) + var body v2object.SearchRequestBody body.SetVersion(1) body.SetContainerID(&cidV2) body.SetFilters(prm.filters.ToV2()) - // form meta header - var meta v2session.RequestMetaHeader - - if prm.local { - meta.SetTTL(1) - } - - if prm.bearerSet { - var v2token acl.BearerToken - prm.bearer.WriteToV2(&v2token) - meta.SetBearerToken(&v2token) - } - - if prm.sessionSet { - var tokv2 v2session.Token - prm.session.WriteToV2(&tokv2) - - meta.SetSessionToken(&tokv2) - } - - writeXHeadersToMeta(prm.prmCommonMeta.xHeaders, &meta) - - // form request - var req v2object.SearchRequest - - req.SetBody(&body) - req.SetMetaHeader(&meta) - // init reader - var ( - r ObjectListReader - resp v2object.SearchResponse - stream *rpcapi.SearchResponseReader - ) + var req v2object.SearchRequest + req.SetBody(&body) + c.prepareRequest(&req, &prm.meta) + key := prm.key + if key == nil { + key = &c.prm.key + } + + err := signature.SignServiceMessage(key, &req) + if err != nil { + return nil, fmt.Errorf("sign request: %w", err) + } + + var r ObjectListReader ctx, r.cancelCtxStream = context.WithCancel(ctx) - resp.SetBody(&r.bodyResp) - - // init call context - c.initCallContext(&r.ctxCall) - r.ctxCall.req = &req - r.ctxCall.statusRes = new(ResObjectSearch) - r.ctxCall.resp = &resp - r.ctxCall.wReq = func() error { - var err error - - stream, err = rpcapi.SearchObjects(&c.c, &req, client.WithContext(ctx)) - if err != nil { - return fmt.Errorf("open stream: %w", err) - } - - return nil - } - r.ctxCall.rResp = func() error { - return stream.Read(&resp) + r.stream, err = rpcapi.SearchObjects(&c.c, &req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("open stream: %w", err) } + r.client = c return &r, nil } diff --git a/client/object_search_test.go b/client/object_search_test.go index 6fb885c..862924c 100644 --- a/client/object_search_test.go +++ b/client/object_search_test.go @@ -1,12 +1,14 @@ package client import ( + "crypto/ecdsa" "errors" + "fmt" "io" "testing" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "github.com/nspcc-dev/neofs-api-go/v2/object" + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/refs" signatureV2 "github.com/nspcc-dev/neofs-api-go/v2/signature" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -20,7 +22,7 @@ func TestObjectSearch(t *testing.T) { ids[i] = oidtest.ID() } - resp, setID := testListReaderResponse(t) + p, resp := testListReaderResponse(t) buf := make([]oid.ID, 2) checkRead := func(t *testing.T, expected []oid.ID) { @@ -34,38 +36,23 @@ func TestObjectSearch(t *testing.T) { require.Panics(t, func() { resp.Read(nil) }) // both ID fetched - setID(ids[:3]) + resp.stream = newSearchStream(p, nil, ids[:3]) checkRead(t, ids[:2]) // one ID cached, second fetched - setID(ids[3:6]) + resp.stream = newSearchStream(p, nil, ids[3:6]) checkRead(t, ids[2:4]) // both ID cached - resp.ctxCall.resp = nil + resp.stream = nil // shouldn't be called, panic if so checkRead(t, ids[4:6]) // both ID fetched in 2 requests, with empty one in the middle - var n int - resp.ctxCall.rResp = func() error { - switch n { - case 0: - setID(ids[6:7]) - case 1: - setID(nil) - case 2: - setID(ids[7:8]) - default: - t.FailNow() - } - n++ - return nil - } + resp.stream = newSearchStream(p, nil, ids[6:7], nil, ids[7:8]) checkRead(t, ids[6:8]) // read from tail multiple times - resp.ctxCall.rResp = nil - setID(ids[8:11]) + resp.stream = newSearchStream(p, nil, ids[8:11]) buf = buf[:1] checkRead(t, ids[8:9]) checkRead(t, ids[9:10]) @@ -73,15 +60,7 @@ func TestObjectSearch(t *testing.T) { // handle EOF buf = buf[:2] - n = 0 - resp.ctxCall.rResp = func() error { - if n > 0 { - return io.EOF - } - n++ - setID(ids[11:12]) - return nil - } + resp.stream = newSearchStream(p, io.EOF, ids[11:12]) checkRead(t, ids[11:12]) } @@ -92,24 +71,9 @@ func TestObjectIterate(t *testing.T) { } t.Run("iterate all sequence", func(t *testing.T) { - resp, setID := testListReaderResponse(t) + p, resp := testListReaderResponse(t) - // Iterate over all sequence - var n int - resp.ctxCall.rResp = func() error { - switch n { - case 0: - setID(ids[0:2]) - case 1: - setID(nil) - case 2: - setID(ids[2:3]) - default: - return io.EOF - } - n++ - return nil - } + resp.stream = newSearchStream(p, io.EOF, ids[0:2], nil, ids[2:3]) var actual []oid.ID require.NoError(t, resp.Iterate(func(id oid.ID) bool { @@ -119,10 +83,10 @@ func TestObjectIterate(t *testing.T) { require.Equal(t, ids[:3], actual) }) t.Run("stop by return value", func(t *testing.T) { - resp, setID := testListReaderResponse(t) + p, resp := testListReaderResponse(t) var actual []oid.ID - setID(ids) + resp.stream = &singleStreamResponder{key: p, idList: [][]oid.ID{ids}} require.NoError(t, resp.Iterate(func(id oid.ID) bool { actual = append(actual, id) return len(actual) == 2 @@ -130,22 +94,12 @@ func TestObjectIterate(t *testing.T) { require.Equal(t, ids[:2], actual) }) t.Run("stop after error", func(t *testing.T) { - resp, setID := testListReaderResponse(t) + p, resp := testListReaderResponse(t) expectedErr := errors.New("test error") - var actual []oid.ID - var n int - resp.ctxCall.rResp = func() error { - switch n { - case 0: - setID(ids[:2]) - default: - return expectedErr - } - n++ - return nil - } + resp.stream = newSearchStream(p, expectedErr, ids[:2]) + var actual []oid.ID err := resp.Iterate(func(id oid.ID) bool { actual = append(actual, id) return false @@ -155,40 +109,56 @@ func TestObjectIterate(t *testing.T) { }) } -func testListReaderResponse(t *testing.T) (*ObjectListReader, func(id []oid.ID) *object.SearchResponse) { +func testListReaderResponse(t *testing.T) (*ecdsa.PrivateKey, *ObjectListReader) { p, err := keys.NewPrivateKey() require.NoError(t, err) - obj := &ObjectListReader{ + return &p.PrivateKey, &ObjectListReader{ cancelCtxStream: func() {}, - ctxCall: contextCall{ - closer: func() error { return nil }, - result: func(v2 responseV2) {}, - statusRes: new(ResObjectSearch), - }, - reqWritten: true, - bodyResp: object.SearchResponseBody{}, - tail: nil, - } - - return obj, func(id []oid.ID) *object.SearchResponse { - resp := new(object.SearchResponse) - resp.SetBody(new(object.SearchResponseBody)) - - v2id := make([]refs.ObjectID, len(id)) - var oidV2 refs.ObjectID - - for i := range id { - id[i].WriteToV2(&oidV2) - v2id[i] = oidV2 - } - resp.GetBody().SetIDList(v2id) - err := signatureV2.SignServiceMessage(&p.PrivateKey, resp) - if err != nil { - t.Fatalf("error: %v", err) - } - obj.ctxCall.resp = resp - obj.bodyResp = *resp.GetBody() - return resp + client: &Client{}, + tail: nil, } } + +func newSearchStream(key *ecdsa.PrivateKey, endError error, idList ...[]oid.ID) *singleStreamResponder { + return &singleStreamResponder{ + key: key, + endError: endError, + idList: idList, + } +} + +type singleStreamResponder struct { + key *ecdsa.PrivateKey + n int + endError error + idList [][]oid.ID +} + +func (s *singleStreamResponder) Read(resp *v2object.SearchResponse) error { + if s.n >= len(s.idList) { + if s.endError != nil { + return s.endError + } + panic("unexpected call to `Read`") + } + + var body v2object.SearchResponseBody + + if s.idList[s.n] != nil { + ids := make([]refs.ObjectID, len(s.idList[s.n])) + for i := range s.idList[s.n] { + s.idList[s.n][i].WriteToV2(&ids[i]) + } + body.SetIDList(ids) + } + resp.SetBody(&body) + + err := signatureV2.SignServiceMessage(s.key, resp) + if err != nil { + panic(fmt.Errorf("error: %w", err)) + } + + s.n++ + return nil +} diff --git a/pool/pool.go b/pool/pool.go index b5099ac..cde5fbf 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -717,13 +717,14 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) ( cliPrm.WithBearerToken(*prm.btoken) } + if prm.key != nil { + cliPrm.UseKey(*prm.key) + } + res, err := c.client.ObjectSearchInit(ctx, cliPrm) if err = c.handleError(nil, err); err != nil { return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err) } - if prm.key != nil { - res.UseKey(*prm.key) - } return ResObjectSearch{r: res}, nil }