diff --git a/pool/object_put_transformer.go b/pool/object_put_pool_transformer.go similarity index 97% rename from pool/object_put_transformer.go rename to pool/object_put_pool_transformer.go index 08d41dde..c6add151 100644 --- a/pool/object_put_transformer.go +++ b/pool/object_put_pool_transformer.go @@ -36,6 +36,7 @@ func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) address: c.address(), logger: &c.clientStatusMonitor, } + key := &c.prm.key if prm.key != nil { key = prm.key @@ -74,6 +75,7 @@ type ResObjectPut struct { OID oid.ID } +// Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing. func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) { ai, err := x.ot.Close(ctx) if err != nil { diff --git a/pool/parts_buffer_pool.go b/pool/parts_buffer_pool.go deleted file mode 100644 index 828d859f..00000000 --- a/pool/parts_buffer_pool.go +++ /dev/null @@ -1,69 +0,0 @@ -package pool - -import ( - "fmt" - "sync" -) - -type PartBuffer struct { - Buffer []byte - len uint64 -} - -type PartsBufferPool struct { - syncPool *sync.Pool - limit uint64 - maxObjectSize uint64 - - mu sync.Mutex - available uint64 -} - -func NewPartBufferPool(limit uint64, maxObjectSize uint64) *PartsBufferPool { - return &PartsBufferPool{ - limit: limit, - maxObjectSize: maxObjectSize, - available: limit, - syncPool: &sync.Pool{New: func() any { - // We have to use pointer (even for slices), see https://staticcheck.dev/docs/checks/#SA6002 - // It's based on interfaces implementation in 2016, so maybe something has changed since then. - // We can use no pointer for multi-kilobyte slices though https://github.com/golang/go/issues/16323#issuecomment-254401036 - buff := make([]byte, maxObjectSize) - return &buff - }}, - } -} - -func (p *PartsBufferPool) ParBufferSize() uint64 { - return p.maxObjectSize -} - -func (p *PartsBufferPool) GetBuffer() (*PartBuffer, error) { - p.mu.Lock() - defer p.mu.Unlock() - - if p.maxObjectSize > p.available { - return nil, fmt.Errorf("requested buffer size %d is greater than available: %d", p.maxObjectSize, p.available) - } - - p.available -= p.maxObjectSize - - return &PartBuffer{ - Buffer: *p.syncPool.Get().(*[]byte), - len: p.maxObjectSize, - }, nil -} - -func (p *PartsBufferPool) FreeBuffer(buff *PartBuffer) error { - p.mu.Lock() - defer p.mu.Unlock() - - if buff.len+p.available > p.limit { - return fmt.Errorf("buffer size %d to free is too large, available: %d, limit: %d", buff.len, p.available, p.limit) - } - - p.available += buff.len - p.syncPool.Put(&buff.Buffer) - - return nil -} diff --git a/pool/pool.go b/pool/pool.go index 94c08134..70f789ea 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -679,7 +679,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut } if prm.payload != nil { - const defaultBufferSizePut = 3 << 20 // configure? + const defaultBufferSizePut = 3 << 20 // default grpc message size, configure? if sz == 0 || sz > defaultBufferSizePut { sz = defaultBufferSizePut @@ -736,22 +736,33 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut } if wObj.WriteHeader(ctx, prm.hdr) { + sz := prm.hdr.PayloadSize() + if data := prm.hdr.Payload(); len(data) > 0 { if prm.payload != nil { prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) } else { prm.payload = bytes.NewReader(data) + sz = uint64(len(data)) } } if prm.payload != nil { + const defaultBufferSizePut = 64 * 1024 // it's buffer size in s3-gw, configure? + + if sz == 0 || sz > defaultBufferSizePut { + sz = defaultBufferSizePut + } + + buf := make([]byte, sz) + var n int for { - n, err = prm.payload.Read(prm.partBuffer) + n, err = prm.payload.Read(buf) if n > 0 { start = time.Now() - successWrite := wObj.WritePayloadChunk(ctx, prm.partBuffer[:n]) + successWrite := wObj.WritePayloadChunk(ctx, buf[:n]) c.incRequests(time.Since(start), methodObjectPut) if !successWrite { break @@ -1151,7 +1162,6 @@ type InitParameters struct { nodeParams []NodeParam requestCallback func(RequestInfo) dialOptions []grpc.DialOption - maxClientCutMemory uint64 clientBuilder clientBuilder } @@ -1216,14 +1226,6 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) { x.dialOptions = opts } -// SetMaxClientCutMemory sets the max amount of bytes that can be used during client cut (see PrmObjectPut.SetClientCut). -// Default value is 1gb (that should be enough for 200 concurrent PUT request for MaxObjectSize 50mb). -// If the MaxObjectSize network param is greater than limit is set by this method -// Pool.PutObject operations with PrmObjectPut.SetClientCut will fail. -func (x *InitParameters) SetMaxClientCutMemory(size uint64) { - x.maxClientCutMemory = size -} - // setClientBuilder sets clientBuilder used for client construction. // Wraps setClientBuilderContext without a context. func (x *InitParameters) setClientBuilder(builder clientBuilder) { @@ -1406,7 +1408,6 @@ type PrmObjectPut struct { copiesNumber []uint32 clientCut bool - partBuffer []byte networkInfo netmap.NetworkInfo } @@ -1442,10 +1443,6 @@ func (x *PrmObjectPut) SetClientCut(clientCut bool) { x.clientCut = clientCut } -func (x *PrmObjectPut) setPartBuffer(partBuffer []byte) { - x.partBuffer = partBuffer -} - func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) { x.networkInfo = ni } @@ -1746,10 +1743,7 @@ type Pool struct { clientBuilder clientBuilder logger *zap.Logger - // we cannot initialize partBufferPool in NewPool function, - // so we have to save maxClientCutMemory param for further initialization in Dial. - maxClientCutMemory uint64 - partsBufferPool *PartsBufferPool + maxObjectSize uint64 } type innerPool struct { @@ -1761,7 +1755,6 @@ type innerPool struct { const ( defaultSessionTokenExpirationDuration = 100 // in blocks defaultErrorThreshold = 100 - defaultMaxClientCutMemory = 1024 * 1024 * 1024 // 1gb defaultRebalanceInterval = 15 * time.Second defaultHealthcheckTimeout = 4 * time.Second @@ -1798,8 +1791,7 @@ func NewPool(options InitParameters) (*Pool, error) { clientRebalanceInterval: options.clientRebalanceInterval, sessionExpirationDuration: options.sessionExpirationDuration, }, - clientBuilder: options.clientBuilder, - maxClientCutMemory: options.maxClientCutMemory, + clientBuilder: options.clientBuilder, } return pool, nil @@ -1860,7 +1852,7 @@ func (p *Pool) Dial(ctx context.Context) error { if err != nil { return fmt.Errorf("get network info for max object size: %w", err) } - p.partsBufferPool = NewPartBufferPool(p.maxClientCutMemory, ni.MaxObjectSize()) + p.maxObjectSize = ni.MaxObjectSize() go p.startRebalance(ctx) return nil @@ -1883,10 +1875,6 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { params.errorThreshold = defaultErrorThreshold } - if params.maxClientCutMemory == 0 { - params.maxClientCutMemory = defaultMaxClientCutMemory - } - if params.clientRebalanceInterval <= 0 { params.clientRebalanceInterval = defaultRebalanceInterval } @@ -2284,24 +2272,10 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) } if prm.clientCut { - buff, err := p.partsBufferPool.GetBuffer() - if err != nil { - return oid.ID{}, fmt.Errorf("cannot get buffer for put operations: %w", err) - } - - prm.setPartBuffer(buff.Buffer) - var ni netmap.NetworkInfo ni.SetCurrentEpoch(p.cache.Epoch()) - ni.SetMaxObjectSize(p.partsBufferPool.ParBufferSize()) // we want to use initial max object size in PayloadSizeLimiter - + ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter prm.setNetworkInfo(ni) - - defer func() { - if errFree := p.partsBufferPool.FreeBuffer(buff); errFree != nil { - p.log(zap.WarnLevel, "failed to free part buffer", zap.Error(err)) - } - }() } id, err := ctxCall.client.objectPut(ctx, prm)