[#323] client: Refactor object.Put

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-08-25 15:14:18 +03:00 committed by fyrchik
parent 724d30db1a
commit d6d6a41f5d
2 changed files with 90 additions and 89 deletions

View file

@ -12,7 +12,9 @@ import (
rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
"github.com/nspcc-dev/neofs-api-go/v2/signature"
"github.com/nspcc-dev/neofs-sdk-go/bearer"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
@ -21,6 +23,8 @@ import (
// PrmObjectPutInit groups parameters of ObjectPutInit operation.
type PrmObjectPutInit struct {
copyNum uint32
key *ecdsa.PrivateKey
meta v2session.RequestMetaHeader
}
// SetCopiesNumber sets number of object copies that is enough to consider put successful.
@ -47,57 +51,62 @@ func (x ResObjectPut) StoredObjectID() oid.ID {
type ObjectWriter struct {
cancelCtxStream context.CancelFunc
ctxCall contextCall
client *Client
stream interface {
Write(*v2object.PutRequest) error
Close() error
}
// initially bound tp contextCall
metaHdr v2session.RequestMetaHeader
// initially bound to contextCall
partInit v2object.PutObjectPartInit
key *ecdsa.PrivateKey
res ResObjectPut
err error
chunkCalled bool
respV2 v2object.PutResponse
req v2object.PutRequest
partInit v2object.PutObjectPartInit
partChunk v2object.PutObjectPartChunk
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *ObjectWriter) UseKey(key ecdsa.PrivateKey) {
x.ctxCall.key = key
func (x *PrmObjectPutInit) UseKey(key ecdsa.PrivateKey) {
x.key = &key
}
// WithBearerToken attaches bearer token to be used for the operation.
// Should be called once before any writing steps.
func (x *ObjectWriter) WithBearerToken(t bearer.Token) {
func (x *PrmObjectPutInit) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.metaHdr.SetBearerToken(&v2token)
x.meta.SetBearerToken(&v2token)
}
// WithinSession specifies session within which object should be stored.
// Should be called once before any writing steps.
func (x *ObjectWriter) WithinSession(t session.Object) {
func (x *PrmObjectPutInit) WithinSession(t session.Object) {
var tv2 v2session.Token
t.WriteToV2(&tv2)
x.metaHdr.SetSessionToken(&tv2)
x.meta.SetSessionToken(&tv2)
}
// MarkLocal tells the server to execute the operation locally.
func (x *ObjectWriter) MarkLocal() {
x.metaHdr.SetTTL(1)
func (x *PrmObjectPutInit) MarkLocal() {
x.meta.SetTTL(1)
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *ObjectWriter) WithXHeaders(hs ...string) {
func (x *PrmObjectPutInit) WithXHeaders(hs ...string) {
if len(hs)%2 != 0 {
panic("slice of X-Headers with odd length")
}
writeXHeadersToMeta(hs, &x.metaHdr)
writeXHeadersToMeta(hs, &x.meta)
}
// WriteHeader writes header of the object. Result means success.
@ -109,7 +118,17 @@ func (x *ObjectWriter) WriteHeader(hdr object.Object) bool {
x.partInit.SetHeader(v2Hdr.GetHeader())
x.partInit.SetSignature(v2Hdr.GetSignature())
return x.ctxCall.writeRequest()
x.req.GetBody().SetObjectPart(&x.partInit)
x.req.SetVerificationHeader(nil)
x.err = signature.SignServiceMessage(x.key, &x.req)
if x.err != nil {
x.err = fmt.Errorf("sign message: %w", x.err)
return false
}
x.err = x.stream.Write(&x.req)
return x.err == nil
}
// WritePayloadChunk writes chunk of the object payload. Result means success.
@ -117,7 +136,7 @@ func (x *ObjectWriter) WriteHeader(hdr object.Object) bool {
func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
if !x.chunkCalled {
x.chunkCalled = true
x.ctxCall.req.(*v2object.PutRequest).GetBody().SetObjectPart(&x.partChunk)
x.req.GetBody().SetObjectPart(&x.partChunk)
}
for ln := len(chunk); ln > 0; ln = len(chunk) {
@ -142,8 +161,16 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
// It is mentally assumed that allocating and filling the buffer is better than
// synchronous sending, but this needs to be tested.
x.partChunk.SetChunk(chunk[:ln])
x.req.SetVerificationHeader(nil)
if !x.ctxCall.writeRequest() {
x.err = signature.SignServiceMessage(x.key, &x.req)
if x.err != nil {
x.err = fmt.Errorf("sign message: %w", x.err)
return false
}
x.err = x.stream.Write(&x.req)
if x.err != nil {
return false
}
@ -175,28 +202,36 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) {
// Ignore io.EOF error, because it is expected error for client-side
// stream termination by the server. E.g. when stream contains invalid
// message. Server returns an error in response message (in status).
if x.ctxCall.err != nil && !errors.Is(x.ctxCall.err, io.EOF) {
return nil, x.ctxCall.err
if x.err != nil && !errors.Is(x.err, io.EOF) {
return nil, x.err
}
if x.ctxCall.err = x.ctxCall.closer(); x.ctxCall.err != nil {
return nil, x.ctxCall.err
if x.err = x.stream.Close(); x.err != nil {
return nil, x.err
}
x.ctxCall.processResponse()
if x.ctxCall.err != nil {
return nil, x.ctxCall.err
x.res.st, x.err = x.client.processResponse(&x.respV2)
if x.err != nil {
return nil, x.err
}
if x.ctxCall.result != nil {
x.ctxCall.result(x.ctxCall.resp)
if x.ctxCall.err != nil {
return nil, x.ctxCall.err
}
if !apistatus.IsSuccessful(x.res.st) {
return &x.res, nil
}
return x.ctxCall.statusRes.(*ResObjectPut), nil
const fieldID = "ID"
idV2 := x.respV2.GetBody().GetObjectID()
if idV2 == nil {
return nil, newErrMissingResponseField(fieldID)
}
x.err = x.res.obj.ReadFromV2(*idV2)
if x.err != nil {
x.err = newErrInvalidResponseField(fieldID, x.err)
}
return &x.res, nil
}
// ObjectPutInit initiates writing an object through a remote server using NeoFS API protocol.
@ -211,57 +246,25 @@ func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (*Obje
panic(panicMsgMissingContext)
}
// open stream
var (
res ResObjectPut
w ObjectWriter
var w ObjectWriter
resp v2object.PutResponse
)
ctx, w.cancelCtxStream = context.WithCancel(ctx)
w.partInit.SetCopiesNumber(prm.copyNum)
stream, err := rpcapi.PutObject(&c.c, &resp, client.WithContext(ctx))
ctx, cancel := context.WithCancel(ctx)
stream, err := rpcapi.PutObject(&c.c, &w.respV2, client.WithContext(ctx))
if err != nil {
cancel()
return nil, fmt.Errorf("open stream: %w", err)
}
// form request body
var body v2object.PutRequestBody
// form request
var req v2object.PutRequest
req.SetBody(&body)
req.SetMetaHeader(&w.metaHdr)
body.SetObjectPart(&w.partInit)
// init call context
c.initCallContext(&w.ctxCall)
w.ctxCall.req = &req
w.ctxCall.statusRes = &res
w.ctxCall.resp = &resp
w.ctxCall.wReq = func() error {
return stream.Write(&req)
}
w.ctxCall.closer = stream.Close
w.ctxCall.result = func(r responseV2) {
const fieldID = "ID"
idV2 := r.(*v2object.PutResponse).GetBody().GetObjectID()
if idV2 == nil {
w.ctxCall.err = newErrMissingResponseField(fieldID)
return
}
w.ctxCall.err = res.obj.ReadFromV2(*idV2)
if w.ctxCall.err != nil {
w.ctxCall.err = newErrInvalidResponseField(fieldID, w.ctxCall.err)
}
w.key = &c.prm.key
if prm.key != nil {
w.key = prm.key
}
w.cancelCtxStream = cancel
w.client = c
w.stream = stream
w.partInit.SetCopiesNumber(prm.copyNum)
w.req.SetBody(new(v2object.PutRequestBody))
c.prepareRequest(&w.req, &prm.meta)
return &w, nil
}

View file

@ -476,6 +476,15 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
var cliPrm sdkClient.PrmObjectPutInit
cliPrm.SetCopiesNumber(prm.copiesNumber)
if prm.stoken != nil {
cliPrm.WithinSession(*prm.stoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
}
if prm.btoken != nil {
cliPrm.WithBearerToken(*prm.btoken)
}
start := time.Now()
wObj, err := c.client.ObjectPutInit(ctx, cliPrm)
@ -484,17 +493,6 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
}
if prm.stoken != nil {
wObj.WithinSession(*prm.stoken)
}
if prm.key != nil {
wObj.UseKey(*prm.key)
}
if prm.btoken != nil {
wObj.WithBearerToken(*prm.btoken)
}
if wObj.WriteHeader(prm.hdr) {
sz := prm.hdr.PayloadSize()