From ff4febff6f882f4561aaea954bcf37da2ad5a8dd Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Sat, 12 Feb 2022 12:26:38 +0300 Subject: [PATCH] [#131] client: Re-implement Object.Range method Signed-off-by: Leonard Lyubich --- client/object_get.go | 246 +++++++++++++++++++++++++++++++++++++++++++ pool/mock_test.go | 18 ++-- pool/pool.go | 61 +++++++++-- 3 files changed, 303 insertions(+), 22 deletions(-) diff --git a/client/object_get.go b/client/object_get.go index 5949f784..5c8646ce 100644 --- a/client/object_get.go +++ b/client/object_get.go @@ -456,3 +456,249 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH return &res, nil } + +// PrmObjectRange groups parameters of ObjectRange operation. +type PrmObjectRange struct { + prmObjectRead + + off, ln uint64 +} + +// SetOffset sets offset of the payload range to be read. +// Zero by default. +func (x *PrmObjectRange) SetOffset(off uint64) { + x.off = off +} + +// SetLength sets length of the payload range to be read. +// Must be positive. +func (x *PrmObjectRange) SetLength(ln uint64) { + x.ln = ln +} + +// ResObjectRange groups the final result values of ObjectRange operation. +type ResObjectRange struct { + statusRes +} + +// ObjectRangeReader is designed to read payload range of one object +// from NeoFS system. +// +// Must be initialized using Client.ObjectRangeInit, any other +// usage is unsafe. +type ObjectRangeReader struct { + cancelCtxStream context.CancelFunc + + ctxCall contextCall + + reqWritten bool + + // initially bound to contextCall + bodyResp v2object.GetRangeResponseBody + + tailPayload []byte +} + +// UseKey specifies private key to sign the requests. +// If key is not provided, then Client default key is used. +func (x *ObjectRangeReader) UseKey(key ecdsa.PrivateKey) { + x.ctxCall.key = key +} + +func (x *ObjectRangeReader) readChunk(buf []byte) (int, bool) { + if !x.reqWritten { + if !x.ctxCall.writeRequest() { + return 0, false + } + + x.reqWritten = true + } + + var read int + + // read remaining tail + read = copy(buf, x.tailPayload) + + x.tailPayload = x.tailPayload[read:] + + if len(buf) == read { + return read, true + } + + // receive next message + ok := x.ctxCall.readResponse() + if !ok { + return read, false + } + + // get chunk message + var partChunk *v2object.GetRangePartChunk + + switch v := x.bodyResp.GetRangePart().(type) { + default: + x.ctxCall.err = fmt.Errorf("unexpected message received: %T", v) + return read, false + case *v2object.SplitInfo: + handleSplitInfo(&x.ctxCall, v) + return read, false + case *v2object.GetRangePartChunk: + partChunk = v + } + + // read new chunk + chunk := partChunk.GetChunk() + + tailOffset := copy(buf[read:], chunk) + + read += tailOffset + + // save the tail + x.tailPayload = append(x.tailPayload, chunk[tailOffset:]...) + + return read, true +} + +// ReadChunk reads another chunk of the object payload range. +// Works similar to io.Reader.Read but returns success flag instead of error. +// +// Failure reason can be received via Close. +func (x *ObjectRangeReader) ReadChunk(buf []byte) (int, bool) { + return x.readChunk(buf) +} + +func (x *ObjectRangeReader) close(ignoreEOF bool) (*ResObjectRange, error) { + defer x.cancelCtxStream() + + if x.ctxCall.err != nil { + if !errors.Is(x.ctxCall.err, io.EOF) { + return nil, x.ctxCall.err + } else if !ignoreEOF { + return nil, io.EOF + } + } + + return x.ctxCall.statusRes.(*ResObjectRange), nil +} + +// Close ends reading the payload range and returns the result of the operation +// along with the final results. Must be called after using the ObjectRangeReader. +// +// Exactly one return value is non-nil. By default, server status is returned in res structure. +// Any client's internal or transport errors are returned as Go built-in error. +// If Client is tuned to resolve NeoFS API statuses, then NeoFS failures +// codes are returned as error. +// +// Return errors: +// *object.SplitInfoError (returned on virtual objects with PrmObjectRange.MakeRaw). +// +// Return statuses: +// global (see Client docs). +func (x *ObjectRangeReader) Close() (*ResObjectRange, error) { + return x.close(true) +} + +func (x *ObjectRangeReader) Read(p []byte) (int, error) { + n, ok := x.readChunk(p) + if !ok { + res, err := x.close(false) + if err != nil { + return n, err + } else if !x.ctxCall.resolveAPIFailures { + return n, apistatus.ErrFromStatus(res.Status()) + } + } + + return n, nil +} + +// ObjectRangeInit initiates reading an object's payload range through a remote +// server using NeoFS API protocol. +// +// The call only opens the transmission channel, explicit fetching is done using the ObjectRangeReader. +// Exactly one return value is non-nil. Resulting reader must be finally closed. +// +// Immediately panics if parameters are set incorrectly (see PrmObjectRange docs). +// Context is required and must not be nil. It is used for network communication. +func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*ObjectRangeReader, error) { + // check parameters + switch { + case ctx == nil: + panic(panicMsgMissingContext) + case !prm.cnrSet: + panic(panicMsgMissingContainer) + case !prm.objSet: + panic("missing object") + case prm.ln == 0: + panic("zero range length") + } + + var addr v2refs.Address + + addr.SetContainerID(prm.cnr.ToV2()) + addr.SetObjectID(prm.obj.ToV2()) + + var rng v2object.Range + + rng.SetOffset(prm.off) + rng.SetLength(prm.ln) + + // form request body + var body v2object.GetRangeRequestBody + + body.SetRaw(prm.raw) + body.SetAddress(&addr) + body.SetRange(&rng) + + // form meta header + var meta v2session.RequestMetaHeader + + if prm.local { + meta.SetTTL(1) + } + + if prm.bearerSet { + meta.SetBearerToken(prm.bearer.ToV2()) + } + + if prm.sessionSet { + meta.SetSessionToken(prm.session.ToV2()) + } + + // form request + var req v2object.GetRangeRequest + + req.SetBody(&body) + req.SetMetaHeader(&meta) + + // init reader + var ( + r ObjectRangeReader + resp v2object.GetRangeResponse + stream *rpcapi.ObjectRangeResponseReader + ) + + ctx, r.cancelCtxStream = context.WithCancel(ctx) + + resp.SetBody(&r.bodyResp) + + // init call context + c.initCallContext(&r.ctxCall) + r.ctxCall.req = &req + r.ctxCall.statusRes = new(ResObjectRange) + r.ctxCall.resp = &resp + r.ctxCall.wReq = func() error { + var err error + + stream, err = rpcapi.GetObjectRange(c.Raw(), &req, client.WithContext(ctx)) + if err != nil { + return fmt.Errorf("open stream: %w", err) + } + + return nil + } + r.ctxCall.rResp = func() error { + return stream.Read(&resp) + } + + return &r, nil +} diff --git a/pool/mock_test.go b/pool/mock_test.go index 484c1a9b..c91c6235 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -310,24 +310,20 @@ func (mr *MockClientMockRecorder) NetworkInfo(arg0, arg1 interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NetworkInfo", reflect.TypeOf((*MockClient)(nil).NetworkInfo), varargs...) } -// ObjectPayloadRangeData mocks base method. -func (m *MockClient) ObjectPayloadRangeData(arg0 context.Context, arg1 *client0.RangeDataParams, arg2 ...client0.CallOption) (*client0.ObjectRangeRes, error) { +// ObjectRangeInit mocks base method. +func (m *MockClient) ObjectRangeInit(arg0 context.Context, arg1 client0.PrmObjectRange) (*client0.ObjectRangeReader, error) { m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "ObjectPayloadRangeData", varargs...) - ret0, _ := ret[0].(*client0.ObjectRangeRes) + ret := m.ctrl.Call(m, "ObjectRangeInit", arg0, arg1) + ret0, _ := ret[0].(*client0.ObjectRangeReader) ret1, _ := ret[1].(error) return ret0, ret1 } -// ObjectPayloadRangeData indicates an expected call of ObjectPayloadRangeData. -func (mr *MockClientMockRecorder) ObjectPayloadRangeData(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +// ObjectRange indicates an expected call of ObjectRangeInit. +func (mr *MockClientMockRecorder) ObjectRange(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectPayloadRangeData", reflect.TypeOf((*MockClient)(nil).ObjectPayloadRangeData), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectRangeInit", reflect.TypeOf((*MockClient)(nil).ObjectRangeInit), varargs...) } // PutContainer mocks base method. diff --git a/pool/pool.go b/pool/pool.go index 79ad6584..157527ed 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -47,7 +47,7 @@ type Client interface { DeleteObject(context.Context, *client.DeleteObjectParams, ...client.CallOption) (*client.ObjectDeleteRes, error) ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error) ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error) - ObjectPayloadRangeData(context.Context, *client.RangeDataParams, ...client.CallOption) (*client.ObjectRangeRes, error) + ObjectRangeInit(context.Context, client.PrmObjectRange) (*client.ObjectRangeReader, error) HashObjectPayloadRanges(context.Context, *client.RangeChecksumParams, ...client.CallOption) (*client.ObjectRangeHashRes, error) SearchObjects(context.Context, *client.SearchObjectParams, ...client.CallOption) (*client.ObjectSearchRes, error) AnnounceLocalTrust(context.Context, client.AnnounceLocalTrustPrm) (*client.AnnounceLocalTrustRes, error) @@ -165,7 +165,7 @@ type Object interface { DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) HeadObject(context.Context, address.Address, ...CallOption) (*object.Object, error) - ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error) + ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error) SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*oid.ID, error) @@ -946,25 +946,64 @@ func (p *pool) HeadObject(ctx context.Context, addr address.Address, opts ...Cal return &obj, nil } -func (p *pool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error) { +type ResObjectRange struct { + payload *client.ObjectRangeReader +} + +func (x *ResObjectRange) Read(p []byte) (int, error) { + return x.payload.Read(p) +} + +func (x *ResObjectRange) Close() error { + _, err := x.payload.Close() + return err +} + +func (p *pool) ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) - cp, options, err := p.conn(ctx, cfg) + + var prm client.PrmObjectRange + + prm.SetOffset(off) + prm.SetLength(ln) + + var cc callContextWithRetry + + cc.Context = ctx + cc.sessionTarget = prm.WithinSession + + err := p.initCallContextWithRetry(&cc, cfg) if err != nil { return nil, err } - res, err := cp.client.ObjectPayloadRangeData(ctx, params, options...) - - if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { - opts = append(opts, retry()) - return p.ObjectPayloadRangeData(ctx, params, opts...) + if cnr := addr.ContainerID(); cnr != nil { + prm.FromContainer(*cnr) } - if err != nil { // here err already carries both status and client errors + if obj := addr.ObjectID(); obj != nil { + prm.ByID(*obj) + } + + var res ResObjectRange + + err = p.callWithRetry(&cc, func() error { + var err error + + res.payload, err = cc.client.ObjectRangeInit(ctx, prm) + if err != nil { + return fmt.Errorf("init payload range reading on client: %w", err) + } + + res.payload.UseKey(*cc.key) + + return nil + }) + if err != nil { return nil, err } - return res.Data(), nil + return &res, nil } func copyRangeChecksumParams(prm *client.RangeChecksumParams) *client.RangeChecksumParams {