[#131] client: Change interface of object PUT and GET ops

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
remotes/fyrchik/fix-lint
Leonard Lyubich 2022-02-07 23:27:56 +03:00 committed by LeL
parent 40aaaafc73
commit 2624347d9b
8 changed files with 822 additions and 578 deletions

View File

@ -151,15 +151,26 @@ type contextCall struct {
req interface {
GetMetaHeader() *v2session.RequestMetaHeader
SetMetaHeader(*v2session.RequestMetaHeader)
SetVerificationHeader(*v2session.RequestVerificationHeader)
}
// function to send a request (unary) and receive a response
call func() (responseV2, error)
// function to send the request (req field)
wReq func() error
// function to recv the response (resp field)
rResp func() error
// function to close the message stream
closer func() error
// function of writing response fields to the resulting structure (optional)
result func(v2 responseV2)
}
// sets needed fields of the request meta header.
func (x contextCall) prepareRequest() {
meta := x.req.GetMetaHeader()
if meta == nil {
@ -178,6 +189,29 @@ func (x contextCall) prepareRequest() {
meta.SetNetworkMagic(x.netMagic)
}
// prepares, signs and writes the request. Result means success.
// If failed, contextCall.err contains the reason.
func (x *contextCall) writeRequest() bool {
x.prepareRequest()
x.req.SetVerificationHeader(nil)
// sign the request
x.err = signature.SignServiceMessage(&x.key, x.req)
if x.err != nil {
x.err = fmt.Errorf("sign request: %w", x.err)
return false
}
x.err = x.wReq()
if x.err != nil {
x.err = fmt.Errorf("write request: %w", x.err)
return false
}
return true
}
// performs common actions of response processing and writes any problem as a result status or client error
// (in both cases returns false).
//
@ -220,32 +254,32 @@ func (x *contextCall) processResponse() bool {
x.statusRes.setStatus(st)
return successfulStatus
return successfulStatus || !x.resolveAPIFailures
}
// goes through all stages of sending a request and processing a response. Returns true if successful.
func (x *contextCall) processCall() bool {
// prepare the request
x.prepareRequest()
// sign the request
x.err = signature.SignServiceMessage(&x.key, x.req)
if x.err != nil {
x.err = fmt.Errorf("sign request: %w", x.err)
return false
// reads response (if rResp is set) and processes it. Result means success.
// If failed, contextCall.err contains the reason.
func (x *contextCall) readResponse() bool {
if x.rResp != nil {
x.err = x.rResp()
if x.err != nil {
x.err = fmt.Errorf("read response: %w", x.err)
return false
}
}
// perform RPC
x.resp, x.err = x.call()
if x.err != nil {
x.err = fmt.Errorf("transport error: %w", x.err)
return false
}
return x.processResponse()
}
// process the response
ok := x.processResponse()
if !ok {
return false
// closes the message stream (if closer is set) and writes the results (if resuls is set).
// Return means success. If failed, contextCall.err contains the reason.
func (x *contextCall) close() bool {
if x.closer != nil {
x.err = x.closer()
if x.err != nil {
x.err = fmt.Errorf("close RPC: %w", x.err)
return false
}
}
// write response to resulting structure
@ -256,6 +290,34 @@ func (x *contextCall) processCall() bool {
return true
}
// goes through all stages of sending a request and processing a response. Returns true if successful.
// If failed, contextCall.err contains the reason.
func (x *contextCall) processCall() bool {
// set request writer
x.wReq = func() error {
var err error
x.resp, err = x.call()
return err
}
// write request
ok := x.writeRequest()
if !ok {
return false
}
// read response
ok = x.readResponse()
if !ok {
return false
}
// close and write response to resulting structure
ok = x.close()
return ok
}
// initializes static cross-call parameters inherited from client.
func (c *Client) initCallContext(ctx *contextCall) {
ctx.key = *c.opts.key

View File

@ -1,9 +1,7 @@
package client
import (
"bytes"
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
@ -22,12 +20,6 @@ import (
signer "github.com/nspcc-dev/neofs-sdk-go/util/signature"
)
type PutObjectParams struct {
obj *object.Object
r io.Reader
}
// ObjectAddressWriter is an interface of the
// component that writes the object address.
type ObjectAddressWriter interface {
@ -40,16 +32,6 @@ type DeleteObjectParams struct {
tombTgt ObjectAddressWriter
}
type GetObjectParams struct {
addr *address.Address
raw bool
w io.Writer
readerHandler ReaderHandler
}
type ObjectHeaderParams struct {
addr *address.Address
@ -84,20 +66,6 @@ type SearchObjectParams struct {
filters object.SearchFilters
}
type putObjectV2Reader struct {
r io.Reader
}
type putObjectV2Writer struct {
key *ecdsa.PrivateKey
chunkPart *v2object.PutObjectPartChunk
req *v2object.PutRequest
stream *rpcapi.PutRequestWriter
}
type checksumType int
const (
@ -106,8 +74,6 @@ const (
checksumTZ
)
const chunkSize = 3 * (1 << 20)
const TZSize = 64
const searchQueryVersion uint32 = 1
@ -133,199 +99,6 @@ func (t checksumType) toV2() v2refs.ChecksumType {
}
}
func (w *putObjectV2Reader) Read(p []byte) (int, error) {
return w.r.Read(p)
}
func (w *putObjectV2Writer) Write(p []byte) (int, error) {
w.chunkPart.SetChunk(p)
w.req.SetVerificationHeader(nil)
if err := signature.SignServiceMessage(w.key, w.req); err != nil {
return 0, fmt.Errorf("could not sign chunk request message: %w", err)
}
if err := w.stream.Write(w.req); err != nil {
return 0, fmt.Errorf("could not send chunk request message: %w", err)
}
return len(p), nil
}
func (p *PutObjectParams) WithObject(v *object.Object) *PutObjectParams {
if p != nil {
p.obj = v
}
return p
}
func (p *PutObjectParams) Object() *object.Object {
if p != nil {
return p.obj
}
return nil
}
func (p *PutObjectParams) WithPayloadReader(v io.Reader) *PutObjectParams {
if p != nil {
p.r = v
}
return p
}
func (p *PutObjectParams) PayloadReader() io.Reader {
if p != nil {
return p.r
}
return nil
}
type ObjectPutRes struct {
statusRes
id *oid.ID
}
func (x *ObjectPutRes) setID(id *oid.ID) {
x.id = id
}
func (x ObjectPutRes) ID() *oid.ID {
return x.id
}
// PutObject puts object through NeoFS API call.
//
// Any client's internal or transport errors are returned as `error`.
// If WithNeoFSErrorParsing option has been provided, unsuccessful
// NeoFS status codes are returned as `error`, otherwise, are included
// in the returned result structure.
func (c *Client) PutObject(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*ObjectPutRes, error) {
callOpts := c.defaultCallOptions()
for i := range opts {
if opts[i] != nil {
opts[i](callOpts)
}
}
// create request
req := new(v2object.PutRequest)
// initialize request body
body := new(v2object.PutRequestBody)
req.SetBody(body)
v2Addr := new(v2refs.Address)
v2Addr.SetObjectID(p.obj.ID().ToV2())
v2Addr.SetContainerID(p.obj.ContainerID().ToV2())
// set meta header
meta := v2MetaHeaderFromOpts(callOpts)
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
addr: v2Addr,
verb: v2session.ObjectVerbPut,
}); err != nil {
return nil, fmt.Errorf("could not attach session token: %w", err)
}
req.SetMetaHeader(meta)
// initialize init part
initPart := new(v2object.PutObjectPartInit)
body.SetObjectPart(initPart)
obj := p.obj.ToV2()
// set init part fields
initPart.SetObjectID(obj.GetObjectID())
initPart.SetSignature(obj.GetSignature())
initPart.SetHeader(obj.GetHeader())
// sign the request
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
return nil, fmt.Errorf("signing the request failed: %w", err)
}
// open stream
resp := new(v2object.PutResponse)
stream, err := rpcapi.PutObject(c.Raw(), resp, client.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
}
// send init part
err = stream.Write(req)
if err != nil {
return nil, fmt.Errorf("sending the initial message to stream failed: %w", err)
}
// create payload bytes reader
var rPayload io.Reader = bytes.NewReader(obj.GetPayload())
if p.r != nil {
rPayload = io.MultiReader(rPayload, p.r)
}
// create v2 payload stream writer
chunkPart := new(v2object.PutObjectPartChunk)
body.SetObjectPart(chunkPart)
w := &putObjectV2Writer{
key: callOpts.key,
chunkPart: chunkPart,
req: req,
stream: stream,
}
r := &putObjectV2Reader{r: rPayload}
// copy payload from reader to stream writer
_, err = io.CopyBuffer(w, r, make([]byte, chunkSize))
if err != nil && !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("payload streaming failed: %w", err)
}
// close object stream and receive response from remote node
err = stream.Close()
if err != nil {
return nil, fmt.Errorf("closing the stream failed: %w", err)
}
var (
res = new(ObjectPutRes)
procPrm processResponseV2Prm
procRes processResponseV2Res
)
procPrm.callOpts = callOpts
procPrm.resp = resp
procRes.statusRes = res
// process response in general
if c.processResponseV2(&procRes, procPrm) {
if procRes.cliErr != nil {
return nil, procRes.cliErr
}
return res, nil
}
// convert object identifier
id := oid.NewIDFromV2(resp.GetBody().GetObjectID())
res.setID(id)
return res, nil
}
func (p *DeleteObjectParams) WithAddress(v *address.Address) *DeleteObjectParams {
if p != nil {
p.addr = v
@ -451,132 +224,6 @@ func (c *Client) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts .
return res, nil
}
func (p *GetObjectParams) WithAddress(v *address.Address) *GetObjectParams {
if p != nil {
p.addr = v
}
return p
}
func (p *GetObjectParams) Address() *address.Address {
if p != nil {
return p.addr
}
return nil
}
func (p *GetObjectParams) WithPayloadWriter(w io.Writer) *GetObjectParams {
if p != nil {
p.w = w
}
return p
}
func (p *GetObjectParams) PayloadWriter() io.Writer {
if p != nil {
return p.w
}
return nil
}
func (p *GetObjectParams) WithRawFlag(v bool) *GetObjectParams {
if p != nil {
p.raw = v
}
return p
}
func (p *GetObjectParams) RawFlag() bool {
if p != nil {
return p.raw
}
return false
}
// ReaderHandler is a function over io.Reader.
type ReaderHandler func(io.Reader)
// WithPayloadReaderHandler sets handler of the payload reader.
//
// If provided, payload reader is composed after receiving the header.
// In this case payload writer set via WithPayloadWriter is ignored.
//
// Handler should not be nil.
func (p *GetObjectParams) WithPayloadReaderHandler(f ReaderHandler) *GetObjectParams {
if p != nil {
p.readerHandler = f
}
return p
}
// wrapper over the Object Get stream that provides io.Reader.
type objectPayloadReader struct {
stream interface {
Read(*v2object.GetResponse) error
}
resp v2object.GetResponse
tail []byte
}
func (x *objectPayloadReader) Read(p []byte) (read int, err error) {
// read remaining tail
read = copy(p, x.tail)
x.tail = x.tail[read:]
if len(p)-read == 0 {
return
}
// receive message from server stream
err = x.stream.Read(&x.resp)
if err != nil {
if errors.Is(err, io.EOF) {
err = io.EOF
return
}
err = fmt.Errorf("reading the response failed: %w", err)
return
}
// get chunk part message
part := x.resp.GetBody().GetObjectPart()
chunkPart, ok := part.(*v2object.GetObjectPartChunk)
if !ok {
err = errWrongMessageSeq
return
}
// verify response structure
if err = signature.VerifyServiceMessage(&x.resp); err != nil {
err = fmt.Errorf("response verification failed: %w", err)
return
}
// read new chunk
chunk := chunkPart.GetChunk()
tailOffset := copy(p[read:], chunk)
read += tailOffset
// save the tail
x.tail = append(x.tail, chunk[tailOffset:]...)
return
}
var errWrongMessageSeq = errors.New("incorrect message sequence")
type ObjectGetRes struct {
@ -604,157 +251,6 @@ func writeUnexpectedMessageTypeErr(res resCommon, val interface{}) {
res.setStatus(st)
}
// GetObject receives object through NeoFS API call.
//
// Any client's internal or transport errors are returned as `error`.
// If WithNeoFSErrorParsing option has been provided, unsuccessful
// NeoFS status codes are returned as `error`, otherwise, are included
// in the returned result structure.
func (c *Client) GetObject(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*ObjectGetRes, error) {
callOpts := c.defaultCallOptions()
for i := range opts {
if opts[i] != nil {
opts[i](callOpts)
}
}
// create request
req := new(v2object.GetRequest)
// initialize request body
body := new(v2object.GetRequestBody)
req.SetBody(body)
// set meta header
meta := v2MetaHeaderFromOpts(callOpts)
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
addr: p.addr.ToV2(),
verb: v2session.ObjectVerbGet,
}); err != nil {
return nil, fmt.Errorf("could not attach session token: %w", err)
}
req.SetMetaHeader(meta)
// fill body fields
body.SetAddress(p.addr.ToV2())
body.SetRaw(p.raw)
// sign the request
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
return nil, fmt.Errorf("signing the request failed: %w", err)
}
// open stream
stream, err := rpcapi.GetObject(c.Raw(), req, client.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
}
var (
headWas bool
payload []byte
obj = new(v2object.Object)
resp = new(v2object.GetResponse)
messageWas bool
res = new(ObjectGetRes)
procPrm processResponseV2Prm
procRes processResponseV2Res
)
procPrm.callOpts = callOpts
procPrm.resp = resp
procRes.statusRes = res
loop:
for {
// receive message from server stream
err := stream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
if !messageWas {
return nil, errWrongMessageSeq
}
break
}
return nil, fmt.Errorf("reading the response failed: %w", err)
}
messageWas = true
// process response in general
if c.processResponseV2(&procRes, procPrm) {
if procRes.cliErr != nil {
return nil, procRes.cliErr
}
return res, nil
}
switch v := resp.GetBody().GetObjectPart().(type) {
default:
return nil, errWrongMessageSeq
case *v2object.GetObjectPartInit:
if headWas {
return nil, errWrongMessageSeq
}
headWas = true
obj.SetObjectID(v.GetObjectID())
obj.SetSignature(v.GetSignature())
hdr := v.GetHeader()
obj.SetHeader(hdr)
if p.readerHandler != nil {
p.readerHandler(&objectPayloadReader{
stream: stream,
})
break loop
}
if p.w == nil {
payload = make([]byte, 0, hdr.GetPayloadLength())
}
case *v2object.GetObjectPartChunk:
if !headWas {
return nil, errWrongMessageSeq
}
if p.w != nil {
if _, err := p.w.Write(v.GetChunk()); err != nil {
return nil, fmt.Errorf("could not write payload chunk: %w", err)
}
} else {
payload = append(payload, v.GetChunk()...)
}
case *v2object.SplitInfo:
if headWas {
return nil, errWrongMessageSeq
}
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
}
obj.SetPayload(payload)
// convert the object
res.setObject(object.NewFromV2(obj))
return res, nil
}
func (p *ObjectHeaderParams) WithAddress(v *address.Address) *ObjectHeaderParams {
if p != nil {
p.addr = v

View File

@ -0,0 +1,313 @@
package client
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs"
rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/token"
)
// PrmObjectGet groups parameters of ObjectGetInit operation.
type PrmObjectGet struct {
raw bool
local bool
sessionSet bool
session session.Token
bearerSet bool
bearer token.BearerToken
cnrSet bool
cnr cid.ID
objSet bool
obj oid.ID
}
// MarkRaw marks an intent to read physically stored object.
func (x *PrmObjectGet) MarkRaw() {
x.raw = true
}
// MarkLocal tells the server to execute the operation locally.
func (x *PrmObjectGet) MarkLocal() {
x.local = true
}
// WithinSession specifies session within which object should be read.
func (x *PrmObjectGet) WithinSession(t session.Token) {
x.session = t
x.sessionSet = true
}
// WithBearerToken attaches bearer token to be used for the operation.
func (x *PrmObjectGet) WithBearerToken(t token.BearerToken) {
x.bearer = t
x.bearerSet = true
}
// FromContainer specifies NeoFS container of the object.
// Required parameter.
func (x *PrmObjectGet) FromContainer(id cid.ID) {
x.cnr = id
x.cnrSet = true
}
// ByID specifies identifier of the requested object.
// Required parameter.
func (x *PrmObjectGet) ByID(id oid.ID) {
x.obj = id
x.objSet = true
}
// ResObjectGet groups the final result values of ObjectGetInit operation.
type ResObjectGet struct {
statusRes
}
// ObjectReader is designed to read one object from NeoFS system.
//
// Must be initialized using Client.ObjectGetInit, any other
// usage is unsafe.
type ObjectReader struct {
cancelCtxStream context.CancelFunc
ctxCall contextCall
// initially bound to contextCall
bodyResp v2object.GetResponseBody
tailPayload []byte
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *ObjectReader) UseKey(key ecdsa.PrivateKey) {
x.ctxCall.key = key
}
// ReadHeader reads header of the object. Result means success.
// Failure reason can be received via Close.
func (x *ObjectReader) ReadHeader(dst *object.Object) bool {
if !x.ctxCall.writeRequest() {
return false
} else if !x.ctxCall.readResponse() {
return false
}
var partInit *v2object.GetObjectPartInit
switch v := x.bodyResp.GetObjectPart().(type) {
default:
x.ctxCall.err = fmt.Errorf("unexpected message instead of heading part: %T", v)
return false
case *v2object.SplitInfo:
x.ctxCall.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(v))
return false
case *v2object.GetObjectPartInit:
partInit = v
}
var objv2 v2object.Object
objv2.SetObjectID(partInit.GetObjectID())
objv2.SetHeader(partInit.GetHeader())
objv2.SetSignature(partInit.GetSignature())
*dst = *object.NewFromV2(&objv2) // need smth better
return true
}
func (x *ObjectReader) readChunk(buf []byte) (int, bool) {
var read int
// read remaining tail
read = copy(buf, x.tailPayload)
x.tailPayload = x.tailPayload[read:]
if len(buf) == read {
return read, true
}
// receive next message
ok := x.ctxCall.readResponse()
if !ok {
return read, false
}
// get chunk part message
part := x.bodyResp.GetObjectPart()
var partChunk *v2object.GetObjectPartChunk
partChunk, ok = part.(*v2object.GetObjectPartChunk)
if !ok {
x.ctxCall.err = fmt.Errorf("unexpected message instead of chunk part: %T", part)
return read, false
}
// read new chunk
chunk := partChunk.GetChunk()
tailOffset := copy(buf[read:], chunk)
read += tailOffset
// save the tail
x.tailPayload = append(x.tailPayload, chunk[tailOffset:]...)
return read, true
}
// ReadChunk reads another chunk of the object payload. Works similar to
// io.Reader.Read but returns success flag instead of error.
//
// Failure reason can be received via Close.
func (x *ObjectReader) ReadChunk(buf []byte) (int, bool) {
return x.readChunk(buf)
}
func (x *ObjectReader) close(ignoreEOF bool) (*ResObjectGet, error) {
defer x.cancelCtxStream()
if x.ctxCall.err != nil {
if !errors.Is(x.ctxCall.err, io.EOF) {
return nil, x.ctxCall.err
} else if !ignoreEOF {
return nil, io.EOF
}
}
return x.ctxCall.statusRes.(*ResObjectGet), nil
}
// Close ends reading the object and returns the result of the operation
// along with the final results. Must be called after using the ObjectReader.
//
// Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as Go built-in error.
// If Client is tuned to resolve NeoFS API statuses, then NeoFS failures
// codes are returned as error.
//
// Return errors:
// *object.SplitInfoError (returned on virtual objects with PrmObjectGet.MakeRaw).
//
// Return statuses:
// global (see Client docs).
func (x *ObjectReader) Close() (*ResObjectGet, error) {
return x.close(true)
}
func (x *ObjectReader) Read(p []byte) (int, error) {
n, ok := x.readChunk(p)
if !ok {
res, err := x.close(false)
if err != nil {
return n, err
} else if !x.ctxCall.resolveAPIFailures {
return n, apistatus.ErrFromStatus(res.Status())
}
}
return n, nil
}
// ObjectGetInit initiates reading an object through a remote server using NeoFS API protocol.
//
// The call only opens the transmission channel, explicit fetching is done using the ObjectWriter.
// Exactly one return value is non-nil. Resulting reader must be finally closed.
//
// Immediately panics if parameters are set incorrectly (see PrmObjectGet docs).
// Context is required and must not be nil. It is used for network communication.
func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) {
// check parameters
switch {
case ctx == nil:
panic(panicMsgMissingContext)
case !prm.cnrSet:
panic(panicMsgMissingContainer)
case !prm.objSet:
panic("missing object")
}
var addr v2refs.Address
addr.SetContainerID(prm.cnr.ToV2())
addr.SetObjectID(prm.obj.ToV2())
// form request body
var body v2object.GetRequestBody
body.SetRaw(prm.raw)
body.SetAddress(&addr)
// form meta header
var meta v2session.RequestMetaHeader
if prm.local {
meta.SetTTL(1)
}
if prm.bearerSet {
meta.SetBearerToken(prm.bearer.ToV2())
}
if prm.sessionSet {
meta.SetSessionToken(prm.session.ToV2())
}
// form request
var req v2object.GetRequest
req.SetBody(&body)
req.SetMetaHeader(&meta)
// init reader
var (
r ObjectReader
resp v2object.GetResponse
stream *rpcapi.GetResponseReader
)
ctx, r.cancelCtxStream = context.WithCancel(ctx)
resp.SetBody(&r.bodyResp)
// init call context
c.initCallContext(&r.ctxCall)
r.ctxCall.req = &req
r.ctxCall.statusRes = new(ResObjectGet)
r.ctxCall.resp = &resp
r.ctxCall.wReq = func() error {
var err error
stream, err = rpcapi.GetObject(c.Raw(), &req, client.WithContext(ctx))
if err != nil {
return fmt.Errorf("open stream: %w", err)
}
return nil
}
r.ctxCall.rResp = func() error {
return stream.Read(&resp)
}
return &r, nil
}

View File

@ -0,0 +1,199 @@
package client
import (
"context"
"crypto/ecdsa"
"fmt"
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/token"
)
// PrmObjectPutInit groups parameters of ObjectPutInit operation.
//
// At the moment the operation is not parameterized, however,
// the structure is still declared for backward compatibility.
type PrmObjectPutInit struct{}
// ResObjectPut groups the final result values of ObjectPutInit operation.
type ResObjectPut struct {
statusRes
resp v2object.PutResponse
}
// ReadStoredObject reads identifier of the saved object.
// Returns false if ID is missing (not read).
func (x *ResObjectPut) ReadStoredObject(id *oid.ID) bool {
idv2 := x.resp.GetBody().GetObjectID()
if idv2 == nil {
return false
}
*id = *oid.NewIDFromV2(idv2) // need smth better
return true
}
// ObjectWriter is designed to write one object to NeoFS system.
//
// Must be initialized using Client.ObjectPutInit, any other
// usage is unsafe.
type ObjectWriter struct {
cancelCtxStream context.CancelFunc
ctxCall contextCall
// initially bound tp contextCall
metaHdr v2session.RequestMetaHeader
// initially bound to contextCall
partInit v2object.PutObjectPartInit
chunkCalled bool
partChunk v2object.PutObjectPartChunk
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *ObjectWriter) UseKey(key ecdsa.PrivateKey) {
x.ctxCall.key = key
}
// WithBearerToken attaches bearer token to be used for the operation.
// Should be called once before any writing steps.
func (x *ObjectWriter) WithBearerToken(t token.BearerToken) {
x.metaHdr.SetBearerToken(t.ToV2())
}
// WithinSession specifies session within which object should be stored.
// Should be called once before any writing steps.
func (x *ObjectWriter) WithinSession(t session.Token) {
x.metaHdr.SetSessionToken(t.ToV2())
}
// MarkLocal tells the server to execute the operation locally.
func (x *ObjectWriter) MarkLocal() {
x.metaHdr.SetTTL(1)
}
// WriteHeader writes header of the object. Result means success.
// Failure reason can be received via Close.
func (x *ObjectWriter) WriteHeader(hdr object.Object) bool {
v2Hdr := hdr.ToV2()
x.partInit.SetObjectID(v2Hdr.GetObjectID())
x.partInit.SetHeader(v2Hdr.GetHeader())
x.partInit.SetSignature(v2Hdr.GetSignature())
return x.ctxCall.writeRequest()
}
// WritePayloadChunk writes chunk of the object payload. Result means success.
// Failure reason can be received via Close.
func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
if !x.chunkCalled {
x.chunkCalled = true
x.ctxCall.req.(*v2object.PutRequest).GetBody().SetObjectPart(&x.partChunk)
}
for ln := len(chunk); ln > 0; ln = len(chunk) {
if ln > 512 {
ln = 512
}
x.partChunk.SetChunk(chunk[:ln])
if !x.ctxCall.writeRequest() {
return false
}
chunk = chunk[ln:]
}
return true
}
// Close ends writing the object and returns the result of the operation
// along with the final results. Must be called after using the ObjectWriter.
//
// Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as Go built-in error.
// If Client is tuned to resolve NeoFS API statuses, then NeoFS failures
// codes are returned as error.
//
// Return statuses:
// global (see Client docs).
func (x *ObjectWriter) Close() (*ResObjectPut, error) {
defer x.cancelCtxStream()
if x.ctxCall.err != nil {
return nil, x.ctxCall.err
}
if !x.ctxCall.close() {
return nil, x.ctxCall.err
}
if !x.ctxCall.processResponse() {
return nil, x.ctxCall.err
}
return x.ctxCall.statusRes.(*ResObjectPut), nil
}
// ObjectPutInit initiates writing an object through a remote server using NeoFS API protocol.
//
// The call only opens the transmission channel, explicit recording is done using the ObjectWriter.
// Exactly one return value is non-nil. Resulting writer must be finally closed.
//
// Context is required and must not be nil. It is used for network communication.
func (c *Client) ObjectPutInit(ctx context.Context, _ PrmObjectPutInit) (*ObjectWriter, error) {
// check parameters
if ctx == nil {
panic(panicMsgMissingContext)
}
// open stream
var (
res ResObjectPut
w ObjectWriter
)
ctx, w.cancelCtxStream = context.WithCancel(ctx)
stream, err := rpcapi.PutObject(c.Raw(), &res.resp, client.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("open stream: %w", err)
}
// form request body
var body v2object.PutRequestBody
// form request
var req v2object.PutRequest
req.SetBody(&body)
req.SetMetaHeader(&w.metaHdr)
body.SetObjectPart(&w.partInit)
// init call context
c.initCallContext(&w.ctxCall)
w.ctxCall.req = &req
w.ctxCall.statusRes = &res
w.ctxCall.resp = &res.resp
w.ctxCall.wReq = func() error {
return stream.Write(&req)
}
w.ctxCall.closer = stream.Close
return &w, nil
}

1
go.mod
View File

@ -11,7 +11,6 @@ require (
github.com/nspcc-dev/hrw v1.0.9
github.com/nspcc-dev/neo-go v0.98.0
github.com/nspcc-dev/neofs-api-go/v2 v2.11.2-0.20220127135316-32dd0bb3f9c5
github.com/nspcc-dev/neofs-crypto v0.3.0
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.18.1
google.golang.org/grpc v1.41.0

View File

@ -224,15 +224,11 @@ func (mr *MockClientMockRecorder) GetContainer(arg0, arg1 interface{}) *gomock.C
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetContainer", reflect.TypeOf((*MockClient)(nil).GetContainer), varargs...)
}
// GetObject mocks base method.
func (m *MockClient) GetObject(arg0 context.Context, arg1 *client0.GetObjectParams, arg2 ...client0.CallOption) (*client0.ObjectGetRes, error) {
// ObjectGetInitmocks base method.
func (m *MockClient) ObjectGetInit(arg0 context.Context, arg1 client0.PrmObjectGet) (*client0.ObjectReader, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "GetObject", varargs...)
ret0, _ := ret[0].(*client0.ObjectGetRes)
ret := m.ctrl.Call(m, "GetObject", arg0, arg1)
ret0, _ := ret[0].(*client0.ObjectReader)
ret1, _ := ret[1].(error)
return ret0, ret1
}
@ -241,7 +237,7 @@ func (m *MockClient) GetObject(arg0 context.Context, arg1 *client0.GetObjectPara
func (mr *MockClientMockRecorder) GetObject(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockClient)(nil).GetObject), varargs...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectGetInit", reflect.TypeOf((*MockClient)(nil).ObjectGetInit), varargs...)
}
// HashObjectPayloadRanges mocks base method.
@ -355,24 +351,20 @@ func (mr *MockClientMockRecorder) PutContainer(arg0, arg1 interface{}) *gomock.C
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutContainer", reflect.TypeOf((*MockClient)(nil).PutContainer), varargs...)
}
// PutObject mocks base method.
func (m *MockClient) PutObject(arg0 context.Context, arg1 *client0.PutObjectParams, arg2 ...client0.CallOption) (*client0.ObjectPutRes, error) {
// ObjectPutInitmocks base method.
func (m *MockClient) ObjectPutInit(arg0 context.Context, arg1 client0.PrmObjectPutInit) (*client0.ObjectWriter, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "PutObject", varargs...)
ret0, _ := ret[0].(*client0.ObjectPutRes)
ret := m.ctrl.Call(m, "PutObject", arg0)
ret0, _ := ret[0].(*client0.ObjectWriter)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// PutObject indicates an expected call of PutObject.
func (mr *MockClientMockRecorder) PutObject(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
func (mr *MockClientMockRecorder) PutObject(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutObject", reflect.TypeOf((*MockClient)(nil).PutObject), varargs...)
varargs := append([]interface{}{arg0, arg1})
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectPutInit", reflect.TypeOf((*MockClient)(nil).ObjectPutInit), varargs...)
}
// Raw mocks base method.

View File

@ -1,6 +1,7 @@
package pool
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/sha256"
@ -22,6 +23,7 @@ import (
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/nspcc-dev/neofs-sdk-go/object/address"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/owner"
"github.com/nspcc-dev/neofs-sdk-go/session"
@ -41,9 +43,9 @@ type Client interface {
AnnounceContainerUsedSpace(context.Context, client.AnnounceSpacePrm) (*client.AnnounceSpaceRes, error)
EndpointInfo(context.Context, client.EndpointInfoPrm) (*client.EndpointInfoRes, error)
NetworkInfo(context.Context, client.NetworkInfoPrm) (*client.NetworkInfoRes, error)
PutObject(context.Context, *client.PutObjectParams, ...client.CallOption) (*client.ObjectPutRes, error)
ObjectPutInit(context.Context, client.PrmObjectPutInit) (*client.ObjectWriter, error)
DeleteObject(context.Context, *client.DeleteObjectParams, ...client.CallOption) (*client.ObjectDeleteRes, error)
GetObject(context.Context, *client.GetObjectParams, ...client.CallOption) (*client.ObjectGetRes, error)
ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error)
HeadObject(context.Context, *client.ObjectHeaderParams, ...client.CallOption) (*client.ObjectHeadRes, error)
ObjectPayloadRangeData(context.Context, *client.RangeDataParams, ...client.CallOption) (*client.ObjectRangeRes, error)
HashObjectPayloadRanges(context.Context, *client.RangeChecksumParams, ...client.CallOption) (*client.ObjectRangeHashRes, error)
@ -159,9 +161,9 @@ type Pool interface {
}
type Object interface {
PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error)
PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error)
DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error
GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error)
GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error)
GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error)
ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error)
ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error)
@ -282,9 +284,9 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
for i, params := range options.nodesParams {
clientPacks := make([]*clientPack, len(params.weights))
for j, address := range params.addresses {
for j, addr := range params.addresses {
c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key),
client.WithURIAddress(address, nil),
client.WithURIAddress(addr, nil),
client.WithDialTimeout(options.NodeConnectionTimeout),
client.WithNeoFSErrorParsing())
if err != nil {
@ -294,14 +296,14 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
cliRes, err := createSessionTokenForDuration(ctx, c, options.SessionExpirationDuration)
if err != nil && options.Logger != nil {
options.Logger.Warn("failed to create neofs session token for client",
zap.String("address", address),
zap.String("address", addr),
zap.Error(err))
} else if err == nil {
healthy, atLeastOneHealthy = true, true
st := sessionTokenForOwner(ownerID, cliRes)
_ = cache.Put(formCacheKey(address, options.Key), st)
_ = cache.Put(formCacheKey(addr, options.Key), st)
}
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: address}
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr}
}
source := rand.NewSource(time.Now().UnixNano())
sampler := NewSampler(params.weights, source)
@ -448,8 +450,8 @@ func (p *pool) Connection() (Client, *session.Token, error) {
return nil, nil, err
}
token := p.cache.Get(formCacheKey(cp.address, p.key))
return cp.client, token, nil
tok := p.cache.Get(formCacheKey(cp.address, p.key))
return cp.client, tok, nil
}
func (p *pool) connection() (*clientPack, error) {
@ -585,7 +587,68 @@ func (p *pool) removeSessionTokenAfterThreshold(cfg *callConfig) error {
return nil
}
func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error) {
type callContext struct {
// context for RPC
ctxBase context.Context
client Client
// client endpoint
endpoint string
// request signer
key *ecdsa.PrivateKey
// flag to open default session if session token is missing
sessionDefault bool
sessionToken *session.Token
}
func (p *pool) prepareCallContext(ctx *callContext, cfg *callConfig) error {
cp, err := p.connection()
if err != nil {
return err
}
ctx.endpoint = cp.address
ctx.client = cp.client
ctx.key = cfg.key
if ctx.key == nil {
ctx.key = p.key
}
ctx.sessionDefault = cfg.useDefaultSession
ctx.sessionToken = cfg.stoken
if ctx.sessionToken == nil && ctx.sessionDefault {
cacheKey := formCacheKey(ctx.endpoint, ctx.key)
ctx.sessionToken = p.cache.Get(cacheKey)
if ctx.sessionToken == nil {
var cliPrm client.CreateSessionPrm
cliPrm.SetExp(math.MaxUint32)
cliRes, err := ctx.client.CreateSession(ctx.ctxBase, cliPrm)
if err != nil {
return fmt.Errorf("default session: %w", err)
}
ctx.sessionToken = sessionTokenForOwner(owner.NewIDFromPublicKey(&ctx.key.PublicKey), cliRes)
_ = p.cache.Put(cacheKey, ctx.sessionToken)
}
}
if ctx.sessionToken != nil && ctx.sessionToken.Signature() == nil {
err = ctx.sessionToken.Sign(ctx.key)
}
return err
}
func (p *pool) PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) {
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
// Put object is different from other object service methods. Put request
@ -603,21 +666,90 @@ func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, op
return nil, err
}
cp, options, err := p.conn(ctx, cfg)
var ctxCall callContext
ctxCall.ctxBase = ctx
err = p.prepareCallContext(&ctxCall, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.PutObject(ctx, params, options...)
var prm client.PrmObjectPutInit
// removes session token from cache in case of token error
_ = p.checkSessionTokenErr(err, cp.address)
if err != nil { // here err already carries both status and client errors
return nil, err
wObj, err := ctxCall.client.ObjectPutInit(ctx, prm)
if err != nil {
return nil, fmt.Errorf("init writing on API client: %w", err)
}
return res.ID(), nil
wObj.UseKey(*ctxCall.key)
if ctxCall.sessionToken != nil {
wObj.WithinSession(*ctxCall.sessionToken)
}
if cfg.btoken != nil {
wObj.WithBearerToken(*cfg.btoken)
}
if wObj.WriteHeader(hdr) {
sz := hdr.PayloadSize()
if data := hdr.Payload(); len(data) > 0 {
if payload != nil {
payload = io.MultiReader(bytes.NewReader(data), payload)
} else {
payload = bytes.NewReader(data)
sz = uint64(len(data))
}
}
if payload != nil {
const defaultBufferSizePut = 4096 // configure?
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
}
buf := make([]byte, sz)
var n int
var ok bool
for {
n, err = payload.Read(buf)
if n > 0 {
ok = wObj.WritePayloadChunk(buf[:n])
if !ok {
break
}
continue
}
if errors.Is(err, io.EOF) {
break
}
return nil, fmt.Errorf("read payload: %w", err)
}
}
}
res, err := wObj.Close()
if err != nil { // here err already carries both status and client errors
// removes session token from cache in case of token error
p.checkSessionTokenErr(err, ctxCall.endpoint)
return nil, fmt.Errorf("client failure: %w", err)
}
var id oid.ID
if !res.ReadStoredObject(&id) {
return nil, errors.New("missing ID of the stored object")
}
return &id, nil
}
func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error {
@ -639,25 +771,74 @@ func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectPara
return err
}
func (p *pool) GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error) {
type objectReadCloser client.ObjectReader
func (x *objectReadCloser) Read(p []byte) (int, error) {
return (*client.ObjectReader)(x).Read(p)
}
func (x *objectReadCloser) Close() error {
_, err := (*client.ObjectReader)(x).Close()
return err
}
type ResGetObject struct {
Header object.Object
Payload io.ReadCloser
}
func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) {
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
cp, options, err := p.conn(ctx, cfg)
var ctxCall callContext
ctxCall.ctxBase = ctx
err := p.prepareCallContext(&ctxCall, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.GetObject(ctx, params, options...)
var prm client.PrmObjectGet
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.GetObject(ctx, params, opts...)
if cnr := addr.ContainerID(); cnr != nil {
prm.FromContainer(*cnr)
}
if err != nil { // here err already carries both status and client errors
if obj := addr.ObjectID(); obj != nil {
prm.ByID(*obj)
}
if ctxCall.sessionToken != nil {
prm.WithinSession(*ctxCall.sessionToken)
}
if cfg.btoken != nil {
prm.WithBearerToken(*cfg.btoken)
}
rObj, err := ctxCall.client.ObjectGetInit(ctx, prm)
if err != nil {
return nil, err
}
return res.Object(), nil
rObj.UseKey(*ctxCall.key)
var res ResGetObject
if !rObj.ReadHeader(&res.Header) {
_, err = rObj.Close()
if p.checkSessionTokenErr(err, ctxCall.endpoint) && !cfg.isRetry {
return p.GetObject(ctx, addr, append(opts, retry())...)
}
return nil, fmt.Errorf("read header: %w", err)
}
res.Payload = (*objectReadCloser)(rObj)
return &res, nil
}
func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {

View File

@ -15,6 +15,8 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/nspcc-dev/neofs-sdk-go/object/address"
"github.com/nspcc-dev/neofs-sdk-go/owner"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/stretchr/testify/require"
@ -329,7 +331,7 @@ func TestSessionCache(t *testing.T) {
}).MaxTimes(3)
mockClient.EXPECT().GetObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("session token does not exist"))
mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any()).Return(nil, nil)
return mockClient, nil
}
@ -355,7 +357,7 @@ func TestSessionCache(t *testing.T) {
require.NoError(t, err)
require.Contains(t, tokens, st)
_, err = pool.GetObject(ctx, nil, retry())
_, err = pool.GetObject(ctx, address.Address{}, retry())
require.Error(t, err)
// cache must not contain session token
@ -363,7 +365,7 @@ func TestSessionCache(t *testing.T) {
require.NoError(t, err)
require.Nil(t, st)
_, err = pool.PutObject(ctx, nil)
_, err = pool.PutObject(ctx, object.Object{}, nil)
require.NoError(t, err)
// cache must contain session token
@ -481,7 +483,7 @@ func TestSessionCacheWithKey(t *testing.T) {
require.NoError(t, err)
require.Contains(t, tokens, st)
_, err = pool.GetObject(ctx, nil, WithKey(newPrivateKey(t)))
_, err = pool.GetObject(ctx, address.Address{}, WithKey(newPrivateKey(t)))
require.NoError(t, err)
require.Len(t, tokens, 2)
}