From d6d6a41f5d72e410350f435a77cde19b95ca038e Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 25 Aug 2022 15:14:18 +0300 Subject: [PATCH] [#323] client: Refactor `object.Put` Signed-off-by: Evgenii Stratonikov --- client/object_put.go | 159 ++++++++++++++++++++++--------------------- pool/pool.go | 20 +++--- 2 files changed, 90 insertions(+), 89 deletions(-) diff --git a/client/object_put.go b/client/object_put.go index c6554ad..1361992 100644 --- a/client/object_put.go +++ b/client/object_put.go @@ -12,7 +12,9 @@ import ( rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" v2session "github.com/nspcc-dev/neofs-api-go/v2/session" + "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-sdk-go/bearer" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" @@ -21,6 +23,8 @@ import ( // PrmObjectPutInit groups parameters of ObjectPutInit operation. type PrmObjectPutInit struct { copyNum uint32 + key *ecdsa.PrivateKey + meta v2session.RequestMetaHeader } // SetCopiesNumber sets number of object copies that is enough to consider put successful. @@ -47,57 +51,62 @@ func (x ResObjectPut) StoredObjectID() oid.ID { type ObjectWriter struct { cancelCtxStream context.CancelFunc - ctxCall contextCall + client *Client + stream interface { + Write(*v2object.PutRequest) error + Close() error + } - // initially bound tp contextCall - metaHdr v2session.RequestMetaHeader - - // initially bound to contextCall - partInit v2object.PutObjectPartInit + key *ecdsa.PrivateKey + res ResObjectPut + err error chunkCalled bool + respV2 v2object.PutResponse + req v2object.PutRequest + partInit v2object.PutObjectPartInit partChunk v2object.PutObjectPartChunk } // UseKey specifies private key to sign the requests. // If key is not provided, then Client default key is used. -func (x *ObjectWriter) UseKey(key ecdsa.PrivateKey) { - x.ctxCall.key = key +func (x *PrmObjectPutInit) UseKey(key ecdsa.PrivateKey) { + x.key = &key } // WithBearerToken attaches bearer token to be used for the operation. // Should be called once before any writing steps. -func (x *ObjectWriter) WithBearerToken(t bearer.Token) { +func (x *PrmObjectPutInit) WithBearerToken(t bearer.Token) { var v2token acl.BearerToken t.WriteToV2(&v2token) - x.metaHdr.SetBearerToken(&v2token) + x.meta.SetBearerToken(&v2token) } // WithinSession specifies session within which object should be stored. // Should be called once before any writing steps. -func (x *ObjectWriter) WithinSession(t session.Object) { +func (x *PrmObjectPutInit) WithinSession(t session.Object) { var tv2 v2session.Token t.WriteToV2(&tv2) - x.metaHdr.SetSessionToken(&tv2) + x.meta.SetSessionToken(&tv2) } // MarkLocal tells the server to execute the operation locally. -func (x *ObjectWriter) MarkLocal() { - x.metaHdr.SetTTL(1) +func (x *PrmObjectPutInit) MarkLocal() { + x.meta.SetTTL(1) } // WithXHeaders specifies list of extended headers (string key-value pairs) // to be attached to the request. Must have an even length. // // Slice must not be mutated until the operation completes. -func (x *ObjectWriter) WithXHeaders(hs ...string) { +func (x *PrmObjectPutInit) WithXHeaders(hs ...string) { if len(hs)%2 != 0 { panic("slice of X-Headers with odd length") } - writeXHeadersToMeta(hs, &x.metaHdr) + writeXHeadersToMeta(hs, &x.meta) } // WriteHeader writes header of the object. Result means success. @@ -109,7 +118,17 @@ func (x *ObjectWriter) WriteHeader(hdr object.Object) bool { x.partInit.SetHeader(v2Hdr.GetHeader()) x.partInit.SetSignature(v2Hdr.GetSignature()) - return x.ctxCall.writeRequest() + x.req.GetBody().SetObjectPart(&x.partInit) + x.req.SetVerificationHeader(nil) + + x.err = signature.SignServiceMessage(x.key, &x.req) + if x.err != nil { + x.err = fmt.Errorf("sign message: %w", x.err) + return false + } + + x.err = x.stream.Write(&x.req) + return x.err == nil } // WritePayloadChunk writes chunk of the object payload. Result means success. @@ -117,7 +136,7 @@ func (x *ObjectWriter) WriteHeader(hdr object.Object) bool { func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool { if !x.chunkCalled { x.chunkCalled = true - x.ctxCall.req.(*v2object.PutRequest).GetBody().SetObjectPart(&x.partChunk) + x.req.GetBody().SetObjectPart(&x.partChunk) } for ln := len(chunk); ln > 0; ln = len(chunk) { @@ -142,8 +161,16 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool { // It is mentally assumed that allocating and filling the buffer is better than // synchronous sending, but this needs to be tested. x.partChunk.SetChunk(chunk[:ln]) + x.req.SetVerificationHeader(nil) - if !x.ctxCall.writeRequest() { + x.err = signature.SignServiceMessage(x.key, &x.req) + if x.err != nil { + x.err = fmt.Errorf("sign message: %w", x.err) + return false + } + + x.err = x.stream.Write(&x.req) + if x.err != nil { return false } @@ -175,28 +202,36 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) { // Ignore io.EOF error, because it is expected error for client-side // stream termination by the server. E.g. when stream contains invalid // message. Server returns an error in response message (in status). - if x.ctxCall.err != nil && !errors.Is(x.ctxCall.err, io.EOF) { - return nil, x.ctxCall.err + if x.err != nil && !errors.Is(x.err, io.EOF) { + return nil, x.err } - if x.ctxCall.err = x.ctxCall.closer(); x.ctxCall.err != nil { - return nil, x.ctxCall.err + if x.err = x.stream.Close(); x.err != nil { + return nil, x.err } - x.ctxCall.processResponse() - - if x.ctxCall.err != nil { - return nil, x.ctxCall.err + x.res.st, x.err = x.client.processResponse(&x.respV2) + if x.err != nil { + return nil, x.err } - if x.ctxCall.result != nil { - x.ctxCall.result(x.ctxCall.resp) - if x.ctxCall.err != nil { - return nil, x.ctxCall.err - } + if !apistatus.IsSuccessful(x.res.st) { + return &x.res, nil } - return x.ctxCall.statusRes.(*ResObjectPut), nil + const fieldID = "ID" + + idV2 := x.respV2.GetBody().GetObjectID() + if idV2 == nil { + return nil, newErrMissingResponseField(fieldID) + } + + x.err = x.res.obj.ReadFromV2(*idV2) + if x.err != nil { + x.err = newErrInvalidResponseField(fieldID, x.err) + } + + return &x.res, nil } // ObjectPutInit initiates writing an object through a remote server using NeoFS API protocol. @@ -211,57 +246,25 @@ func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (*Obje panic(panicMsgMissingContext) } - // open stream - var ( - res ResObjectPut - w ObjectWriter + var w ObjectWriter - resp v2object.PutResponse - ) - - ctx, w.cancelCtxStream = context.WithCancel(ctx) - - w.partInit.SetCopiesNumber(prm.copyNum) - - stream, err := rpcapi.PutObject(&c.c, &resp, client.WithContext(ctx)) + ctx, cancel := context.WithCancel(ctx) + stream, err := rpcapi.PutObject(&c.c, &w.respV2, client.WithContext(ctx)) if err != nil { + cancel() return nil, fmt.Errorf("open stream: %w", err) } - // form request body - var body v2object.PutRequestBody - - // form request - var req v2object.PutRequest - - req.SetBody(&body) - - req.SetMetaHeader(&w.metaHdr) - body.SetObjectPart(&w.partInit) - - // init call context - c.initCallContext(&w.ctxCall) - w.ctxCall.req = &req - w.ctxCall.statusRes = &res - w.ctxCall.resp = &resp - w.ctxCall.wReq = func() error { - return stream.Write(&req) - } - w.ctxCall.closer = stream.Close - w.ctxCall.result = func(r responseV2) { - const fieldID = "ID" - - idV2 := r.(*v2object.PutResponse).GetBody().GetObjectID() - if idV2 == nil { - w.ctxCall.err = newErrMissingResponseField(fieldID) - return - } - - w.ctxCall.err = res.obj.ReadFromV2(*idV2) - if w.ctxCall.err != nil { - w.ctxCall.err = newErrInvalidResponseField(fieldID, w.ctxCall.err) - } + w.key = &c.prm.key + if prm.key != nil { + w.key = prm.key } + w.cancelCtxStream = cancel + w.client = c + w.stream = stream + w.partInit.SetCopiesNumber(prm.copyNum) + w.req.SetBody(new(v2object.PutRequestBody)) + c.prepareRequest(&w.req, &prm.meta) return &w, nil } diff --git a/pool/pool.go b/pool/pool.go index 6cfaaef..ee716ef 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -476,6 +476,15 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { var cliPrm sdkClient.PrmObjectPutInit cliPrm.SetCopiesNumber(prm.copiesNumber) + if prm.stoken != nil { + cliPrm.WithinSession(*prm.stoken) + } + if prm.key != nil { + cliPrm.UseKey(*prm.key) + } + if prm.btoken != nil { + cliPrm.WithBearerToken(*prm.btoken) + } start := time.Now() wObj, err := c.client.ObjectPutInit(ctx, cliPrm) @@ -484,17 +493,6 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID return oid.ID{}, fmt.Errorf("init writing on API client: %w", err) } - if prm.stoken != nil { - wObj.WithinSession(*prm.stoken) - } - if prm.key != nil { - wObj.UseKey(*prm.key) - } - - if prm.btoken != nil { - wObj.WithBearerToken(*prm.btoken) - } - if wObj.WriteHeader(prm.hdr) { sz := prm.hdr.PayloadSize()