Allow to pass custom gRPC chunk size for object put #79
2 changed files with 24 additions and 36 deletions
|
@ -31,11 +31,10 @@ import (
|
||||||
|
|
||||||
type (
|
type (
|
||||||
Client struct {
|
Client struct {
|
||||||
vu modules.VU
|
vu modules.VU
|
||||||
key ecdsa.PrivateKey
|
key ecdsa.PrivateKey
|
||||||
tok session.Object
|
tok session.Object
|
||||||
cli *client.Client
|
cli *client.Client
|
||||||
bufsize int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
PutResponse struct {
|
PutResponse struct {
|
||||||
|
@ -66,10 +65,9 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
PreparedObject struct {
|
PreparedObject struct {
|
||||||
vu modules.VU
|
vu modules.VU
|
||||||
key ecdsa.PrivateKey
|
key ecdsa.PrivateKey
|
||||||
cli *client.Client
|
cli *client.Client
|
||||||
bufsize int
|
|
||||||
|
|
||||||
hdr object.Object
|
hdr object.Object
|
||||||
payload []byte
|
payload []byte
|
||||||
|
@ -78,17 +76,6 @@ type (
|
||||||
|
|
||||||
const defaultBufferSize = 64 * 1024
|
const defaultBufferSize = 64 * 1024
|
||||||
|
|
||||||
func (c *Client) SetBufferSize(size int) {
|
|
||||||
if size < 0 {
|
|
||||||
panic("buffer size must be positive")
|
|
||||||
}
|
|
||||||
if size == 0 {
|
|
||||||
c.bufsize = defaultBufferSize
|
|
||||||
} else {
|
|
||||||
c.bufsize = size
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse {
|
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse {
|
||||||
cliContainerID := parseContainerID(containerID)
|
cliContainerID := parseContainerID(containerID)
|
||||||
|
|
||||||
|
@ -116,7 +103,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
|
||||||
o.SetOwnerID(&owner)
|
o.SetOwnerID(&owner)
|
||||||
o.SetAttributes(attrs...)
|
o.SetAttributes(attrs...)
|
||||||
|
|
||||||
resp, err := put(c.vu, c.bufsize, c.cli, &tok, &o, payload.Bytes(), chunkSize)
|
resp, err := put(c.vu, c.cli, &tok, &o, payload.Bytes(), chunkSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
@ -176,7 +163,7 @@ func (c *Client) Get(containerID, objectID string) GetResponse {
|
||||||
prm.WithinSession(tok)
|
prm.WithinSession(tok)
|
||||||
|
|
||||||
var objSize = 0
|
var objSize = 0
|
||||||
err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) {
|
err = get(c.cli, prm, c.vu.Context(), func(data []byte) {
|
||||||
objSize += len(data)
|
objSize += len(data)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -194,10 +181,9 @@ func get(
|
||||||
cli *client.Client,
|
cli *client.Client,
|
||||||
prm client.PrmObjectGet,
|
prm client.PrmObjectGet,
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
bufSize int,
|
|
||||||
onDataChunk func(chunk []byte),
|
onDataChunk func(chunk []byte),
|
||||||
) error {
|
) error {
|
||||||
var buf = make([]byte, bufSize)
|
var buf = make([]byte, defaultBufferSize)
|
||||||
|
|
||||||
objectReader, err := cli.ObjectGetInit(ctx, prm)
|
objectReader, err := cli.ObjectGetInit(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -245,7 +231,7 @@ func (c *Client) VerifyHash(containerID, objectID, expectedHash string) VerifyHa
|
||||||
prm.WithinSession(tok)
|
prm.WithinSession(tok)
|
||||||
|
|
||||||
hasher := sha256.New()
|
hasher := sha256.New()
|
||||||
err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) {
|
err = get(c.cli, prm, c.vu.Context(), func(data []byte) {
|
||||||
hasher.Write(data)
|
hasher.Write(data)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -381,10 +367,9 @@ func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedOb
|
||||||
}
|
}
|
||||||
|
|
||||||
return PreparedObject{
|
return PreparedObject{
|
||||||
vu: c.vu,
|
vu: c.vu,
|
||||||
key: c.key,
|
key: c.key,
|
||||||
cli: c.cli,
|
cli: c.cli,
|
||||||
bufsize: c.bufsize,
|
|
||||||
|
|
||||||
hdr: *obj,
|
hdr: *obj,
|
||||||
payload: data,
|
payload: data,
|
||||||
|
@ -413,7 +398,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = put(p.vu, p.bufsize, p.cli, nil, &obj, p.payload, 0)
|
_, err = put(p.vu, p.cli, nil, &obj, p.payload, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
@ -421,8 +406,12 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
|
||||||
return PutResponse{Success: true, ObjectID: id.String()}
|
return PutResponse{Success: true, ObjectID: id.String()}
|
||||||
}
|
}
|
||||||
|
|
||||||
func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
|
func put(vu modules.VU, cli *client.Client, tok *session.Object,
|
||||||
hdr *object.Object, payload []byte, chunkSize int) (*client.ResObjectPut, error) {
|
hdr *object.Object, payload []byte, chunkSize int) (*client.ResObjectPut, error) {
|
||||||
|
bufSize := defaultBufferSize
|
||||||
|
if chunkSize > 0 {
|
||||||
|
bufSize = chunkSize
|
||||||
|
}
|
||||||
buf := make([]byte, bufSize)
|
buf := make([]byte, bufSize)
|
||||||
rdr := bytes.NewReader(payload)
|
rdr := bytes.NewReader(payload)
|
||||||
sz := rdr.Size()
|
sz := rdr.Size()
|
||||||
|
|
|
@ -133,10 +133,9 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
|
||||||
cnrPutDuration, _ = registry.NewMetric("frostfs_cnr_put_duration", metrics.Trend, metrics.Time)
|
cnrPutDuration, _ = registry.NewMetric("frostfs_cnr_put_duration", metrics.Trend, metrics.Time)
|
||||||
|
|
||||||
return &Client{
|
return &Client{
|
||||||
vu: n.vu,
|
vu: n.vu,
|
||||||
key: pk.PrivateKey,
|
key: pk.PrivateKey,
|
||||||
tok: tok,
|
tok: tok,
|
||||||
cli: &cli,
|
cli: &cli,
|
||||||
bufsize: defaultBufferSize,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue