[#80] objectwriter: Allow custimize maxChunkLen

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
pull/96/head
Dmitrii Stepanov 2023-06-26 18:13:04 +03:00
parent aa8ffebc63
commit c243b443bc
1 changed files with 22 additions and 12 deletions

View File

@ -22,9 +22,10 @@ import (
// PrmObjectPutInit groups parameters of ObjectPutInit operation. // PrmObjectPutInit groups parameters of ObjectPutInit operation.
type PrmObjectPutInit struct { type PrmObjectPutInit struct {
copyNum []uint32 copyNum []uint32
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
meta v2session.RequestMetaHeader meta v2session.RequestMetaHeader
maxChunkLen int
} }
// SetCopiesNumber sets number of object copies that is enough to consider put successful. // SetCopiesNumber sets number of object copies that is enough to consider put successful.
@ -39,6 +40,15 @@ func (x *PrmObjectPutInit) SetCopiesNumberByVectors(copiesNumbers []uint32) {
x.copyNum = copiesNumbers x.copyNum = copiesNumbers
} }
// SetGRPCPayloadChunkLen sets maximum chunk length value for gRPC Put request.
// Maximum chunk length restricts maximum byte length of the chunk
// transmitted in a single stream message. It depends on
// server settings and other message fields.
// If not specified or negative value set, default value of 3MiB will be used.
func (x *PrmObjectPutInit) SetGRPCPayloadChunkLen(v int) {
x.maxChunkLen = v
}
// ResObjectPut groups the final result values of ObjectPutInit operation. // ResObjectPut groups the final result values of ObjectPutInit operation.
type ResObjectPut struct { type ResObjectPut struct {
statusRes statusRes
@ -74,6 +84,8 @@ type ObjectWriter struct {
req v2object.PutRequest req v2object.PutRequest
partInit v2object.PutObjectPartInit partInit v2object.PutObjectPartInit
partChunk v2object.PutObjectPartChunk partChunk v2object.PutObjectPartChunk
maxChunkLen int
} }
// UseKey specifies private key to sign the requests. // UseKey specifies private key to sign the requests.
@ -143,15 +155,8 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
} }
for ln := len(chunk); ln > 0; ln = len(chunk) { for ln := len(chunk); ln > 0; ln = len(chunk) {
// maxChunkLen restricts maximum byte length of the chunk if ln > x.maxChunkLen {
// transmitted in a single stream message. It depends on ln = x.maxChunkLen
// server settings and other message fields, but for now
// we simply assume that 3MB is large enough to reduce the
// number of messages, and not to exceed the limit
// (4MB by default for gRPC servers).
const maxChunkLen = 3 << 20
if ln > maxChunkLen {
ln = maxChunkLen
} }
// we deal with size limit overflow above, but there is another case: // we deal with size limit overflow above, but there is another case:
@ -263,6 +268,11 @@ func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (*Obje
w.stream = stream w.stream = stream
w.partInit.SetCopiesNumber(prm.copyNum) w.partInit.SetCopiesNumber(prm.copyNum)
w.req.SetBody(new(v2object.PutRequestBody)) w.req.SetBody(new(v2object.PutRequestBody))
if prm.maxChunkLen > 0 {
w.maxChunkLen = prm.maxChunkLen
} else {
w.maxChunkLen = 3 << 20
}
c.prepareRequest(&w.req, &prm.meta) c.prepareRequest(&w.req, &prm.meta)
return &w, nil return &w, nil