From 07817fb4032db26e32366a2e06f1e222aa0f9740 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 25 Feb 2022 13:20:15 +0300 Subject: [PATCH] [#148] client: Allow to iterate over search results Signed-off-by: Evgenii Stratonikov --- client/object_search.go | 25 +++++ client/object_search_test.go | 191 +++++++++++++++++++++++++++++++++++ pool/pool.go | 4 + 3 files changed, 220 insertions(+) create mode 100644 client/object_search_test.go diff --git a/client/object_search.go b/client/object_search.go index cd79597..c0750b1 100644 --- a/client/object_search.go +++ b/client/object_search.go @@ -12,6 +12,7 @@ import ( rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" v2session "github.com/nspcc-dev/neofs-api-go/v2/session" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -175,6 +176,30 @@ func (x *ObjectListReader) Read(buf []oid.ID) (int, bool) { } } +// Iterate iterates over the list of found object identifiers. +// f can return true to stop iteration earlier. +// +// Returns an error if object can't be read. +func (x *ObjectListReader) Iterate(f func(oid.ID) bool) error { + buf := make([]oid.ID, 1) + + for { + // Do not check first return value because `len(buf) == 1`, + // so false means nothing was read. + _, ok := x.Read(buf) + if !ok { + res, err := x.Close() + if err != nil { + return err + } + return apistatus.ErrFromStatus(res.Status()) + } + if f(buf[0]) { + return nil + } + } +} + // Close ends reading list of the matched objects and returns the result of the operation // along with the final results. Must be called after using the ObjectListReader. // diff --git a/client/object_search_test.go b/client/object_search_test.go new file mode 100644 index 0000000..3ce52c1 --- /dev/null +++ b/client/object_search_test.go @@ -0,0 +1,191 @@ +package client + +import ( + "errors" + "io" + "testing" + + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/refs" + signatureV2 "github.com/nspcc-dev/neofs-api-go/v2/signature" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +func TestObjectSearch(t *testing.T) { + ids := make([]oid.ID, 20) + for i := range ids { + ids[i] = *oidtest.ID() + } + + resp, setID := testListReaderResponse(t) + + buf := make([]oid.ID, 2) + checkRead := func(t *testing.T, expected []oid.ID) { + n, ok := resp.Read(buf) + require.True(t, ok == (len(expected) == len(buf)), "expected no error") + require.Equal(t, len(expected), n, "expected %d items to be read", len(expected)) + require.Equal(t, expected, buf[:len(expected)]) + } + + // nil panic + require.Panics(t, func() { resp.Read(nil) }) + + // both ID fetched + setID(ids[:3]) + checkRead(t, ids[:2]) + + // one ID cached, second fetched + setID(ids[3:6]) + checkRead(t, ids[2:4]) + + // both ID cached + resp.ctxCall.resp = nil + checkRead(t, ids[4:6]) + + // both ID fetched in 2 requests, with empty one in the middle + var n int + resp.ctxCall.rResp = func() error { + switch n { + case 0: + setID(ids[6:7]) + case 1: + setID(nil) + case 2: + setID(ids[7:8]) + default: + t.FailNow() + } + n++ + return nil + } + checkRead(t, ids[6:8]) + + // read from tail multiple times + resp.ctxCall.rResp = nil + setID(ids[8:11]) + buf = buf[:1] + checkRead(t, ids[8:9]) + checkRead(t, ids[9:10]) + checkRead(t, ids[10:11]) + + // handle EOF + buf = buf[:2] + n = 0 + resp.ctxCall.rResp = func() error { + if n > 0 { + return io.EOF + } + n++ + setID(ids[11:12]) + return nil + } + checkRead(t, ids[11:12]) +} + +func TestObjectIterate(t *testing.T) { + ids := make([]oid.ID, 3) + for i := range ids { + ids[i] = *oidtest.ID() + } + + t.Run("iterate all sequence", func(t *testing.T) { + resp, setID := testListReaderResponse(t) + + // Iterate over all sequence + var n int + resp.ctxCall.rResp = func() error { + switch n { + case 0: + setID(ids[0:2]) + case 1: + setID(nil) + case 2: + setID(ids[2:3]) + default: + return io.EOF + } + n++ + return nil + } + + var actual []oid.ID + require.NoError(t, resp.Iterate(func(id oid.ID) bool { + actual = append(actual, id) + return false + })) + require.Equal(t, ids[:3], actual) + }) + t.Run("stop by return value", func(t *testing.T) { + resp, setID := testListReaderResponse(t) + + var actual []oid.ID + setID(ids) + require.NoError(t, resp.Iterate(func(id oid.ID) bool { + actual = append(actual, id) + return len(actual) == 2 + })) + require.Equal(t, ids[:2], actual) + }) + t.Run("stop after error", func(t *testing.T) { + resp, setID := testListReaderResponse(t) + expectedErr := errors.New("test error") + + var actual []oid.ID + var n int + resp.ctxCall.rResp = func() error { + switch n { + case 0: + setID(ids[:2]) + default: + return expectedErr + } + n++ + return nil + } + + err := resp.Iterate(func(id oid.ID) bool { + actual = append(actual, id) + return false + }) + require.True(t, errors.Is(err, expectedErr), "got: %v", err) + require.Equal(t, ids[:2], actual) + }) +} + +func testListReaderResponse(t *testing.T) (*ObjectListReader, func(id []oid.ID) *object.SearchResponse) { + p, err := keys.NewPrivateKey() + require.NoError(t, err) + + obj := &ObjectListReader{ + cancelCtxStream: func() {}, + ctxCall: contextCall{ + closer: func() error { return nil }, + result: func(v2 responseV2) {}, + statusRes: new(ResObjectSearch), + }, + reqWritten: true, + bodyResp: object.SearchResponseBody{}, + tail: nil, + } + + return obj, func(id []oid.ID) *object.SearchResponse { + resp := new(object.SearchResponse) + resp.SetBody(new(object.SearchResponseBody)) + + v2id := make([]*refs.ObjectID, len(id)) + for i := range id { + v2id[i] = id[i].ToV2() + } + resp.GetBody().SetIDList(v2id) + err := signatureV2.SignServiceMessage(&p.PrivateKey, resp) + if err != nil { + t.Fatalf("error: %v", err) + } + obj.ctxCall.resp = resp + obj.bodyResp = *resp.GetBody() + return resp + } +} diff --git a/pool/pool.go b/pool/pool.go index d249576..c94d5c5 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -1015,6 +1015,10 @@ func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) { return n, nil } +func (x *ResObjectSearch) Iterate(f func(oid.ID) bool) error { + return x.r.Iterate(f) +} + func (x *ResObjectSearch) Close() { _, _ = x.r.Close() }