[#115] pool: Drop part buffer pool

Tests showed that using part buffer pool doesn't save memory a lot.
Especially on big parts.
Probably we can use pool only for small parts
after adding buffer in payloadSizeLimiter

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-08-01 12:52:18 +03:00
parent 3cb3841073
commit 202412230a
3 changed files with 20 additions and 113 deletions

View file

@ -36,6 +36,7 @@ func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit)
address: c.address(),
logger: &c.clientStatusMonitor,
}
key := &c.prm.key
if prm.key != nil {
key = prm.key
@ -74,6 +75,7 @@ type ResObjectPut struct {
OID oid.ID
}
// Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing.
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
ai, err := x.ot.Close(ctx)
if err != nil {

View file

@ -1,69 +0,0 @@
package pool
import (
"fmt"
"sync"
)
type PartBuffer struct {
Buffer []byte
len uint64
}
type PartsBufferPool struct {
syncPool *sync.Pool
limit uint64
maxObjectSize uint64
mu sync.Mutex
available uint64
}
func NewPartBufferPool(limit uint64, maxObjectSize uint64) *PartsBufferPool {
return &PartsBufferPool{
limit: limit,
maxObjectSize: maxObjectSize,
available: limit,
syncPool: &sync.Pool{New: func() any {
// We have to use pointer (even for slices), see https://staticcheck.dev/docs/checks/#SA6002
// It's based on interfaces implementation in 2016, so maybe something has changed since then.
// We can use no pointer for multi-kilobyte slices though https://github.com/golang/go/issues/16323#issuecomment-254401036
buff := make([]byte, maxObjectSize)
return &buff
}},
}
}
func (p *PartsBufferPool) ParBufferSize() uint64 {
return p.maxObjectSize
}
func (p *PartsBufferPool) GetBuffer() (*PartBuffer, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.maxObjectSize > p.available {
return nil, fmt.Errorf("requested buffer size %d is greater than available: %d", p.maxObjectSize, p.available)
}
p.available -= p.maxObjectSize
return &PartBuffer{
Buffer: *p.syncPool.Get().(*[]byte),
len: p.maxObjectSize,
}, nil
}
func (p *PartsBufferPool) FreeBuffer(buff *PartBuffer) error {
p.mu.Lock()
defer p.mu.Unlock()
if buff.len+p.available > p.limit {
return fmt.Errorf("buffer size %d to free is too large, available: %d, limit: %d", buff.len, p.available, p.limit)
}
p.available += buff.len
p.syncPool.Put(&buff.Buffer)
return nil
}

View file

@ -679,7 +679,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
}
if prm.payload != nil {
const defaultBufferSizePut = 3 << 20 // configure?
const defaultBufferSizePut = 3 << 20 // default grpc message size, configure?
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
@ -736,22 +736,33 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
}
if wObj.WriteHeader(ctx, prm.hdr) {
sz := prm.hdr.PayloadSize()
if data := prm.hdr.Payload(); len(data) > 0 {
if prm.payload != nil {
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
} else {
prm.payload = bytes.NewReader(data)
sz = uint64(len(data))
}
}
if prm.payload != nil {
const defaultBufferSizePut = 64 * 1024 // it's buffer size in s3-gw, configure?
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
}
buf := make([]byte, sz)
var n int
for {
n, err = prm.payload.Read(prm.partBuffer)
n, err = prm.payload.Read(buf)
if n > 0 {
start = time.Now()
successWrite := wObj.WritePayloadChunk(ctx, prm.partBuffer[:n])
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
c.incRequests(time.Since(start), methodObjectPut)
if !successWrite {
break
@ -1151,7 +1162,6 @@ type InitParameters struct {
nodeParams []NodeParam
requestCallback func(RequestInfo)
dialOptions []grpc.DialOption
maxClientCutMemory uint64
clientBuilder clientBuilder
}
@ -1216,14 +1226,6 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.dialOptions = opts
}
// SetMaxClientCutMemory sets the max amount of bytes that can be used during client cut (see PrmObjectPut.SetClientCut).
// Default value is 1gb (that should be enough for 200 concurrent PUT request for MaxObjectSize 50mb).
// If the MaxObjectSize network param is greater than limit is set by this method
// Pool.PutObject operations with PrmObjectPut.SetClientCut will fail.
func (x *InitParameters) SetMaxClientCutMemory(size uint64) {
x.maxClientCutMemory = size
}
// setClientBuilder sets clientBuilder used for client construction.
// Wraps setClientBuilderContext without a context.
func (x *InitParameters) setClientBuilder(builder clientBuilder) {
@ -1406,7 +1408,6 @@ type PrmObjectPut struct {
copiesNumber []uint32
clientCut bool
partBuffer []byte
networkInfo netmap.NetworkInfo
}
@ -1442,10 +1443,6 @@ func (x *PrmObjectPut) SetClientCut(clientCut bool) {
x.clientCut = clientCut
}
func (x *PrmObjectPut) setPartBuffer(partBuffer []byte) {
x.partBuffer = partBuffer
}
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
x.networkInfo = ni
}
@ -1746,10 +1743,7 @@ type Pool struct {
clientBuilder clientBuilder
logger *zap.Logger
// we cannot initialize partBufferPool in NewPool function,
// so we have to save maxClientCutMemory param for further initialization in Dial.
maxClientCutMemory uint64
partsBufferPool *PartsBufferPool
maxObjectSize uint64
}
type innerPool struct {
@ -1761,7 +1755,6 @@ type innerPool struct {
const (
defaultSessionTokenExpirationDuration = 100 // in blocks
defaultErrorThreshold = 100
defaultMaxClientCutMemory = 1024 * 1024 * 1024 // 1gb
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
@ -1798,8 +1791,7 @@ func NewPool(options InitParameters) (*Pool, error) {
clientRebalanceInterval: options.clientRebalanceInterval,
sessionExpirationDuration: options.sessionExpirationDuration,
},
clientBuilder: options.clientBuilder,
maxClientCutMemory: options.maxClientCutMemory,
clientBuilder: options.clientBuilder,
}
return pool, nil
@ -1860,7 +1852,7 @@ func (p *Pool) Dial(ctx context.Context) error {
if err != nil {
return fmt.Errorf("get network info for max object size: %w", err)
}
p.partsBufferPool = NewPartBufferPool(p.maxClientCutMemory, ni.MaxObjectSize())
p.maxObjectSize = ni.MaxObjectSize()
go p.startRebalance(ctx)
return nil
@ -1883,10 +1875,6 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
params.errorThreshold = defaultErrorThreshold
}
if params.maxClientCutMemory == 0 {
params.maxClientCutMemory = defaultMaxClientCutMemory
}
if params.clientRebalanceInterval <= 0 {
params.clientRebalanceInterval = defaultRebalanceInterval
}
@ -2284,24 +2272,10 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
}
if prm.clientCut {
buff, err := p.partsBufferPool.GetBuffer()
if err != nil {
return oid.ID{}, fmt.Errorf("cannot get buffer for put operations: %w", err)
}
prm.setPartBuffer(buff.Buffer)
var ni netmap.NetworkInfo
ni.SetCurrentEpoch(p.cache.Epoch())
ni.SetMaxObjectSize(p.partsBufferPool.ParBufferSize()) // we want to use initial max object size in PayloadSizeLimiter
ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
prm.setNetworkInfo(ni)
defer func() {
if errFree := p.partsBufferPool.FreeBuffer(buff); errFree != nil {
p.log(zap.WarnLevel, "failed to free part buffer", zap.Error(err))
}
}()
}
id, err := ctxCall.client.objectPut(ctx, prm)