[#6] native: Factor put object upload code

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
fyrchik/lorem-ipsum
Alex Vanin 2022-05-23 14:56:38 +03:00 committed by Alex Vanin
parent 2da51e4aa2
commit 0d1a5e4187
1 changed files with 37 additions and 57 deletions

View File

@ -57,11 +57,9 @@ type (
}
)
func (c *Client) Put(inputContainerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
rdr := bytes.NewReader(payload.Bytes())
sz := rdr.Size()
const defaultPutBufferSize = 4 * 1024
// preparation stage
func (c *Client) Put(inputContainerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
var containerID cid.ID
err := containerID.DecodeString(inputContainerID)
if err != nil {
@ -95,43 +93,11 @@ func (c *Client) Put(inputContainerID string, headers map[string]string, payload
o.SetOwnerID(&owner)
o.SetAttributes(attrs...)
buf := make([]byte, 4*1024)
// starting upload
stats.Report(c.vu, objPutTotal, 1)
start := time.Now()
objectWriter, err := c.cli.ObjectPutInit(c.vu.Context(), client.PrmObjectPutInit{})
resp, err := put(c.vu, defaultPutBufferSize, c.cli, &tok, &o, payload.Bytes())
if err != nil {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: err.Error()}
}
objectWriter.WithinSession(tok)
if !objectWriter.WriteHeader(o) {
stats.Report(c.vu, objPutFails, 1)
_, err := objectWriter.Close()
return PutResponse{Success: false, Error: err.Error()}
}
n, _ := rdr.Read(buf)
for n > 0 {
if !objectWriter.WritePayloadChunk(buf[:n]) {
break
}
n, _ = rdr.Read(buf)
}
resp, err := objectWriter.Close()
if err != nil {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: err.Error()}
}
stats.ReportDataSent(c.vu, float64(sz))
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
var id oid.ID
resp.ReadStoredObjectID(&id)
@ -281,24 +247,38 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
return PutResponse{Success: false, Error: err.Error()}
}
buf := make([]byte, 4*1024)
rdr := bytes.NewReader(p.payload)
// starting upload
// TODO(alexvanin): factor uploading code of Put() methods
stats.Report(p.vu, objPutTotal, 1)
start := time.Now()
objectWriter, err := p.cli.ObjectPutInit(p.vu.Context(), client.PrmObjectPutInit{})
_, err = put(p.vu, defaultPutBufferSize, p.cli, nil, &obj, p.payload)
if err != nil {
stats.Report(p.vu, objPutFails, 1)
return PutResponse{Success: false, Error: err.Error()}
}
if !objectWriter.WriteHeader(obj) {
stats.Report(p.vu, objPutFails, 1)
_, err := objectWriter.Close()
return PutResponse{Success: false, Error: err.Error()}
return PutResponse{Success: true, ObjectID: id.String()}
}
func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
hdr *object.Object, payload []byte) (*client.ResObjectPut, error) {
buf := make([]byte, bufSize)
rdr := bytes.NewReader(payload)
sz := rdr.Size()
// starting upload
stats.Report(vu, objPutTotal, 1)
start := time.Now()
objectWriter, err := cli.ObjectPutInit(vu.Context(), client.PrmObjectPutInit{})
if err != nil {
stats.Report(vu, objPutFails, 1)
return nil, err
}
if tok != nil {
objectWriter.WithinSession(*tok)
}
if !objectWriter.WriteHeader(*hdr) {
stats.Report(vu, objPutFails, 1)
_, err = objectWriter.Close()
return nil, err
}
n, _ := rdr.Read(buf)
@ -309,16 +289,16 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
n, _ = rdr.Read(buf)
}
_, err = objectWriter.Close()
resp, err := objectWriter.Close()
if err != nil {
stats.Report(p.vu, objPutFails, 1)
return PutResponse{Success: false, Error: err.Error()}
stats.Report(vu, objPutFails, 1)
return nil, err
}
stats.ReportDataSent(p.vu, float64(obj.PayloadSize()))
stats.Report(p.vu, objPutDuration, metrics.D(time.Since(start)))
stats.ReportDataSent(vu, float64(sz))
stats.Report(vu, objPutDuration, metrics.D(time.Since(start)))
return PutResponse{Success: true, ObjectID: id.String()}
return resp, err
}
func parseNetworkInfo(ctx context.Context, cli *client.Client) (maxObjSize, epoch uint64, hhDisabled bool, err error) {