feature/115-adopt_put_single_pool #140

Merged
dkirillov merged 1 commit from dkirillov/frostfs-sdk-go:feature/115-adopt_put_single_pool into master 2024-09-04 19:51:15 +00:00
3 changed files with 20 additions and 113 deletions

View file

@ -36,6 +36,7 @@ func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit)
address: c.address(), address: c.address(),
logger: &c.clientStatusMonitor, logger: &c.clientStatusMonitor,
} }
key := &c.prm.key key := &c.prm.key
if prm.key != nil { if prm.key != nil {
key = prm.key key = prm.key
@ -74,6 +75,7 @@ type ResObjectPut struct {
OID oid.ID OID oid.ID
} }
// Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing.
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) { func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
ai, err := x.ot.Close(ctx) ai, err := x.ot.Close(ctx)
if err != nil { if err != nil {

View file

@ -1,69 +0,0 @@
package pool
import (
"fmt"
"sync"
)
type PartBuffer struct {
Buffer []byte
len uint64
}
type PartsBufferPool struct {
syncPool *sync.Pool
limit uint64
maxObjectSize uint64
mu sync.Mutex
available uint64
}
func NewPartBufferPool(limit uint64, maxObjectSize uint64) *PartsBufferPool {
return &PartsBufferPool{
limit: limit,
maxObjectSize: maxObjectSize,
available: limit,
syncPool: &sync.Pool{New: func() any {
// We have to use pointer (even for slices), see https://staticcheck.dev/docs/checks/#SA6002
// It's based on interfaces implementation in 2016, so maybe something has changed since then.
// We can use no pointer for multi-kilobyte slices though https://github.com/golang/go/issues/16323#issuecomment-254401036
buff := make([]byte, maxObjectSize)
return &buff
}},
}
}
func (p *PartsBufferPool) ParBufferSize() uint64 {
return p.maxObjectSize
}
func (p *PartsBufferPool) GetBuffer() (*PartBuffer, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.maxObjectSize > p.available {
return nil, fmt.Errorf("requested buffer size %d is greater than available: %d", p.maxObjectSize, p.available)
}
p.available -= p.maxObjectSize
return &PartBuffer{
Buffer: *p.syncPool.Get().(*[]byte),
len: p.maxObjectSize,
}, nil
}
func (p *PartsBufferPool) FreeBuffer(buff *PartBuffer) error {
p.mu.Lock()
defer p.mu.Unlock()
if buff.len+p.available > p.limit {
return fmt.Errorf("buffer size %d to free is too large, available: %d, limit: %d", buff.len, p.available, p.limit)
}
p.available += buff.len
p.syncPool.Put(&buff.Buffer)
return nil
}

View file

@ -679,7 +679,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
} }
if prm.payload != nil { if prm.payload != nil {
const defaultBufferSizePut = 3 << 20 // configure? const defaultBufferSizePut = 3 << 20 // default grpc message size, configure?
if sz == 0 || sz > defaultBufferSizePut { if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut sz = defaultBufferSizePut
@ -736,22 +736,33 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
} }
if wObj.WriteHeader(ctx, prm.hdr) { if wObj.WriteHeader(ctx, prm.hdr) {
sz := prm.hdr.PayloadSize()
if data := prm.hdr.Payload(); len(data) > 0 { if data := prm.hdr.Payload(); len(data) > 0 {
if prm.payload != nil { if prm.payload != nil {
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
} else { } else {
prm.payload = bytes.NewReader(data) prm.payload = bytes.NewReader(data)
sz = uint64(len(data))
} }
} }
if prm.payload != nil { if prm.payload != nil {
const defaultBufferSizePut = 64 * 1024 // it's buffer size in s3-gw, configure?
Review

Should we create an issue for that?

Should we create an issue for that?
Review
https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/149
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
}
buf := make([]byte, sz)
var n int var n int
for { for {
n, err = prm.payload.Read(prm.partBuffer) n, err = prm.payload.Read(buf)
if n > 0 { if n > 0 {
start = time.Now() start = time.Now()
successWrite := wObj.WritePayloadChunk(ctx, prm.partBuffer[:n]) successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
c.incRequests(time.Since(start), methodObjectPut) c.incRequests(time.Since(start), methodObjectPut)
if !successWrite { if !successWrite {
break break
@ -1151,7 +1162,6 @@ type InitParameters struct {
nodeParams []NodeParam nodeParams []NodeParam
requestCallback func(RequestInfo) requestCallback func(RequestInfo)
dialOptions []grpc.DialOption dialOptions []grpc.DialOption
maxClientCutMemory uint64
clientBuilder clientBuilder clientBuilder clientBuilder
} }
@ -1216,14 +1226,6 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.dialOptions = opts x.dialOptions = opts
} }
// SetMaxClientCutMemory sets the max amount of bytes that can be used during client cut (see PrmObjectPut.SetClientCut).
// Default value is 1gb (that should be enough for 200 concurrent PUT request for MaxObjectSize 50mb).
// If the MaxObjectSize network param is greater than limit is set by this method
// Pool.PutObject operations with PrmObjectPut.SetClientCut will fail.
func (x *InitParameters) SetMaxClientCutMemory(size uint64) {
x.maxClientCutMemory = size
}
// setClientBuilder sets clientBuilder used for client construction. // setClientBuilder sets clientBuilder used for client construction.
// Wraps setClientBuilderContext without a context. // Wraps setClientBuilderContext without a context.
func (x *InitParameters) setClientBuilder(builder clientBuilder) { func (x *InitParameters) setClientBuilder(builder clientBuilder) {
@ -1406,7 +1408,6 @@ type PrmObjectPut struct {
copiesNumber []uint32 copiesNumber []uint32
clientCut bool clientCut bool
partBuffer []byte
networkInfo netmap.NetworkInfo networkInfo netmap.NetworkInfo
} }
@ -1442,10 +1443,6 @@ func (x *PrmObjectPut) SetClientCut(clientCut bool) {
x.clientCut = clientCut x.clientCut = clientCut
} }
func (x *PrmObjectPut) setPartBuffer(partBuffer []byte) {
x.partBuffer = partBuffer
}
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) { func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
x.networkInfo = ni x.networkInfo = ni
} }
@ -1746,10 +1743,7 @@ type Pool struct {
clientBuilder clientBuilder clientBuilder clientBuilder
logger *zap.Logger logger *zap.Logger
// we cannot initialize partBufferPool in NewPool function, maxObjectSize uint64
// so we have to save maxClientCutMemory param for further initialization in Dial.
maxClientCutMemory uint64
partsBufferPool *PartsBufferPool
} }
type innerPool struct { type innerPool struct {
@ -1761,7 +1755,6 @@ type innerPool struct {
const ( const (
defaultSessionTokenExpirationDuration = 100 // in blocks defaultSessionTokenExpirationDuration = 100 // in blocks
defaultErrorThreshold = 100 defaultErrorThreshold = 100
defaultMaxClientCutMemory = 1024 * 1024 * 1024 // 1gb
defaultRebalanceInterval = 15 * time.Second defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second defaultHealthcheckTimeout = 4 * time.Second
@ -1798,8 +1791,7 @@ func NewPool(options InitParameters) (*Pool, error) {
clientRebalanceInterval: options.clientRebalanceInterval, clientRebalanceInterval: options.clientRebalanceInterval,
sessionExpirationDuration: options.sessionExpirationDuration, sessionExpirationDuration: options.sessionExpirationDuration,
}, },
clientBuilder: options.clientBuilder, clientBuilder: options.clientBuilder,
maxClientCutMemory: options.maxClientCutMemory,
} }
return pool, nil return pool, nil
@ -1860,7 +1852,7 @@ func (p *Pool) Dial(ctx context.Context) error {
if err != nil { if err != nil {
return fmt.Errorf("get network info for max object size: %w", err) return fmt.Errorf("get network info for max object size: %w", err)
} }
p.partsBufferPool = NewPartBufferPool(p.maxClientCutMemory, ni.MaxObjectSize()) p.maxObjectSize = ni.MaxObjectSize()
go p.startRebalance(ctx) go p.startRebalance(ctx)
return nil return nil
@ -1883,10 +1875,6 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
params.errorThreshold = defaultErrorThreshold params.errorThreshold = defaultErrorThreshold
} }
if params.maxClientCutMemory == 0 {
params.maxClientCutMemory = defaultMaxClientCutMemory
}
if params.clientRebalanceInterval <= 0 { if params.clientRebalanceInterval <= 0 {
params.clientRebalanceInterval = defaultRebalanceInterval params.clientRebalanceInterval = defaultRebalanceInterval
} }
@ -2284,24 +2272,10 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
} }
if prm.clientCut { if prm.clientCut {
buff, err := p.partsBufferPool.GetBuffer()
if err != nil {
return oid.ID{}, fmt.Errorf("cannot get buffer for put operations: %w", err)
}
prm.setPartBuffer(buff.Buffer)
var ni netmap.NetworkInfo var ni netmap.NetworkInfo
ni.SetCurrentEpoch(p.cache.Epoch()) ni.SetCurrentEpoch(p.cache.Epoch())
ni.SetMaxObjectSize(p.partsBufferPool.ParBufferSize()) // we want to use initial max object size in PayloadSizeLimiter ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
prm.setNetworkInfo(ni) prm.setNetworkInfo(ni)
defer func() {
if errFree := p.partsBufferPool.FreeBuffer(buff); errFree != nil {
p.log(zap.WarnLevel, "failed to free part buffer", zap.Error(err))
}
}()
} }
id, err := ctxCall.client.objectPut(ctx, prm) id, err := ctxCall.client.objectPut(ctx, prm)