From fb3501752d42c838a43de1a074a0423295b1fbc0 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Sat, 12 Feb 2022 23:30:16 +0300 Subject: [PATCH] [#131] client: Re-implement Object.Search method Signed-off-by: Leonard Lyubich --- client/object_search.go | 266 ++++++++++++++++++++++++++++++++++++++++ pool/pool.go | 64 ++++++++-- 2 files changed, 318 insertions(+), 12 deletions(-) create mode 100644 client/object_search.go diff --git a/client/object_search.go b/client/object_search.go new file mode 100644 index 0000000..9b93217 --- /dev/null +++ b/client/object_search.go @@ -0,0 +1,266 @@ +package client + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + v2session "github.com/nspcc-dev/neofs-api-go/v2/session" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/token" +) + +// PrmObjectSearch groups parameters of ObjectSearch operation. +type PrmObjectSearch struct { + local bool + + sessionSet bool + session session.Token + + bearerSet bool + bearer token.BearerToken + + cnrSet bool + cnr cid.ID + + filters object.SearchFilters +} + +// MarkLocal tells the server to execute the operation locally. +func (x *PrmObjectSearch) MarkLocal() { + x.local = true +} + +// WithinSession specifies session within which the search query must be executed. +// +// Creator of the session acquires the authorship of the request. +// This may affect the execution of an operation (e.g. access control). +// +// Must be signed. +func (x *PrmObjectSearch) WithinSession(t session.Token) { + x.session = t + x.sessionSet = true +} + +// WithBearerToken attaches bearer token to be used for the operation. +// +// If set, underlying eACL rules will be used in access control. +// +// Must be signed. +func (x *PrmObjectSearch) WithBearerToken(t token.BearerToken) { + x.bearer = t + x.bearerSet = true +} + +// InContainer specifies the container in which to look for objects. +// Required parameter. +func (x *PrmObjectSearch) InContainer(id cid.ID) { + x.cnr = id + x.cnrSet = true +} + +// SetFilters sets filters by which to select objects. All container objects +// match unset/empty filters. +func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) { + x.filters = filters +} + +// ResObjectSearch groups the final result values of ObjectSearch operation. +type ResObjectSearch struct { + statusRes +} + +// ObjectListReader is designed to read list of object identifiers from NeoFS system. +// +// Must be initialized using Client.ObjectSearch, any other usage is unsafe. +type ObjectListReader struct { + cancelCtxStream context.CancelFunc + + ctxCall contextCall + + reqWritten bool + + // initially bound to contextCall + bodyResp v2object.SearchResponseBody + + tail []*v2refs.ObjectID +} + +// UseKey specifies private key to sign the requests. +// If key is not provided, then Client default key is used. +func (x *ObjectListReader) UseKey(key ecdsa.PrivateKey) { + x.ctxCall.key = key +} + +// Read reads another list of the object identifiers. Works similar to +// io.Reader.Read but copies oid.ID and returns success flag instead of error. +// +// Failure reason can be received via Close. +// +// Panics if buf has zero length. +func (x *ObjectListReader) Read(buf []oid.ID) (int, bool) { + if len(buf) == 0 { + panic("empty buffer in ObjectListReader.ReadList") + } + + if !x.reqWritten { + if !x.ctxCall.writeRequest() { + return 0, false + } + + x.reqWritten = true + } + + // read remaining tail + read := len(x.tail) + if read > len(buf) { + read = len(buf) + } + + for i := 0; i < read; i++ { + buf[i] = *oid.NewIDFromV2(x.tail[i]) // need smth better + } + + x.tail = x.tail[read:] + + if len(buf) == read { + return read, true + } + + // receive next message + ok := x.ctxCall.readResponse() + if !ok { + return read, false + } + + // read new chunk of objects + ids := x.bodyResp.GetIDList() + ln := len(ids) + if ln == 0 { + x.ctxCall.err = io.EOF + return read, false + } + + buf = buf[read:] + if ln > len(buf) { + ln = len(buf) + } + + for i := 0; i < ln; i++ { + buf[i] = *oid.NewIDFromV2(ids[i]) // need smth better + } + + read += ln + + // save the tail + x.tail = append(x.tail, ids[ln:]...) + + return read, true +} + +// Close ends reading list of the matched objects and returns the result of the operation +// along with the final results. Must be called after using the ObjectListReader. +// +// Exactly one return value is non-nil. By default, server status is returned in res structure. +// Any client's internal or transport errors are returned as Go built-in error. +// If Client is tuned to resolve NeoFS API statuses, then NeoFS failures +// codes are returned as error. +// +// Return statuses: +// global (see Client docs). +func (x *ObjectListReader) Close() (*ResObjectSearch, error) { + defer x.cancelCtxStream() + + if x.ctxCall.err != nil && !errors.Is(x.ctxCall.err, io.EOF) { + return nil, x.ctxCall.err + } + + return x.ctxCall.statusRes.(*ResObjectSearch), nil +} + +// ObjectSearchInit initiates object selection through a remote server using NeoFS API protocol. +// +// The call only opens the transmission channel, explicit fetching of matched objects +// is done using the ObjectListReader. Exactly one return value is non-nil. +// Resulting reader must be finally closed. +// +// Immediately panics if parameters are set incorrectly (see PrmObjectSearch docs). +// Context is required and must not be nil. It is used for network communication. +func (c *Client) ObjectSearchInit(ctx context.Context, prm PrmObjectSearch) (*ObjectListReader, error) { + // check parameters + switch { + case ctx == nil: + panic(panicMsgMissingContext) + case !prm.cnrSet: + panic(panicMsgMissingContainer) + } + + // form request body + var body v2object.SearchRequestBody + + body.SetVersion(1) + body.SetContainerID(prm.cnr.ToV2()) + body.SetFilters(prm.filters.ToV2()) + + // form meta header + var meta v2session.RequestMetaHeader + + if prm.local { + meta.SetTTL(1) + } + + if prm.bearerSet { + meta.SetBearerToken(prm.bearer.ToV2()) + } + + if prm.sessionSet { + meta.SetSessionToken(prm.session.ToV2()) + } + + // form request + var req v2object.SearchRequest + + req.SetBody(&body) + req.SetMetaHeader(&meta) + + // init reader + var ( + r ObjectListReader + resp v2object.SearchResponse + stream *rpcapi.SearchResponseReader + ) + + ctx, r.cancelCtxStream = context.WithCancel(ctx) + + resp.SetBody(&r.bodyResp) + + // init call context + c.initCallContext(&r.ctxCall) + r.ctxCall.req = &req + r.ctxCall.statusRes = new(ResObjectSearch) + r.ctxCall.resp = &resp + r.ctxCall.wReq = func() error { + var err error + + stream, err = rpcapi.SearchObjects(c.Raw(), &req, client.WithContext(ctx)) + if err != nil { + return fmt.Errorf("open stream: %w", err) + } + + return nil + } + r.ctxCall.rResp = func() error { + return stream.Read(&resp) + } + + return &r, nil +} diff --git a/pool/pool.go b/pool/pool.go index 157527e..41e2c28 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -49,7 +49,7 @@ type Client interface { ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error) ObjectRangeInit(context.Context, client.PrmObjectRange) (*client.ObjectRangeReader, error) HashObjectPayloadRanges(context.Context, *client.RangeChecksumParams, ...client.CallOption) (*client.ObjectRangeHashRes, error) - SearchObjects(context.Context, *client.SearchObjectParams, ...client.CallOption) (*client.ObjectSearchRes, error) + ObjectSearchInit(context.Context, client.PrmObjectSearch) (*client.ObjectListReader, error) AnnounceLocalTrust(context.Context, client.AnnounceLocalTrustPrm) (*client.AnnounceLocalTrustRes, error) AnnounceIntermediateTrust(context.Context, client.AnnounceIntermediateTrustPrm) (*client.AnnounceIntermediateTrustRes, error) CreateSession(context.Context, client.CreateSessionPrm) (*client.CreateSessionRes, error) @@ -163,12 +163,12 @@ type Pool interface { type Object interface { PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error - GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) + GetObject(context.Context, address.Address, ...CallOption) (*ResGetObject, error) HeadObject(context.Context, address.Address, ...CallOption) (*object.Object, error) ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error) - SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*oid.ID, error) + SearchObjects(context.Context, cid.ID, object.SearchFilters, ...CallOption) (*ResObjectSearch, error) } type Container interface { @@ -1092,25 +1092,65 @@ func (p *pool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChe return hs, nil } -func (p *pool) SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*oid.ID, error) { +type ResObjectSearch struct { + r *client.ObjectListReader +} + +func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) { + n, ok := x.r.Read(buf) + if !ok { + _, err := x.r.Close() + if err == nil { + return n, io.EOF + } + + return n, err + } + + return n, nil +} + +func (x *ResObjectSearch) Close() { + _, _ = x.r.Close() +} + +func (p *pool) SearchObjects(ctx context.Context, idCnr cid.ID, filters object.SearchFilters, opts ...CallOption) (*ResObjectSearch, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) - cp, options, err := p.conn(ctx, cfg) + + var prm client.PrmObjectSearch + + prm.InContainer(idCnr) + prm.SetFilters(filters) + + var cc callContextWithRetry + + cc.Context = ctx + cc.sessionTarget = prm.WithinSession + + err := p.initCallContextWithRetry(&cc, cfg) if err != nil { return nil, err } - res, err := cp.client.SearchObjects(ctx, params, options...) + var res ResObjectSearch - if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { - opts = append(opts, retry()) - return p.SearchObject(ctx, params, opts...) - } + err = p.callWithRetry(&cc, func() error { + var err error - if err != nil { // here err already carries both status and client errors + res.r, err = cc.client.ObjectSearchInit(ctx, prm) + if err != nil { + return fmt.Errorf("init object searching on client: %w", err) + } + + res.r.UseKey(*cc.key) + + return nil + }) + if err != nil { return nil, err } - return res.IDList(), nil + return &res, nil } func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) {