diff --git a/client/common.go b/client/common.go index 7afab98..a113e4a 100644 --- a/client/common.go +++ b/client/common.go @@ -151,15 +151,26 @@ type contextCall struct { req interface { GetMetaHeader() *v2session.RequestMetaHeader SetMetaHeader(*v2session.RequestMetaHeader) + SetVerificationHeader(*v2session.RequestVerificationHeader) } // function to send a request (unary) and receive a response call func() (responseV2, error) + // function to send the request (req field) + wReq func() error + + // function to recv the response (resp field) + rResp func() error + + // function to close the message stream + closer func() error + // function of writing response fields to the resulting structure (optional) result func(v2 responseV2) } +// sets needed fields of the request meta header. func (x contextCall) prepareRequest() { meta := x.req.GetMetaHeader() if meta == nil { @@ -178,6 +189,29 @@ func (x contextCall) prepareRequest() { meta.SetNetworkMagic(x.netMagic) } +// prepares, signs and writes the request. Result means success. +// If failed, contextCall.err contains the reason. +func (x *contextCall) writeRequest() bool { + x.prepareRequest() + + x.req.SetVerificationHeader(nil) + + // sign the request + x.err = signature.SignServiceMessage(&x.key, x.req) + if x.err != nil { + x.err = fmt.Errorf("sign request: %w", x.err) + return false + } + + x.err = x.wReq() + if x.err != nil { + x.err = fmt.Errorf("write request: %w", x.err) + return false + } + + return true +} + // performs common actions of response processing and writes any problem as a result status or client error // (in both cases returns false). // @@ -220,32 +254,32 @@ func (x *contextCall) processResponse() bool { x.statusRes.setStatus(st) - return successfulStatus + return successfulStatus || !x.resolveAPIFailures } -// goes through all stages of sending a request and processing a response. Returns true if successful. -func (x *contextCall) processCall() bool { - // prepare the request - x.prepareRequest() - - // sign the request - x.err = signature.SignServiceMessage(&x.key, x.req) - if x.err != nil { - x.err = fmt.Errorf("sign request: %w", x.err) - return false +// reads response (if rResp is set) and processes it. Result means success. +// If failed, contextCall.err contains the reason. +func (x *contextCall) readResponse() bool { + if x.rResp != nil { + x.err = x.rResp() + if x.err != nil { + x.err = fmt.Errorf("read response: %w", x.err) + return false + } } - // perform RPC - x.resp, x.err = x.call() - if x.err != nil { - x.err = fmt.Errorf("transport error: %w", x.err) - return false - } + return x.processResponse() +} - // process the response - ok := x.processResponse() - if !ok { - return false +// closes the message stream (if closer is set) and writes the results (if resuls is set). +// Return means success. If failed, contextCall.err contains the reason. +func (x *contextCall) close() bool { + if x.closer != nil { + x.err = x.closer() + if x.err != nil { + x.err = fmt.Errorf("close RPC: %w", x.err) + return false + } } // write response to resulting structure @@ -256,6 +290,34 @@ func (x *contextCall) processCall() bool { return true } +// goes through all stages of sending a request and processing a response. Returns true if successful. +// If failed, contextCall.err contains the reason. +func (x *contextCall) processCall() bool { + // set request writer + x.wReq = func() error { + var err error + x.resp, err = x.call() + return err + } + + // write request + ok := x.writeRequest() + if !ok { + return false + } + + // read response + ok = x.readResponse() + if !ok { + return false + } + + // close and write response to resulting structure + ok = x.close() + + return ok +} + // initializes static cross-call parameters inherited from client. func (c *Client) initCallContext(ctx *contextCall) { ctx.key = *c.opts.key diff --git a/client/object.go b/client/object.go index ac50096..70062d6 100644 --- a/client/object.go +++ b/client/object.go @@ -1,9 +1,7 @@ package client import ( - "bytes" "context" - "crypto/ecdsa" "errors" "fmt" "io" @@ -22,12 +20,6 @@ import ( signer "github.com/nspcc-dev/neofs-sdk-go/util/signature" ) -type PutObjectParams struct { - obj *object.Object - - r io.Reader -} - // ObjectAddressWriter is an interface of the // component that writes the object address. type ObjectAddressWriter interface { @@ -40,16 +32,6 @@ type DeleteObjectParams struct { tombTgt ObjectAddressWriter } -type GetObjectParams struct { - addr *address.Address - - raw bool - - w io.Writer - - readerHandler ReaderHandler -} - type ObjectHeaderParams struct { addr *address.Address @@ -84,20 +66,6 @@ type SearchObjectParams struct { filters object.SearchFilters } -type putObjectV2Reader struct { - r io.Reader -} - -type putObjectV2Writer struct { - key *ecdsa.PrivateKey - - chunkPart *v2object.PutObjectPartChunk - - req *v2object.PutRequest - - stream *rpcapi.PutRequestWriter -} - type checksumType int const ( @@ -106,8 +74,6 @@ const ( checksumTZ ) -const chunkSize = 3 * (1 << 20) - const TZSize = 64 const searchQueryVersion uint32 = 1 @@ -133,199 +99,6 @@ func (t checksumType) toV2() v2refs.ChecksumType { } } -func (w *putObjectV2Reader) Read(p []byte) (int, error) { - return w.r.Read(p) -} - -func (w *putObjectV2Writer) Write(p []byte) (int, error) { - w.chunkPart.SetChunk(p) - - w.req.SetVerificationHeader(nil) - - if err := signature.SignServiceMessage(w.key, w.req); err != nil { - return 0, fmt.Errorf("could not sign chunk request message: %w", err) - } - - if err := w.stream.Write(w.req); err != nil { - return 0, fmt.Errorf("could not send chunk request message: %w", err) - } - - return len(p), nil -} - -func (p *PutObjectParams) WithObject(v *object.Object) *PutObjectParams { - if p != nil { - p.obj = v - } - - return p -} - -func (p *PutObjectParams) Object() *object.Object { - if p != nil { - return p.obj - } - - return nil -} - -func (p *PutObjectParams) WithPayloadReader(v io.Reader) *PutObjectParams { - if p != nil { - p.r = v - } - - return p -} - -func (p *PutObjectParams) PayloadReader() io.Reader { - if p != nil { - return p.r - } - - return nil -} - -type ObjectPutRes struct { - statusRes - - id *oid.ID -} - -func (x *ObjectPutRes) setID(id *oid.ID) { - x.id = id -} - -func (x ObjectPutRes) ID() *oid.ID { - return x.id -} - -// PutObject puts object through NeoFS API call. -// -// Any client's internal or transport errors are returned as `error`. -// If WithNeoFSErrorParsing option has been provided, unsuccessful -// NeoFS status codes are returned as `error`, otherwise, are included -// in the returned result structure. -func (c *Client) PutObject(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*ObjectPutRes, error) { - callOpts := c.defaultCallOptions() - - for i := range opts { - if opts[i] != nil { - opts[i](callOpts) - } - } - - // create request - req := new(v2object.PutRequest) - - // initialize request body - body := new(v2object.PutRequestBody) - req.SetBody(body) - - v2Addr := new(v2refs.Address) - v2Addr.SetObjectID(p.obj.ID().ToV2()) - v2Addr.SetContainerID(p.obj.ContainerID().ToV2()) - - // set meta header - meta := v2MetaHeaderFromOpts(callOpts) - - if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ - addr: v2Addr, - verb: v2session.ObjectVerbPut, - }); err != nil { - return nil, fmt.Errorf("could not attach session token: %w", err) - } - - req.SetMetaHeader(meta) - - // initialize init part - initPart := new(v2object.PutObjectPartInit) - body.SetObjectPart(initPart) - - obj := p.obj.ToV2() - - // set init part fields - initPart.SetObjectID(obj.GetObjectID()) - initPart.SetSignature(obj.GetSignature()) - initPart.SetHeader(obj.GetHeader()) - - // sign the request - if err := signature.SignServiceMessage(callOpts.key, req); err != nil { - return nil, fmt.Errorf("signing the request failed: %w", err) - } - - // open stream - resp := new(v2object.PutResponse) - - stream, err := rpcapi.PutObject(c.Raw(), resp, client.WithContext(ctx)) - if err != nil { - return nil, fmt.Errorf("stream opening failed: %w", err) - } - - // send init part - err = stream.Write(req) - if err != nil { - return nil, fmt.Errorf("sending the initial message to stream failed: %w", err) - } - - // create payload bytes reader - var rPayload io.Reader = bytes.NewReader(obj.GetPayload()) - if p.r != nil { - rPayload = io.MultiReader(rPayload, p.r) - } - - // create v2 payload stream writer - chunkPart := new(v2object.PutObjectPartChunk) - body.SetObjectPart(chunkPart) - - w := &putObjectV2Writer{ - key: callOpts.key, - chunkPart: chunkPart, - req: req, - stream: stream, - } - - r := &putObjectV2Reader{r: rPayload} - - // copy payload from reader to stream writer - _, err = io.CopyBuffer(w, r, make([]byte, chunkSize)) - if err != nil && !errors.Is(err, io.EOF) { - return nil, fmt.Errorf("payload streaming failed: %w", err) - } - - // close object stream and receive response from remote node - err = stream.Close() - if err != nil { - return nil, fmt.Errorf("closing the stream failed: %w", err) - } - - var ( - res = new(ObjectPutRes) - procPrm processResponseV2Prm - procRes processResponseV2Res - ) - - procPrm.callOpts = callOpts - procPrm.resp = resp - - procRes.statusRes = res - - // process response in general - if c.processResponseV2(&procRes, procPrm) { - if procRes.cliErr != nil { - return nil, procRes.cliErr - } - - return res, nil - } - - // convert object identifier - id := oid.NewIDFromV2(resp.GetBody().GetObjectID()) - - res.setID(id) - - return res, nil -} - func (p *DeleteObjectParams) WithAddress(v *address.Address) *DeleteObjectParams { if p != nil { p.addr = v @@ -451,132 +224,6 @@ func (c *Client) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts . return res, nil } -func (p *GetObjectParams) WithAddress(v *address.Address) *GetObjectParams { - if p != nil { - p.addr = v - } - - return p -} - -func (p *GetObjectParams) Address() *address.Address { - if p != nil { - return p.addr - } - - return nil -} - -func (p *GetObjectParams) WithPayloadWriter(w io.Writer) *GetObjectParams { - if p != nil { - p.w = w - } - - return p -} - -func (p *GetObjectParams) PayloadWriter() io.Writer { - if p != nil { - return p.w - } - - return nil -} - -func (p *GetObjectParams) WithRawFlag(v bool) *GetObjectParams { - if p != nil { - p.raw = v - } - - return p -} - -func (p *GetObjectParams) RawFlag() bool { - if p != nil { - return p.raw - } - - return false -} - -// ReaderHandler is a function over io.Reader. -type ReaderHandler func(io.Reader) - -// WithPayloadReaderHandler sets handler of the payload reader. -// -// If provided, payload reader is composed after receiving the header. -// In this case payload writer set via WithPayloadWriter is ignored. -// -// Handler should not be nil. -func (p *GetObjectParams) WithPayloadReaderHandler(f ReaderHandler) *GetObjectParams { - if p != nil { - p.readerHandler = f - } - - return p -} - -// wrapper over the Object Get stream that provides io.Reader. -type objectPayloadReader struct { - stream interface { - Read(*v2object.GetResponse) error - } - - resp v2object.GetResponse - - tail []byte -} - -func (x *objectPayloadReader) Read(p []byte) (read int, err error) { - // read remaining tail - read = copy(p, x.tail) - - x.tail = x.tail[read:] - - if len(p)-read == 0 { - return - } - - // receive message from server stream - err = x.stream.Read(&x.resp) - if err != nil { - if errors.Is(err, io.EOF) { - err = io.EOF - return - } - - err = fmt.Errorf("reading the response failed: %w", err) - return - } - - // get chunk part message - part := x.resp.GetBody().GetObjectPart() - - chunkPart, ok := part.(*v2object.GetObjectPartChunk) - if !ok { - err = errWrongMessageSeq - return - } - - // verify response structure - if err = signature.VerifyServiceMessage(&x.resp); err != nil { - err = fmt.Errorf("response verification failed: %w", err) - return - } - - // read new chunk - chunk := chunkPart.GetChunk() - - tailOffset := copy(p[read:], chunk) - - read += tailOffset - - // save the tail - x.tail = append(x.tail, chunk[tailOffset:]...) - - return -} - var errWrongMessageSeq = errors.New("incorrect message sequence") type ObjectGetRes struct { @@ -604,157 +251,6 @@ func writeUnexpectedMessageTypeErr(res resCommon, val interface{}) { res.setStatus(st) } -// GetObject receives object through NeoFS API call. -// -// Any client's internal or transport errors are returned as `error`. -// If WithNeoFSErrorParsing option has been provided, unsuccessful -// NeoFS status codes are returned as `error`, otherwise, are included -// in the returned result structure. -func (c *Client) GetObject(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*ObjectGetRes, error) { - callOpts := c.defaultCallOptions() - - for i := range opts { - if opts[i] != nil { - opts[i](callOpts) - } - } - - // create request - req := new(v2object.GetRequest) - - // initialize request body - body := new(v2object.GetRequestBody) - req.SetBody(body) - - // set meta header - meta := v2MetaHeaderFromOpts(callOpts) - - if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ - addr: p.addr.ToV2(), - verb: v2session.ObjectVerbGet, - }); err != nil { - return nil, fmt.Errorf("could not attach session token: %w", err) - } - - req.SetMetaHeader(meta) - - // fill body fields - body.SetAddress(p.addr.ToV2()) - body.SetRaw(p.raw) - - // sign the request - if err := signature.SignServiceMessage(callOpts.key, req); err != nil { - return nil, fmt.Errorf("signing the request failed: %w", err) - } - - // open stream - stream, err := rpcapi.GetObject(c.Raw(), req, client.WithContext(ctx)) - if err != nil { - return nil, fmt.Errorf("stream opening failed: %w", err) - } - - var ( - headWas bool - payload []byte - obj = new(v2object.Object) - resp = new(v2object.GetResponse) - - messageWas bool - - res = new(ObjectGetRes) - procPrm processResponseV2Prm - procRes processResponseV2Res - ) - - procPrm.callOpts = callOpts - procPrm.resp = resp - - procRes.statusRes = res - -loop: - for { - // receive message from server stream - err := stream.Read(resp) - if err != nil { - if errors.Is(err, io.EOF) { - if !messageWas { - return nil, errWrongMessageSeq - } - - break - } - - return nil, fmt.Errorf("reading the response failed: %w", err) - } - - messageWas = true - - // process response in general - if c.processResponseV2(&procRes, procPrm) { - if procRes.cliErr != nil { - return nil, procRes.cliErr - } - - return res, nil - } - - switch v := resp.GetBody().GetObjectPart().(type) { - default: - return nil, errWrongMessageSeq - case *v2object.GetObjectPartInit: - if headWas { - return nil, errWrongMessageSeq - } - - headWas = true - - obj.SetObjectID(v.GetObjectID()) - obj.SetSignature(v.GetSignature()) - - hdr := v.GetHeader() - obj.SetHeader(hdr) - - if p.readerHandler != nil { - p.readerHandler(&objectPayloadReader{ - stream: stream, - }) - - break loop - } - - if p.w == nil { - payload = make([]byte, 0, hdr.GetPayloadLength()) - } - case *v2object.GetObjectPartChunk: - if !headWas { - return nil, errWrongMessageSeq - } - - if p.w != nil { - if _, err := p.w.Write(v.GetChunk()); err != nil { - return nil, fmt.Errorf("could not write payload chunk: %w", err) - } - } else { - payload = append(payload, v.GetChunk()...) - } - case *v2object.SplitInfo: - if headWas { - return nil, errWrongMessageSeq - } - - si := object.NewSplitInfoFromV2(v) - return nil, object.NewSplitInfoError(si) - } - } - - obj.SetPayload(payload) - - // convert the object - res.setObject(object.NewFromV2(obj)) - - return res, nil -} - func (p *ObjectHeaderParams) WithAddress(v *address.Address) *ObjectHeaderParams { if p != nil { p.addr = v diff --git a/client/object_get.go b/client/object_get.go new file mode 100644 index 0000000..504596b --- /dev/null +++ b/client/object_get.go @@ -0,0 +1,313 @@ +package client + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + v2session "github.com/nspcc-dev/neofs-api-go/v2/session" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/token" +) + +// PrmObjectGet groups parameters of ObjectGetInit operation. +type PrmObjectGet struct { + raw bool + + local bool + + sessionSet bool + session session.Token + + bearerSet bool + bearer token.BearerToken + + cnrSet bool + cnr cid.ID + + objSet bool + obj oid.ID +} + +// MarkRaw marks an intent to read physically stored object. +func (x *PrmObjectGet) MarkRaw() { + x.raw = true +} + +// MarkLocal tells the server to execute the operation locally. +func (x *PrmObjectGet) MarkLocal() { + x.local = true +} + +// WithinSession specifies session within which object should be read. +func (x *PrmObjectGet) WithinSession(t session.Token) { + x.session = t + x.sessionSet = true +} + +// WithBearerToken attaches bearer token to be used for the operation. +func (x *PrmObjectGet) WithBearerToken(t token.BearerToken) { + x.bearer = t + x.bearerSet = true +} + +// FromContainer specifies NeoFS container of the object. +// Required parameter. +func (x *PrmObjectGet) FromContainer(id cid.ID) { + x.cnr = id + x.cnrSet = true +} + +// ByID specifies identifier of the requested object. +// Required parameter. +func (x *PrmObjectGet) ByID(id oid.ID) { + x.obj = id + x.objSet = true +} + +// ResObjectGet groups the final result values of ObjectGetInit operation. +type ResObjectGet struct { + statusRes +} + +// ObjectReader is designed to read one object from NeoFS system. +// +// Must be initialized using Client.ObjectGetInit, any other +// usage is unsafe. +type ObjectReader struct { + cancelCtxStream context.CancelFunc + + ctxCall contextCall + + // initially bound to contextCall + bodyResp v2object.GetResponseBody + + tailPayload []byte +} + +// UseKey specifies private key to sign the requests. +// If key is not provided, then Client default key is used. +func (x *ObjectReader) UseKey(key ecdsa.PrivateKey) { + x.ctxCall.key = key +} + +// ReadHeader reads header of the object. Result means success. +// Failure reason can be received via Close. +func (x *ObjectReader) ReadHeader(dst *object.Object) bool { + if !x.ctxCall.writeRequest() { + return false + } else if !x.ctxCall.readResponse() { + return false + } + + var partInit *v2object.GetObjectPartInit + + switch v := x.bodyResp.GetObjectPart().(type) { + default: + x.ctxCall.err = fmt.Errorf("unexpected message instead of heading part: %T", v) + return false + case *v2object.SplitInfo: + x.ctxCall.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(v)) + return false + case *v2object.GetObjectPartInit: + partInit = v + } + + var objv2 v2object.Object + + objv2.SetObjectID(partInit.GetObjectID()) + objv2.SetHeader(partInit.GetHeader()) + objv2.SetSignature(partInit.GetSignature()) + + *dst = *object.NewFromV2(&objv2) // need smth better + + return true +} + +func (x *ObjectReader) readChunk(buf []byte) (int, bool) { + var read int + + // read remaining tail + read = copy(buf, x.tailPayload) + + x.tailPayload = x.tailPayload[read:] + + if len(buf) == read { + return read, true + } + + // receive next message + ok := x.ctxCall.readResponse() + if !ok { + return read, false + } + + // get chunk part message + part := x.bodyResp.GetObjectPart() + + var partChunk *v2object.GetObjectPartChunk + + partChunk, ok = part.(*v2object.GetObjectPartChunk) + if !ok { + x.ctxCall.err = fmt.Errorf("unexpected message instead of chunk part: %T", part) + return read, false + } + + // read new chunk + chunk := partChunk.GetChunk() + + tailOffset := copy(buf[read:], chunk) + + read += tailOffset + + // save the tail + x.tailPayload = append(x.tailPayload, chunk[tailOffset:]...) + + return read, true +} + +// ReadChunk reads another chunk of the object payload. Works similar to +// io.Reader.Read but returns success flag instead of error. +// +// Failure reason can be received via Close. +func (x *ObjectReader) ReadChunk(buf []byte) (int, bool) { + return x.readChunk(buf) +} + +func (x *ObjectReader) close(ignoreEOF bool) (*ResObjectGet, error) { + defer x.cancelCtxStream() + + if x.ctxCall.err != nil { + if !errors.Is(x.ctxCall.err, io.EOF) { + return nil, x.ctxCall.err + } else if !ignoreEOF { + return nil, io.EOF + } + } + + return x.ctxCall.statusRes.(*ResObjectGet), nil +} + +// Close ends reading the object and returns the result of the operation +// along with the final results. Must be called after using the ObjectReader. +// +// Exactly one return value is non-nil. By default, server status is returned in res structure. +// Any client's internal or transport errors are returned as Go built-in error. +// If Client is tuned to resolve NeoFS API statuses, then NeoFS failures +// codes are returned as error. +// +// Return errors: +// *object.SplitInfoError (returned on virtual objects with PrmObjectGet.MakeRaw). +// +// Return statuses: +// global (see Client docs). +func (x *ObjectReader) Close() (*ResObjectGet, error) { + return x.close(true) +} + +func (x *ObjectReader) Read(p []byte) (int, error) { + n, ok := x.readChunk(p) + if !ok { + res, err := x.close(false) + if err != nil { + return n, err + } else if !x.ctxCall.resolveAPIFailures { + return n, apistatus.ErrFromStatus(res.Status()) + } + } + + return n, nil +} + +// ObjectGetInit initiates reading an object through a remote server using NeoFS API protocol. +// +// The call only opens the transmission channel, explicit fetching is done using the ObjectWriter. +// Exactly one return value is non-nil. Resulting reader must be finally closed. +// +// Immediately panics if parameters are set incorrectly (see PrmObjectGet docs). +// Context is required and must not be nil. It is used for network communication. +func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) { + // check parameters + switch { + case ctx == nil: + panic(panicMsgMissingContext) + case !prm.cnrSet: + panic(panicMsgMissingContainer) + case !prm.objSet: + panic("missing object") + } + + var addr v2refs.Address + + addr.SetContainerID(prm.cnr.ToV2()) + addr.SetObjectID(prm.obj.ToV2()) + + // form request body + var body v2object.GetRequestBody + + body.SetRaw(prm.raw) + body.SetAddress(&addr) + + // form meta header + var meta v2session.RequestMetaHeader + + if prm.local { + meta.SetTTL(1) + } + + if prm.bearerSet { + meta.SetBearerToken(prm.bearer.ToV2()) + } + + if prm.sessionSet { + meta.SetSessionToken(prm.session.ToV2()) + } + + // form request + var req v2object.GetRequest + + req.SetBody(&body) + req.SetMetaHeader(&meta) + + // init reader + var ( + r ObjectReader + resp v2object.GetResponse + stream *rpcapi.GetResponseReader + ) + + ctx, r.cancelCtxStream = context.WithCancel(ctx) + + resp.SetBody(&r.bodyResp) + + // init call context + c.initCallContext(&r.ctxCall) + r.ctxCall.req = &req + r.ctxCall.statusRes = new(ResObjectGet) + r.ctxCall.resp = &resp + r.ctxCall.wReq = func() error { + var err error + + stream, err = rpcapi.GetObject(c.Raw(), &req, client.WithContext(ctx)) + if err != nil { + return fmt.Errorf("open stream: %w", err) + } + + return nil + } + r.ctxCall.rResp = func() error { + return stream.Read(&resp) + } + + return &r, nil +} diff --git a/client/object_put.go b/client/object_put.go new file mode 100644 index 0000000..68a67d3 --- /dev/null +++ b/client/object_put.go @@ -0,0 +1,199 @@ +package client + +import ( + "context" + "crypto/ecdsa" + "fmt" + + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + v2session "github.com/nspcc-dev/neofs-api-go/v2/session" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/token" +) + +// PrmObjectPutInit groups parameters of ObjectPutInit operation. +// +// At the moment the operation is not parameterized, however, +// the structure is still declared for backward compatibility. +type PrmObjectPutInit struct{} + +// ResObjectPut groups the final result values of ObjectPutInit operation. +type ResObjectPut struct { + statusRes + + resp v2object.PutResponse +} + +// ReadStoredObject reads identifier of the saved object. +// Returns false if ID is missing (not read). +func (x *ResObjectPut) ReadStoredObject(id *oid.ID) bool { + idv2 := x.resp.GetBody().GetObjectID() + if idv2 == nil { + return false + } + + *id = *oid.NewIDFromV2(idv2) // need smth better + + return true +} + +// ObjectWriter is designed to write one object to NeoFS system. +// +// Must be initialized using Client.ObjectPutInit, any other +// usage is unsafe. +type ObjectWriter struct { + cancelCtxStream context.CancelFunc + + ctxCall contextCall + + // initially bound tp contextCall + metaHdr v2session.RequestMetaHeader + + // initially bound to contextCall + partInit v2object.PutObjectPartInit + + chunkCalled bool + + partChunk v2object.PutObjectPartChunk +} + +// UseKey specifies private key to sign the requests. +// If key is not provided, then Client default key is used. +func (x *ObjectWriter) UseKey(key ecdsa.PrivateKey) { + x.ctxCall.key = key +} + +// WithBearerToken attaches bearer token to be used for the operation. +// Should be called once before any writing steps. +func (x *ObjectWriter) WithBearerToken(t token.BearerToken) { + x.metaHdr.SetBearerToken(t.ToV2()) +} + +// WithinSession specifies session within which object should be stored. +// Should be called once before any writing steps. +func (x *ObjectWriter) WithinSession(t session.Token) { + x.metaHdr.SetSessionToken(t.ToV2()) +} + +// MarkLocal tells the server to execute the operation locally. +func (x *ObjectWriter) MarkLocal() { + x.metaHdr.SetTTL(1) +} + +// WriteHeader writes header of the object. Result means success. +// Failure reason can be received via Close. +func (x *ObjectWriter) WriteHeader(hdr object.Object) bool { + v2Hdr := hdr.ToV2() + + x.partInit.SetObjectID(v2Hdr.GetObjectID()) + x.partInit.SetHeader(v2Hdr.GetHeader()) + x.partInit.SetSignature(v2Hdr.GetSignature()) + + return x.ctxCall.writeRequest() +} + +// WritePayloadChunk writes chunk of the object payload. Result means success. +// Failure reason can be received via Close. +func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool { + if !x.chunkCalled { + x.chunkCalled = true + x.ctxCall.req.(*v2object.PutRequest).GetBody().SetObjectPart(&x.partChunk) + } + + for ln := len(chunk); ln > 0; ln = len(chunk) { + if ln > 512 { + ln = 512 + } + + x.partChunk.SetChunk(chunk[:ln]) + + if !x.ctxCall.writeRequest() { + return false + } + + chunk = chunk[ln:] + } + + return true +} + +// Close ends writing the object and returns the result of the operation +// along with the final results. Must be called after using the ObjectWriter. +// +// Exactly one return value is non-nil. By default, server status is returned in res structure. +// Any client's internal or transport errors are returned as Go built-in error. +// If Client is tuned to resolve NeoFS API statuses, then NeoFS failures +// codes are returned as error. +// +// Return statuses: +// global (see Client docs). +func (x *ObjectWriter) Close() (*ResObjectPut, error) { + defer x.cancelCtxStream() + + if x.ctxCall.err != nil { + return nil, x.ctxCall.err + } + + if !x.ctxCall.close() { + return nil, x.ctxCall.err + } + + if !x.ctxCall.processResponse() { + return nil, x.ctxCall.err + } + + return x.ctxCall.statusRes.(*ResObjectPut), nil +} + +// ObjectPutInit initiates writing an object through a remote server using NeoFS API protocol. +// +// The call only opens the transmission channel, explicit recording is done using the ObjectWriter. +// Exactly one return value is non-nil. Resulting writer must be finally closed. +// +// Context is required and must not be nil. It is used for network communication. +func (c *Client) ObjectPutInit(ctx context.Context, _ PrmObjectPutInit) (*ObjectWriter, error) { + // check parameters + if ctx == nil { + panic(panicMsgMissingContext) + } + + // open stream + var ( + res ResObjectPut + w ObjectWriter + ) + + ctx, w.cancelCtxStream = context.WithCancel(ctx) + + stream, err := rpcapi.PutObject(c.Raw(), &res.resp, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("open stream: %w", err) + } + + // form request body + var body v2object.PutRequestBody + + // form request + var req v2object.PutRequest + + req.SetBody(&body) + + req.SetMetaHeader(&w.metaHdr) + body.SetObjectPart(&w.partInit) + + // init call context + c.initCallContext(&w.ctxCall) + w.ctxCall.req = &req + w.ctxCall.statusRes = &res + w.ctxCall.resp = &res.resp + w.ctxCall.wReq = func() error { + return stream.Write(&req) + } + w.ctxCall.closer = stream.Close + + return &w, nil +} diff --git a/go.mod b/go.mod index 50dc751..81e062c 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.98.0 github.com/nspcc-dev/neofs-api-go/v2 v2.11.2-0.20220127135316-32dd0bb3f9c5 - github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/stretchr/testify v1.7.0 go.uber.org/zap v1.18.1 google.golang.org/grpc v1.41.0 diff --git a/pool/mock_test.go b/pool/mock_test.go index 7918531..af0b461 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -224,15 +224,11 @@ func (mr *MockClientMockRecorder) GetContainer(arg0, arg1 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetContainer", reflect.TypeOf((*MockClient)(nil).GetContainer), varargs...) } -// GetObject mocks base method. -func (m *MockClient) GetObject(arg0 context.Context, arg1 *client0.GetObjectParams, arg2 ...client0.CallOption) (*client0.ObjectGetRes, error) { +// ObjectGetInitmocks base method. +func (m *MockClient) ObjectGetInit(arg0 context.Context, arg1 client0.PrmObjectGet) (*client0.ObjectReader, error) { m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "GetObject", varargs...) - ret0, _ := ret[0].(*client0.ObjectGetRes) + ret := m.ctrl.Call(m, "GetObject", arg0, arg1) + ret0, _ := ret[0].(*client0.ObjectReader) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -241,7 +237,7 @@ func (m *MockClient) GetObject(arg0 context.Context, arg1 *client0.GetObjectPara func (mr *MockClientMockRecorder) GetObject(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockClient)(nil).GetObject), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectGetInit", reflect.TypeOf((*MockClient)(nil).ObjectGetInit), varargs...) } // HashObjectPayloadRanges mocks base method. @@ -355,24 +351,20 @@ func (mr *MockClientMockRecorder) PutContainer(arg0, arg1 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutContainer", reflect.TypeOf((*MockClient)(nil).PutContainer), varargs...) } -// PutObject mocks base method. -func (m *MockClient) PutObject(arg0 context.Context, arg1 *client0.PutObjectParams, arg2 ...client0.CallOption) (*client0.ObjectPutRes, error) { +// ObjectPutInitmocks base method. +func (m *MockClient) ObjectPutInit(arg0 context.Context, arg1 client0.PrmObjectPutInit) (*client0.ObjectWriter, error) { m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "PutObject", varargs...) - ret0, _ := ret[0].(*client0.ObjectPutRes) + ret := m.ctrl.Call(m, "PutObject", arg0) + ret0, _ := ret[0].(*client0.ObjectWriter) ret1, _ := ret[1].(error) return ret0, ret1 } // PutObject indicates an expected call of PutObject. -func (mr *MockClientMockRecorder) PutObject(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) PutObject(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutObject", reflect.TypeOf((*MockClient)(nil).PutObject), varargs...) + varargs := append([]interface{}{arg0, arg1}) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectPutInit", reflect.TypeOf((*MockClient)(nil).ObjectPutInit), varargs...) } // Raw mocks base method. diff --git a/pool/pool.go b/pool/pool.go index 22a4b7d..3fe9924 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -1,6 +1,7 @@ package pool import ( + "bytes" "context" "crypto/ecdsa" "crypto/sha256" @@ -22,6 +23,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/nspcc-dev/neofs-sdk-go/object/address" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/session" @@ -41,9 +43,9 @@ type Client interface { AnnounceContainerUsedSpace(context.Context, client.AnnounceSpacePrm) (*client.AnnounceSpaceRes, error) EndpointInfo(context.Context, client.EndpointInfoPrm) (*client.EndpointInfoRes, error) NetworkInfo(context.Context, client.NetworkInfoPrm) (*client.NetworkInfoRes, error) - PutObject(context.Context, *client.PutObjectParams, ...client.CallOption) (*client.ObjectPutRes, error) + ObjectPutInit(context.Context, client.PrmObjectPutInit) (*client.ObjectWriter, error) DeleteObject(context.Context, *client.DeleteObjectParams, ...client.CallOption) (*client.ObjectDeleteRes, error) - GetObject(context.Context, *client.GetObjectParams, ...client.CallOption) (*client.ObjectGetRes, error) + ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error) HeadObject(context.Context, *client.ObjectHeaderParams, ...client.CallOption) (*client.ObjectHeadRes, error) ObjectPayloadRangeData(context.Context, *client.RangeDataParams, ...client.CallOption) (*client.ObjectRangeRes, error) HashObjectPayloadRanges(context.Context, *client.RangeChecksumParams, ...client.CallOption) (*client.ObjectRangeHashRes, error) @@ -159,9 +161,9 @@ type Pool interface { } type Object interface { - PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error) + PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error - GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error) + GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) @@ -282,9 +284,9 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { for i, params := range options.nodesParams { clientPacks := make([]*clientPack, len(params.weights)) - for j, address := range params.addresses { + for j, addr := range params.addresses { c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key), - client.WithURIAddress(address, nil), + client.WithURIAddress(addr, nil), client.WithDialTimeout(options.NodeConnectionTimeout), client.WithNeoFSErrorParsing()) if err != nil { @@ -294,14 +296,14 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { cliRes, err := createSessionTokenForDuration(ctx, c, options.SessionExpirationDuration) if err != nil && options.Logger != nil { options.Logger.Warn("failed to create neofs session token for client", - zap.String("address", address), + zap.String("address", addr), zap.Error(err)) } else if err == nil { healthy, atLeastOneHealthy = true, true st := sessionTokenForOwner(ownerID, cliRes) - _ = cache.Put(formCacheKey(address, options.Key), st) + _ = cache.Put(formCacheKey(addr, options.Key), st) } - clientPacks[j] = &clientPack{client: c, healthy: healthy, address: address} + clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr} } source := rand.NewSource(time.Now().UnixNano()) sampler := NewSampler(params.weights, source) @@ -448,8 +450,8 @@ func (p *pool) Connection() (Client, *session.Token, error) { return nil, nil, err } - token := p.cache.Get(formCacheKey(cp.address, p.key)) - return cp.client, token, nil + tok := p.cache.Get(formCacheKey(cp.address, p.key)) + return cp.client, tok, nil } func (p *pool) connection() (*clientPack, error) { @@ -585,7 +587,68 @@ func (p *pool) removeSessionTokenAfterThreshold(cfg *callConfig) error { return nil } -func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error) { +type callContext struct { + // context for RPC + ctxBase context.Context + + client Client + + // client endpoint + endpoint string + + // request signer + key *ecdsa.PrivateKey + + // flag to open default session if session token is missing + sessionDefault bool + sessionToken *session.Token +} + +func (p *pool) prepareCallContext(ctx *callContext, cfg *callConfig) error { + cp, err := p.connection() + if err != nil { + return err + } + + ctx.endpoint = cp.address + ctx.client = cp.client + + ctx.key = cfg.key + if ctx.key == nil { + ctx.key = p.key + } + + ctx.sessionDefault = cfg.useDefaultSession + ctx.sessionToken = cfg.stoken + + if ctx.sessionToken == nil && ctx.sessionDefault { + cacheKey := formCacheKey(ctx.endpoint, ctx.key) + + ctx.sessionToken = p.cache.Get(cacheKey) + if ctx.sessionToken == nil { + var cliPrm client.CreateSessionPrm + + cliPrm.SetExp(math.MaxUint32) + + cliRes, err := ctx.client.CreateSession(ctx.ctxBase, cliPrm) + if err != nil { + return fmt.Errorf("default session: %w", err) + } + + ctx.sessionToken = sessionTokenForOwner(owner.NewIDFromPublicKey(&ctx.key.PublicKey), cliRes) + + _ = p.cache.Put(cacheKey, ctx.sessionToken) + } + } + + if ctx.sessionToken != nil && ctx.sessionToken.Signature() == nil { + err = ctx.sessionToken.Sign(ctx.key) + } + + return err +} + +func (p *pool) PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) // Put object is different from other object service methods. Put request @@ -603,21 +666,90 @@ func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, op return nil, err } - cp, options, err := p.conn(ctx, cfg) + var ctxCall callContext + + ctxCall.ctxBase = ctx + + err = p.prepareCallContext(&ctxCall, cfg) if err != nil { return nil, err } - res, err := cp.client.PutObject(ctx, params, options...) + var prm client.PrmObjectPutInit - // removes session token from cache in case of token error - _ = p.checkSessionTokenErr(err, cp.address) - - if err != nil { // here err already carries both status and client errors - return nil, err + wObj, err := ctxCall.client.ObjectPutInit(ctx, prm) + if err != nil { + return nil, fmt.Errorf("init writing on API client: %w", err) } - return res.ID(), nil + wObj.UseKey(*ctxCall.key) + + if ctxCall.sessionToken != nil { + wObj.WithinSession(*ctxCall.sessionToken) + } + + if cfg.btoken != nil { + wObj.WithBearerToken(*cfg.btoken) + } + + if wObj.WriteHeader(hdr) { + sz := hdr.PayloadSize() + + if data := hdr.Payload(); len(data) > 0 { + if payload != nil { + payload = io.MultiReader(bytes.NewReader(data), payload) + } else { + payload = bytes.NewReader(data) + sz = uint64(len(data)) + } + } + + if payload != nil { + const defaultBufferSizePut = 4096 // configure? + + if sz == 0 || sz > defaultBufferSizePut { + sz = defaultBufferSizePut + } + + buf := make([]byte, sz) + + var n int + var ok bool + + for { + n, err = payload.Read(buf) + if n > 0 { + ok = wObj.WritePayloadChunk(buf[:n]) + if !ok { + break + } + + continue + } + + if errors.Is(err, io.EOF) { + break + } + + return nil, fmt.Errorf("read payload: %w", err) + } + } + } + + res, err := wObj.Close() + if err != nil { // here err already carries both status and client errors + // removes session token from cache in case of token error + p.checkSessionTokenErr(err, ctxCall.endpoint) + return nil, fmt.Errorf("client failure: %w", err) + } + + var id oid.ID + + if !res.ReadStoredObject(&id) { + return nil, errors.New("missing ID of the stored object") + } + + return &id, nil } func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error { @@ -639,25 +771,74 @@ func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectPara return err } -func (p *pool) GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error) { +type objectReadCloser client.ObjectReader + +func (x *objectReadCloser) Read(p []byte) (int, error) { + return (*client.ObjectReader)(x).Read(p) +} + +func (x *objectReadCloser) Close() error { + _, err := (*client.ObjectReader)(x).Close() + return err +} + +type ResGetObject struct { + Header object.Object + + Payload io.ReadCloser +} + +func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) - cp, options, err := p.conn(ctx, cfg) + + var ctxCall callContext + + ctxCall.ctxBase = ctx + + err := p.prepareCallContext(&ctxCall, cfg) if err != nil { return nil, err } - res, err := cp.client.GetObject(ctx, params, options...) + var prm client.PrmObjectGet - if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { - opts = append(opts, retry()) - return p.GetObject(ctx, params, opts...) + if cnr := addr.ContainerID(); cnr != nil { + prm.FromContainer(*cnr) } - if err != nil { // here err already carries both status and client errors + if obj := addr.ObjectID(); obj != nil { + prm.ByID(*obj) + } + + if ctxCall.sessionToken != nil { + prm.WithinSession(*ctxCall.sessionToken) + } + + if cfg.btoken != nil { + prm.WithBearerToken(*cfg.btoken) + } + + rObj, err := ctxCall.client.ObjectGetInit(ctx, prm) + if err != nil { return nil, err } - return res.Object(), nil + rObj.UseKey(*ctxCall.key) + + var res ResGetObject + + if !rObj.ReadHeader(&res.Header) { + _, err = rObj.Close() + if p.checkSessionTokenErr(err, ctxCall.endpoint) && !cfg.isRetry { + return p.GetObject(ctx, addr, append(opts, retry())...) + } + + return nil, fmt.Errorf("read header: %w", err) + } + + res.Payload = (*objectReadCloser)(rObj) + + return &res, nil } func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) { diff --git a/pool/pool_test.go b/pool/pool_test.go index ba792c7..969bdde 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -15,6 +15,8 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/nspcc-dev/neofs-sdk-go/object/address" "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/stretchr/testify/require" @@ -329,7 +331,7 @@ func TestSessionCache(t *testing.T) { }).MaxTimes(3) mockClient.EXPECT().GetObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("session token does not exist")) - mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any()).Return(nil, nil) return mockClient, nil } @@ -355,7 +357,7 @@ func TestSessionCache(t *testing.T) { require.NoError(t, err) require.Contains(t, tokens, st) - _, err = pool.GetObject(ctx, nil, retry()) + _, err = pool.GetObject(ctx, address.Address{}, retry()) require.Error(t, err) // cache must not contain session token @@ -363,7 +365,7 @@ func TestSessionCache(t *testing.T) { require.NoError(t, err) require.Nil(t, st) - _, err = pool.PutObject(ctx, nil) + _, err = pool.PutObject(ctx, object.Object{}, nil) require.NoError(t, err) // cache must contain session token @@ -481,7 +483,7 @@ func TestSessionCacheWithKey(t *testing.T) { require.NoError(t, err) require.Contains(t, tokens, st) - _, err = pool.GetObject(ctx, nil, WithKey(newPrivateKey(t))) + _, err = pool.GetObject(ctx, address.Address{}, WithKey(newPrivateKey(t))) require.NoError(t, err) require.Len(t, tokens, 2) }