From 0d1a5e41873f3700b05a164e2c8516b187cf0f57 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Mon, 23 May 2022 14:56:38 +0300 Subject: [PATCH] [#6] native: Factor put object upload code Signed-off-by: Alex Vanin --- internal/native/client.go | 94 +++++++++++++++------------------------ 1 file changed, 37 insertions(+), 57 deletions(-) diff --git a/internal/native/client.go b/internal/native/client.go index b69e259..f7f94cf 100644 --- a/internal/native/client.go +++ b/internal/native/client.go @@ -57,11 +57,9 @@ type ( } ) -func (c *Client) Put(inputContainerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse { - rdr := bytes.NewReader(payload.Bytes()) - sz := rdr.Size() +const defaultPutBufferSize = 4 * 1024 - // preparation stage +func (c *Client) Put(inputContainerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse { var containerID cid.ID err := containerID.DecodeString(inputContainerID) if err != nil { @@ -95,43 +93,11 @@ func (c *Client) Put(inputContainerID string, headers map[string]string, payload o.SetOwnerID(&owner) o.SetAttributes(attrs...) - buf := make([]byte, 4*1024) - - // starting upload - stats.Report(c.vu, objPutTotal, 1) - start := time.Now() - - objectWriter, err := c.cli.ObjectPutInit(c.vu.Context(), client.PrmObjectPutInit{}) + resp, err := put(c.vu, defaultPutBufferSize, c.cli, &tok, &o, payload.Bytes()) if err != nil { - stats.Report(c.vu, objPutFails, 1) return PutResponse{Success: false, Error: err.Error()} } - objectWriter.WithinSession(tok) - - if !objectWriter.WriteHeader(o) { - stats.Report(c.vu, objPutFails, 1) - _, err := objectWriter.Close() - return PutResponse{Success: false, Error: err.Error()} - } - - n, _ := rdr.Read(buf) - for n > 0 { - if !objectWriter.WritePayloadChunk(buf[:n]) { - break - } - n, _ = rdr.Read(buf) - } - - resp, err := objectWriter.Close() - if err != nil { - stats.Report(c.vu, objPutFails, 1) - return PutResponse{Success: false, Error: err.Error()} - } - - stats.ReportDataSent(c.vu, float64(sz)) - stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start))) - var id oid.ID resp.ReadStoredObjectID(&id) @@ -281,24 +247,38 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse { return PutResponse{Success: false, Error: err.Error()} } - buf := make([]byte, 4*1024) - rdr := bytes.NewReader(p.payload) - - // starting upload - // TODO(alexvanin): factor uploading code of Put() methods - stats.Report(p.vu, objPutTotal, 1) - start := time.Now() - - objectWriter, err := p.cli.ObjectPutInit(p.vu.Context(), client.PrmObjectPutInit{}) + _, err = put(p.vu, defaultPutBufferSize, p.cli, nil, &obj, p.payload) if err != nil { - stats.Report(p.vu, objPutFails, 1) return PutResponse{Success: false, Error: err.Error()} } - if !objectWriter.WriteHeader(obj) { - stats.Report(p.vu, objPutFails, 1) - _, err := objectWriter.Close() - return PutResponse{Success: false, Error: err.Error()} + return PutResponse{Success: true, ObjectID: id.String()} +} + +func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object, + hdr *object.Object, payload []byte) (*client.ResObjectPut, error) { + buf := make([]byte, bufSize) + rdr := bytes.NewReader(payload) + sz := rdr.Size() + + // starting upload + stats.Report(vu, objPutTotal, 1) + start := time.Now() + + objectWriter, err := cli.ObjectPutInit(vu.Context(), client.PrmObjectPutInit{}) + if err != nil { + stats.Report(vu, objPutFails, 1) + return nil, err + } + + if tok != nil { + objectWriter.WithinSession(*tok) + } + + if !objectWriter.WriteHeader(*hdr) { + stats.Report(vu, objPutFails, 1) + _, err = objectWriter.Close() + return nil, err } n, _ := rdr.Read(buf) @@ -309,16 +289,16 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse { n, _ = rdr.Read(buf) } - _, err = objectWriter.Close() + resp, err := objectWriter.Close() if err != nil { - stats.Report(p.vu, objPutFails, 1) - return PutResponse{Success: false, Error: err.Error()} + stats.Report(vu, objPutFails, 1) + return nil, err } - stats.ReportDataSent(p.vu, float64(obj.PayloadSize())) - stats.Report(p.vu, objPutDuration, metrics.D(time.Since(start))) + stats.ReportDataSent(vu, float64(sz)) + stats.Report(vu, objPutDuration, metrics.D(time.Since(start))) - return PutResponse{Success: true, ObjectID: id.String()} + return resp, err } func parseNetworkInfo(ctx context.Context, cli *client.Client) (maxObjSize, epoch uint64, hhDisabled bool, err error) {