From caf31a928cefa433c202a873e15f113c61d4a312 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 11 Feb 2022 19:13:53 +0300 Subject: [PATCH] [#131] client: Re-implement Object.Head method Signed-off-by: Leonard Lyubich --- client/common.go | 2 +- client/object.go | 192 ------------------------------------------- client/object_get.go | 165 ++++++++++++++++++++++++++++++++++--- pool/mock_test.go | 14 ++-- pool/pool.go | 94 ++++++++++++--------- 5 files changed, 216 insertions(+), 251 deletions(-) diff --git a/client/common.go b/client/common.go index a113e4a8..a143c3cf 100644 --- a/client/common.go +++ b/client/common.go @@ -271,7 +271,7 @@ func (x *contextCall) readResponse() bool { return x.processResponse() } -// closes the message stream (if closer is set) and writes the results (if resuls is set). +// closes the message stream (if closer is set) and writes the results (if result is set). // Return means success. If failed, contextCall.err contains the reason. func (x *contextCall) close() bool { if x.closer != nil { diff --git a/client/object.go b/client/object.go index 70062d69..4027c587 100644 --- a/client/object.go +++ b/client/object.go @@ -32,14 +32,6 @@ type DeleteObjectParams struct { tombTgt ObjectAddressWriter } -type ObjectHeaderParams struct { - addr *address.Address - - raw bool - - short bool -} - type RangeDataParams struct { addr *address.Address @@ -251,190 +243,6 @@ func writeUnexpectedMessageTypeErr(res resCommon, val interface{}) { res.setStatus(st) } -func (p *ObjectHeaderParams) WithAddress(v *address.Address) *ObjectHeaderParams { - if p != nil { - p.addr = v - } - - return p -} - -func (p *ObjectHeaderParams) Address() *address.Address { - if p != nil { - return p.addr - } - - return nil -} - -func (p *ObjectHeaderParams) WithAllFields() *ObjectHeaderParams { - if p != nil { - p.short = false - } - - return p -} - -// AllFields return true if parameter set to return all header fields, returns -// false if parameter set to return only main fields of header. -func (p *ObjectHeaderParams) AllFields() bool { - if p != nil { - return !p.short - } - - return false -} - -func (p *ObjectHeaderParams) WithMainFields() *ObjectHeaderParams { - if p != nil { - p.short = true - } - - return p -} - -func (p *ObjectHeaderParams) WithRawFlag(v bool) *ObjectHeaderParams { - if p != nil { - p.raw = v - } - - return p -} - -func (p *ObjectHeaderParams) RawFlag() bool { - if p != nil { - return p.raw - } - - return false -} - -type ObjectHeadRes struct { - statusRes - objectRes -} - -// HeadObject receives object's header through NeoFS API call. -// -// Any client's internal or transport errors are returned as `error`. -// If WithNeoFSErrorParsing option has been provided, unsuccessful -// NeoFS status codes are returned as `error`, otherwise, are included -// in the returned result structure. -func (c *Client) HeadObject(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*ObjectHeadRes, error) { - callOpts := c.defaultCallOptions() - - for i := range opts { - if opts[i] != nil { - opts[i](callOpts) - } - } - - // create request - req := new(v2object.HeadRequest) - - // initialize request body - body := new(v2object.HeadRequestBody) - req.SetBody(body) - - // set meta header - meta := v2MetaHeaderFromOpts(callOpts) - - if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ - addr: p.addr.ToV2(), - verb: v2session.ObjectVerbHead, - }); err != nil { - return nil, fmt.Errorf("could not attach session token: %w", err) - } - - req.SetMetaHeader(meta) - - // fill body fields - body.SetAddress(p.addr.ToV2()) - body.SetMainOnly(p.short) - body.SetRaw(p.raw) - - // sign the request - if err := signature.SignServiceMessage(callOpts.key, req); err != nil { - return nil, fmt.Errorf("signing the request failed: %w", err) - } - - // send Head request - resp, err := rpcapi.HeadObject(c.Raw(), req, client.WithContext(ctx)) - if err != nil { - return nil, fmt.Errorf("sending the request failed: %w", err) - } - - var ( - res = new(ObjectHeadRes) - procPrm processResponseV2Prm - procRes processResponseV2Res - ) - - procPrm.callOpts = callOpts - procPrm.resp = resp - - procRes.statusRes = res - - // process response in general - if c.processResponseV2(&procRes, procPrm) { - if procRes.cliErr != nil { - return nil, procRes.cliErr - } - - return res, nil - } - - var ( - hdr *v2object.Header - idSig *v2refs.Signature - ) - - switch v := resp.GetBody().GetHeaderPart().(type) { - case nil: - writeUnexpectedMessageTypeErr(res, v) - return res, nil - case *v2object.ShortHeader: - if !p.short { - writeUnexpectedMessageTypeErr(res, v) - return res, nil - } - - h := v - - hdr = new(v2object.Header) - hdr.SetPayloadLength(h.GetPayloadLength()) - hdr.SetVersion(h.GetVersion()) - hdr.SetOwnerID(h.GetOwnerID()) - hdr.SetObjectType(h.GetObjectType()) - hdr.SetCreationEpoch(h.GetCreationEpoch()) - hdr.SetPayloadHash(h.GetPayloadHash()) - hdr.SetHomomorphicHash(h.GetHomomorphicHash()) - case *v2object.HeaderWithSignature: - if p.short { - writeUnexpectedMessageTypeErr(res, v) - return res, nil - } - - hdr = v.GetHeader() - idSig = v.GetSignature() - case *v2object.SplitInfo: - si := object.NewSplitInfoFromV2(v) - - return nil, object.NewSplitInfoError(si) - } - - obj := new(v2object.Object) - obj.SetHeader(hdr) - obj.SetSignature(idSig) - - raw := object.NewRawFromV2(obj) - raw.SetID(p.addr.ObjectID()) - - res.setObject(raw.Object()) - - return res, nil -} - func (p *RangeDataParams) WithAddress(v *address.Address) *RangeDataParams { if p != nil { p.addr = v diff --git a/client/object_get.go b/client/object_get.go index 504596b5..5949f784 100644 --- a/client/object_get.go +++ b/client/object_get.go @@ -20,8 +20,8 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/token" ) -// PrmObjectGet groups parameters of ObjectGetInit operation. -type PrmObjectGet struct { +// shared parameters of GET/HEAD/RANGE. +type prmObjectRead struct { raw bool local bool @@ -40,41 +40,55 @@ type PrmObjectGet struct { } // MarkRaw marks an intent to read physically stored object. -func (x *PrmObjectGet) MarkRaw() { +func (x *prmObjectRead) MarkRaw() { x.raw = true } // MarkLocal tells the server to execute the operation locally. -func (x *PrmObjectGet) MarkLocal() { +func (x *prmObjectRead) MarkLocal() { x.local = true } // WithinSession specifies session within which object should be read. -func (x *PrmObjectGet) WithinSession(t session.Token) { +// +// Creator of the session acquires the authorship of the request. +// This may affect the execution of an operation (e.g. access control). +// +// Must be signed. +func (x *prmObjectRead) WithinSession(t session.Token) { x.session = t x.sessionSet = true } // WithBearerToken attaches bearer token to be used for the operation. -func (x *PrmObjectGet) WithBearerToken(t token.BearerToken) { +// +// If set, underlying eACL rules will be used in access control. +// +// Must be signed. +func (x *prmObjectRead) WithBearerToken(t token.BearerToken) { x.bearer = t x.bearerSet = true } // FromContainer specifies NeoFS container of the object. // Required parameter. -func (x *PrmObjectGet) FromContainer(id cid.ID) { +func (x *prmObjectRead) FromContainer(id cid.ID) { x.cnr = id x.cnrSet = true } // ByID specifies identifier of the requested object. // Required parameter. -func (x *PrmObjectGet) ByID(id oid.ID) { +func (x *prmObjectRead) ByID(id oid.ID) { x.obj = id x.objSet = true } +// PrmObjectGet groups parameters of ObjectGetInit operation. +type PrmObjectGet struct { + prmObjectRead +} + // ResObjectGet groups the final result values of ObjectGetInit operation. type ResObjectGet struct { statusRes @@ -101,6 +115,10 @@ func (x *ObjectReader) UseKey(key ecdsa.PrivateKey) { x.ctxCall.key = key } +func handleSplitInfo(ctx *contextCall, i *v2object.SplitInfo) { + ctx.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(i)) +} + // ReadHeader reads header of the object. Result means success. // Failure reason can be received via Close. func (x *ObjectReader) ReadHeader(dst *object.Object) bool { @@ -117,7 +135,7 @@ func (x *ObjectReader) ReadHeader(dst *object.Object) bool { x.ctxCall.err = fmt.Errorf("unexpected message instead of heading part: %T", v) return false case *v2object.SplitInfo: - x.ctxCall.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(v)) + handleSplitInfo(&x.ctxCall, v) return false case *v2object.GetObjectPartInit: partInit = v @@ -231,7 +249,7 @@ func (x *ObjectReader) Read(p []byte) (int, error) { // ObjectGetInit initiates reading an object through a remote server using NeoFS API protocol. // -// The call only opens the transmission channel, explicit fetching is done using the ObjectWriter. +// The call only opens the transmission channel, explicit fetching is done using the ObjectReader. // Exactly one return value is non-nil. Resulting reader must be finally closed. // // Immediately panics if parameters are set incorrectly (see PrmObjectGet docs). @@ -311,3 +329,130 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe return &r, nil } + +// PrmObjectHead groups parameters of ObjectHead operation. +type PrmObjectHead struct { + prmObjectRead +} + +// ResObjectHead groups resulting values of ObjectHead operation. +type ResObjectHead struct { + statusRes + + // requested object (response doesn't carry the ID) + idObj oid.ID + + hdr *v2object.HeaderWithSignature +} + +// ReadHeader reads header of the requested object. +// Returns false if header is missing in the response (not read). +func (x *ResObjectHead) ReadHeader(dst *object.Object) bool { + if x.hdr == nil { + return false + } + + var objv2 v2object.Object + + objv2.SetHeader(x.hdr.GetHeader()) + objv2.SetSignature(x.hdr.GetSignature()) + + raw := object.NewRawFromV2(&objv2) + raw.SetID(&x.idObj) + + *dst = *raw.Object() + + return true +} + +// ObjectHead reads object header through a remote server using NeoFS API protocol. +// +// Exactly one return value is non-nil. By default, server status is returned in res structure. +// Any client's internal or transport errors are returned as `error`, +// If WithNeoFSErrorParsing option has been provided, unsuccessful +// NeoFS status codes are returned as `error`, otherwise, are included +// in the returned result structure. +// +// Immediately panics if parameters are set incorrectly (see PrmObjectHead docs). +// Context is required and must not be nil. It is used for network communication. +// +// Return errors: +// *object.SplitInfoError (returned on virtual objects with PrmObjectHead.MakeRaw). +// +// Return statuses: +// - global (see Client docs). +func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectHead, error) { + switch { + case ctx == nil: + panic(panicMsgMissingContext) + case !prm.cnrSet: + panic(panicMsgMissingContainer) + case !prm.objSet: + panic("missing object") + } + + var addr v2refs.Address + + addr.SetContainerID(prm.cnr.ToV2()) + addr.SetObjectID(prm.obj.ToV2()) + + // form request body + var body v2object.HeadRequestBody + + body.SetRaw(prm.raw) + body.SetAddress(&addr) + + // form meta header + var meta v2session.RequestMetaHeader + + if prm.local { + meta.SetTTL(1) + } + + if prm.bearerSet { + meta.SetBearerToken(prm.bearer.ToV2()) + } + + if prm.sessionSet { + meta.SetSessionToken(prm.session.ToV2()) + } + + // form request + var req v2object.HeadRequest + + req.SetBody(&body) + req.SetMetaHeader(&meta) + + // init call context + + var ( + cc contextCall + res ResObjectHead + ) + + res.idObj = prm.obj + + c.initCallContext(&cc) + cc.req = &req + cc.statusRes = &res + cc.call = func() (responseV2, error) { + return rpcapi.HeadObject(c.Raw(), &req, client.WithContext(ctx)) + } + cc.result = func(r responseV2) { + switch v := r.(*v2object.HeadResponse).GetBody().GetHeaderPart().(type) { + default: + cc.err = fmt.Errorf("unexpected header type %T", v) + case *v2object.SplitInfo: + handleSplitInfo(&cc, v) + case *v2object.HeaderWithSignature: + res.hdr = v + } + } + + // process call + if !cc.processCall() { + return nil, cc.err + } + + return &res, nil +} diff --git a/pool/mock_test.go b/pool/mock_test.go index af0b461d..484c1a9b 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -260,15 +260,11 @@ func (mr *MockClientMockRecorder) HashObjectPayloadRanges(arg0, arg1 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HashObjectPayloadRanges", reflect.TypeOf((*MockClient)(nil).HashObjectPayloadRanges), varargs...) } -// HeadObject mocks base method. -func (m *MockClient) HeadObject(arg0 context.Context, arg1 *client0.ObjectHeaderParams, arg2 ...client0.CallOption) (*client0.ObjectHeadRes, error) { +// ObjectHead mocks base method. +func (m *MockClient) ObjectHead(arg0 context.Context, arg1 client0.PrmObjectHead) (*client0.ResObjectHead, error) { m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "HeadObject", varargs...) - ret0, _ := ret[0].(*client0.ObjectHeadRes) + ret := m.ctrl.Call(m, "HeadObject", arg0, arg1) + ret0, _ := ret[0].(*client0.ResObjectHead) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -277,7 +273,7 @@ func (m *MockClient) HeadObject(arg0 context.Context, arg1 *client0.ObjectHeader func (mr *MockClientMockRecorder) HeadObject(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeadObject", reflect.TypeOf((*MockClient)(nil).HeadObject), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeadObject", reflect.TypeOf((*MockClient)(nil).ObjectHead), varargs...) } // ListContainers mocks base method. diff --git a/pool/pool.go b/pool/pool.go index 3e53fb93..45afc1a5 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -46,7 +46,7 @@ type Client interface { ObjectPutInit(context.Context, client.PrmObjectPutInit) (*client.ObjectWriter, error) DeleteObject(context.Context, *client.DeleteObjectParams, ...client.CallOption) (*client.ObjectDeleteRes, error) ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error) - HeadObject(context.Context, *client.ObjectHeaderParams, ...client.CallOption) (*client.ObjectHeadRes, error) + ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error) ObjectPayloadRangeData(context.Context, *client.RangeDataParams, ...client.CallOption) (*client.ObjectRangeRes, error) HashObjectPayloadRanges(context.Context, *client.RangeChecksumParams, ...client.CallOption) (*client.ObjectRangeHashRes, error) SearchObjects(context.Context, *client.SearchObjectParams, ...client.CallOption) (*client.ObjectSearchRes, error) @@ -164,7 +164,7 @@ type Object interface { PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) - GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) + HeadObject(context.Context, address.Address, ...CallOption) (*object.Object, error) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error) @@ -648,6 +648,8 @@ func (p *pool) initCallContextWithRetry(ctx *callContextWithRetry, cfg *callConf return nil } +// opens new session or uses cached one. +// Must be called only on initialized callContext with set sessionTarget. func (p *pool) openDefaultSession(ctx *callContext) error { cacheKey := formCacheKey(ctx.endpoint, ctx.key) @@ -684,14 +686,29 @@ func (p *pool) openDefaultSession(ctx *callContext) error { return nil } -func (p *pool) handleAttemptError(ctx *callContextWithRetry, err error) bool { - isTokenErr := p.checkSessionTokenErr(err, ctx.endpoint) - // note that checkSessionTokenErr must be called - res := isTokenErr && !ctx.noRetry +// opens default session (if sessionDefault is set), and calls f. If f returns +// session-related error (*), and retrying is enabled, then f is called once more. +// +// (*) in this case cached token is removed. +func (p *pool) callWithRetry(ctx *callContextWithRetry, f func() error) error { + var err error - ctx.noRetry = true + if ctx.sessionDefault { + err = p.openDefaultSession(&ctx.callContext) + if err != nil { + return fmt.Errorf("open default session: %w", err) + } + } - return res + err = f() + + if p.checkSessionTokenErr(err, ctx.endpoint) && !ctx.noRetry { + // don't retry anymore + ctx.noRetry = true + return p.callWithRetry(ctx, f) + } + + return err } func (p *pool) PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) { @@ -836,27 +853,6 @@ type ResGetObject struct { Payload io.ReadCloser } -func (p *pool) callWithRetry(ctx *callContextWithRetry, f func() error) error { - var err error - - if ctx.sessionDefault { - err = p.openDefaultSession(&ctx.callContext) - if err != nil { - return fmt.Errorf("open default session: %w", err) - } - } - - err = f() - - if p.checkSessionTokenErr(err, ctx.endpoint) && !ctx.noRetry { - // don't retry anymore - ctx.noRetry = true - return p.callWithRetry(ctx, f) - } - - return err -} - func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) @@ -903,25 +899,45 @@ func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...Call return &res, nil } -func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) { +func (p *pool) HeadObject(ctx context.Context, addr address.Address, opts ...CallOption) (*object.Object, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) - cp, options, err := p.conn(ctx, cfg) + + var prm client.PrmObjectHead + + var cc callContextWithRetry + + cc.Context = ctx + cc.sessionTarget = prm.WithinSession + + err := p.initCallContextWithRetry(&cc, cfg) if err != nil { return nil, err } - res, err := cp.client.HeadObject(ctx, params, options...) - - if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { - opts = append(opts, retry()) - return p.GetObjectHeader(ctx, params, opts...) + if cnr := addr.ContainerID(); cnr != nil { + prm.FromContainer(*cnr) } - if err != nil { // here err already carries both status and client errors - return nil, err + if obj := addr.ObjectID(); obj != nil { + prm.ByID(*obj) } - return res.Object(), nil + var obj object.Object + + err = p.callWithRetry(&cc, func() error { + res, err := cc.client.ObjectHead(ctx, prm) + if err != nil { + return fmt.Errorf("read object header via client: %w", err) + } + + if !res.ReadHeader(&obj) { + return errors.New("missing object header in response") + } + + return nil + }) + + return &obj, nil } func (p *pool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error) {