feature/115-adopt_put_single_pool #140
3 changed files with 20 additions and 113 deletions
|
@ -36,6 +36,7 @@ func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit)
|
||||||
address: c.address(),
|
address: c.address(),
|
||||||
logger: &c.clientStatusMonitor,
|
logger: &c.clientStatusMonitor,
|
||||||
}
|
}
|
||||||
|
|
||||||
key := &c.prm.key
|
key := &c.prm.key
|
||||||
if prm.key != nil {
|
if prm.key != nil {
|
||||||
key = prm.key
|
key = prm.key
|
||||||
|
@ -74,6 +75,7 @@ type ResObjectPut struct {
|
||||||
OID oid.ID
|
OID oid.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing.
|
||||||
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
|
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
|
||||||
ai, err := x.ot.Close(ctx)
|
ai, err := x.ot.Close(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
|
@ -1,69 +0,0 @@
|
||||||
package pool
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type PartBuffer struct {
|
|
||||||
Buffer []byte
|
|
||||||
len uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
type PartsBufferPool struct {
|
|
||||||
syncPool *sync.Pool
|
|
||||||
limit uint64
|
|
||||||
maxObjectSize uint64
|
|
||||||
|
|
||||||
mu sync.Mutex
|
|
||||||
available uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewPartBufferPool(limit uint64, maxObjectSize uint64) *PartsBufferPool {
|
|
||||||
return &PartsBufferPool{
|
|
||||||
limit: limit,
|
|
||||||
maxObjectSize: maxObjectSize,
|
|
||||||
available: limit,
|
|
||||||
syncPool: &sync.Pool{New: func() any {
|
|
||||||
// We have to use pointer (even for slices), see https://staticcheck.dev/docs/checks/#SA6002
|
|
||||||
// It's based on interfaces implementation in 2016, so maybe something has changed since then.
|
|
||||||
// We can use no pointer for multi-kilobyte slices though https://github.com/golang/go/issues/16323#issuecomment-254401036
|
|
||||||
buff := make([]byte, maxObjectSize)
|
|
||||||
return &buff
|
|
||||||
}},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PartsBufferPool) ParBufferSize() uint64 {
|
|
||||||
return p.maxObjectSize
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PartsBufferPool) GetBuffer() (*PartBuffer, error) {
|
|
||||||
p.mu.Lock()
|
|
||||||
defer p.mu.Unlock()
|
|
||||||
|
|
||||||
if p.maxObjectSize > p.available {
|
|
||||||
return nil, fmt.Errorf("requested buffer size %d is greater than available: %d", p.maxObjectSize, p.available)
|
|
||||||
}
|
|
||||||
|
|
||||||
p.available -= p.maxObjectSize
|
|
||||||
|
|
||||||
return &PartBuffer{
|
|
||||||
Buffer: *p.syncPool.Get().(*[]byte),
|
|
||||||
len: p.maxObjectSize,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PartsBufferPool) FreeBuffer(buff *PartBuffer) error {
|
|
||||||
p.mu.Lock()
|
|
||||||
defer p.mu.Unlock()
|
|
||||||
|
|
||||||
if buff.len+p.available > p.limit {
|
|
||||||
return fmt.Errorf("buffer size %d to free is too large, available: %d, limit: %d", buff.len, p.available, p.limit)
|
|
||||||
}
|
|
||||||
|
|
||||||
p.available += buff.len
|
|
||||||
p.syncPool.Put(&buff.Buffer)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
60
pool/pool.go
60
pool/pool.go
|
@ -679,7 +679,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.payload != nil {
|
if prm.payload != nil {
|
||||||
const defaultBufferSizePut = 3 << 20 // configure?
|
const defaultBufferSizePut = 3 << 20 // default grpc message size, configure?
|
||||||
|
|
||||||
if sz == 0 || sz > defaultBufferSizePut {
|
if sz == 0 || sz > defaultBufferSizePut {
|
||||||
sz = defaultBufferSizePut
|
sz = defaultBufferSizePut
|
||||||
|
@ -736,22 +736,33 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
|
||||||
}
|
}
|
||||||
|
|
||||||
if wObj.WriteHeader(ctx, prm.hdr) {
|
if wObj.WriteHeader(ctx, prm.hdr) {
|
||||||
|
sz := prm.hdr.PayloadSize()
|
||||||
|
|
||||||
if data := prm.hdr.Payload(); len(data) > 0 {
|
if data := prm.hdr.Payload(); len(data) > 0 {
|
||||||
if prm.payload != nil {
|
if prm.payload != nil {
|
||||||
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
|
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
|
||||||
} else {
|
} else {
|
||||||
prm.payload = bytes.NewReader(data)
|
prm.payload = bytes.NewReader(data)
|
||||||
|
sz = uint64(len(data))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.payload != nil {
|
if prm.payload != nil {
|
||||||
|
const defaultBufferSizePut = 64 * 1024 // it's buffer size in s3-gw, configure?
|
||||||
|
|||||||
|
|
||||||
|
if sz == 0 || sz > defaultBufferSizePut {
|
||||||
|
sz = defaultBufferSizePut
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, sz)
|
||||||
|
|
||||||
var n int
|
var n int
|
||||||
|
|
||||||
for {
|
for {
|
||||||
n, err = prm.payload.Read(prm.partBuffer)
|
n, err = prm.payload.Read(buf)
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
successWrite := wObj.WritePayloadChunk(ctx, prm.partBuffer[:n])
|
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
|
||||||
c.incRequests(time.Since(start), methodObjectPut)
|
c.incRequests(time.Since(start), methodObjectPut)
|
||||||
if !successWrite {
|
if !successWrite {
|
||||||
break
|
break
|
||||||
|
@ -1151,7 +1162,6 @@ type InitParameters struct {
|
||||||
nodeParams []NodeParam
|
nodeParams []NodeParam
|
||||||
requestCallback func(RequestInfo)
|
requestCallback func(RequestInfo)
|
||||||
dialOptions []grpc.DialOption
|
dialOptions []grpc.DialOption
|
||||||
maxClientCutMemory uint64
|
|
||||||
|
|
||||||
clientBuilder clientBuilder
|
clientBuilder clientBuilder
|
||||||
}
|
}
|
||||||
|
@ -1216,14 +1226,6 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
|
||||||
x.dialOptions = opts
|
x.dialOptions = opts
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetMaxClientCutMemory sets the max amount of bytes that can be used during client cut (see PrmObjectPut.SetClientCut).
|
|
||||||
// Default value is 1gb (that should be enough for 200 concurrent PUT request for MaxObjectSize 50mb).
|
|
||||||
// If the MaxObjectSize network param is greater than limit is set by this method
|
|
||||||
// Pool.PutObject operations with PrmObjectPut.SetClientCut will fail.
|
|
||||||
func (x *InitParameters) SetMaxClientCutMemory(size uint64) {
|
|
||||||
x.maxClientCutMemory = size
|
|
||||||
}
|
|
||||||
|
|
||||||
// setClientBuilder sets clientBuilder used for client construction.
|
// setClientBuilder sets clientBuilder used for client construction.
|
||||||
// Wraps setClientBuilderContext without a context.
|
// Wraps setClientBuilderContext without a context.
|
||||||
func (x *InitParameters) setClientBuilder(builder clientBuilder) {
|
func (x *InitParameters) setClientBuilder(builder clientBuilder) {
|
||||||
|
@ -1406,7 +1408,6 @@ type PrmObjectPut struct {
|
||||||
copiesNumber []uint32
|
copiesNumber []uint32
|
||||||
|
|
||||||
clientCut bool
|
clientCut bool
|
||||||
partBuffer []byte
|
|
||||||
networkInfo netmap.NetworkInfo
|
networkInfo netmap.NetworkInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1442,10 +1443,6 @@ func (x *PrmObjectPut) SetClientCut(clientCut bool) {
|
||||||
x.clientCut = clientCut
|
x.clientCut = clientCut
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *PrmObjectPut) setPartBuffer(partBuffer []byte) {
|
|
||||||
x.partBuffer = partBuffer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
|
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
|
||||||
x.networkInfo = ni
|
x.networkInfo = ni
|
||||||
}
|
}
|
||||||
|
@ -1746,10 +1743,7 @@ type Pool struct {
|
||||||
clientBuilder clientBuilder
|
clientBuilder clientBuilder
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
|
|
||||||
// we cannot initialize partBufferPool in NewPool function,
|
maxObjectSize uint64
|
||||||
// so we have to save maxClientCutMemory param for further initialization in Dial.
|
|
||||||
maxClientCutMemory uint64
|
|
||||||
partsBufferPool *PartsBufferPool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type innerPool struct {
|
type innerPool struct {
|
||||||
|
@ -1761,7 +1755,6 @@ type innerPool struct {
|
||||||
const (
|
const (
|
||||||
defaultSessionTokenExpirationDuration = 100 // in blocks
|
defaultSessionTokenExpirationDuration = 100 // in blocks
|
||||||
defaultErrorThreshold = 100
|
defaultErrorThreshold = 100
|
||||||
defaultMaxClientCutMemory = 1024 * 1024 * 1024 // 1gb
|
|
||||||
|
|
||||||
defaultRebalanceInterval = 15 * time.Second
|
defaultRebalanceInterval = 15 * time.Second
|
||||||
defaultHealthcheckTimeout = 4 * time.Second
|
defaultHealthcheckTimeout = 4 * time.Second
|
||||||
|
@ -1799,7 +1792,6 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
sessionExpirationDuration: options.sessionExpirationDuration,
|
sessionExpirationDuration: options.sessionExpirationDuration,
|
||||||
},
|
},
|
||||||
clientBuilder: options.clientBuilder,
|
clientBuilder: options.clientBuilder,
|
||||||
maxClientCutMemory: options.maxClientCutMemory,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pool, nil
|
return pool, nil
|
||||||
|
@ -1860,7 +1852,7 @@ func (p *Pool) Dial(ctx context.Context) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get network info for max object size: %w", err)
|
return fmt.Errorf("get network info for max object size: %w", err)
|
||||||
}
|
}
|
||||||
p.partsBufferPool = NewPartBufferPool(p.maxClientCutMemory, ni.MaxObjectSize())
|
p.maxObjectSize = ni.MaxObjectSize()
|
||||||
|
|
||||||
go p.startRebalance(ctx)
|
go p.startRebalance(ctx)
|
||||||
return nil
|
return nil
|
||||||
|
@ -1883,10 +1875,6 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
|
||||||
params.errorThreshold = defaultErrorThreshold
|
params.errorThreshold = defaultErrorThreshold
|
||||||
}
|
}
|
||||||
|
|
||||||
if params.maxClientCutMemory == 0 {
|
|
||||||
params.maxClientCutMemory = defaultMaxClientCutMemory
|
|
||||||
}
|
|
||||||
|
|
||||||
if params.clientRebalanceInterval <= 0 {
|
if params.clientRebalanceInterval <= 0 {
|
||||||
params.clientRebalanceInterval = defaultRebalanceInterval
|
params.clientRebalanceInterval = defaultRebalanceInterval
|
||||||
}
|
}
|
||||||
|
@ -2284,24 +2272,10 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.clientCut {
|
if prm.clientCut {
|
||||||
buff, err := p.partsBufferPool.GetBuffer()
|
|
||||||
if err != nil {
|
|
||||||
return oid.ID{}, fmt.Errorf("cannot get buffer for put operations: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
prm.setPartBuffer(buff.Buffer)
|
|
||||||
|
|
||||||
var ni netmap.NetworkInfo
|
var ni netmap.NetworkInfo
|
||||||
ni.SetCurrentEpoch(p.cache.Epoch())
|
ni.SetCurrentEpoch(p.cache.Epoch())
|
||||||
ni.SetMaxObjectSize(p.partsBufferPool.ParBufferSize()) // we want to use initial max object size in PayloadSizeLimiter
|
ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
|
||||||
|
|
||||||
prm.setNetworkInfo(ni)
|
prm.setNetworkInfo(ni)
|
||||||
|
|
||||||
defer func() {
|
|
||||||
if errFree := p.partsBufferPool.FreeBuffer(buff); errFree != nil {
|
|
||||||
p.log(zap.WarnLevel, "failed to free part buffer", zap.Error(err))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := ctxCall.client.objectPut(ctx, prm)
|
id, err := ctxCall.client.objectPut(ctx, prm)
|
||||||
|
|
Loading…
Reference in a new issue
Should we create an issue for that?
#149