[#674] object/put: Use pseudo worker pool for local operations
After storage engine started to limit number of PUT operations there is no need to limited worker pool in Object Put service. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
07130855aa
commit
40a4a7faa2
8 changed files with 5 additions and 27 deletions
|
@ -193,7 +193,7 @@ type cfgLocalStorage struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgObjectRoutines struct {
|
type cfgObjectRoutines struct {
|
||||||
putRemote, putLocal *ants.Pool
|
putRemote *ants.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgControlService struct {
|
type cfgControlService struct {
|
||||||
|
@ -441,11 +441,6 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pool.putLocal, err = ants.NewPool(objectconfig.Put(cfg).PoolSizeLocal(), optNonBlocking)
|
|
||||||
if err != nil {
|
|
||||||
fatalOnErr(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return pool
|
return pool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,15 +39,3 @@ func (g PutConfig) PoolSizeRemote() int {
|
||||||
|
|
||||||
return PutPoolSizeDefault
|
return PutPoolSizeDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
// PoolSizeLocal returns value of "pool_size_local" config parameter.
|
|
||||||
//
|
|
||||||
// Returns PutPoolSizeDefault if value is not positive number.
|
|
||||||
func (g PutConfig) PoolSizeLocal() int {
|
|
||||||
v := config.Int(g.cfg, "pool_size_local")
|
|
||||||
if v > 0 {
|
|
||||||
return int(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
return PutPoolSizeDefault
|
|
||||||
}
|
|
||||||
|
|
|
@ -14,14 +14,12 @@ func TestObjectSection(t *testing.T) {
|
||||||
empty := configtest.EmptyConfig()
|
empty := configtest.EmptyConfig()
|
||||||
|
|
||||||
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote())
|
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote())
|
||||||
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeLocal())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const path = "../../../../config/example/node"
|
const path = "../../../../config/example/node"
|
||||||
|
|
||||||
var fileConfigTest = func(c *config.Config) {
|
var fileConfigTest = func(c *config.Config) {
|
||||||
require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote())
|
require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote())
|
||||||
require.Equal(t, 101, objectconfig.Put(c).PoolSizeLocal())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
configtest.ForEachFileType(path, fileConfigTest)
|
configtest.ForEachFileType(path, fileConfigTest)
|
||||||
|
|
|
@ -281,7 +281,7 @@ func initObjectService(c *cfg) {
|
||||||
objectCore.WithDeleteHandler(objInhumer),
|
objectCore.WithDeleteHandler(objInhumer),
|
||||||
),
|
),
|
||||||
putsvc.WithNetworkState(c.cfgNetmap.state),
|
putsvc.WithNetworkState(c.cfgNetmap.state),
|
||||||
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote),
|
||||||
putsvc.WithLogger(c.log),
|
putsvc.WithLogger(c.log),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -65,7 +65,6 @@ NEOFS_REPLICATOR_PUT_TIMEOUT=15s
|
||||||
|
|
||||||
# Object service section
|
# Object service section
|
||||||
NEOFS_OBJECT_PUT_POOL_SIZE_REMOTE=100
|
NEOFS_OBJECT_PUT_POOL_SIZE_REMOTE=100
|
||||||
NEOFS_OBJECT_PUT_POOL_SIZE_LOCAL=101
|
|
||||||
|
|
||||||
# Storage engine section
|
# Storage engine section
|
||||||
NEOFS_STORAGE_SHARD_POOL_SIZE=15
|
NEOFS_STORAGE_SHARD_POOL_SIZE=15
|
||||||
|
|
|
@ -97,8 +97,7 @@
|
||||||
},
|
},
|
||||||
"object": {
|
"object": {
|
||||||
"put": {
|
"put": {
|
||||||
"pool_size_remote": 100,
|
"pool_size_remote": 100
|
||||||
"pool_size_local": 101
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"storage": {
|
"storage": {
|
||||||
|
|
|
@ -85,7 +85,6 @@ replicator:
|
||||||
object:
|
object:
|
||||||
put:
|
put:
|
||||||
pool_size_remote: 100 # number of async workers for remote PUT operations
|
pool_size_remote: 100 # number of async workers for remote PUT operations
|
||||||
pool_size_local: 101 # number of async workers for local PUT operations
|
|
||||||
|
|
||||||
storage:
|
storage:
|
||||||
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
||||||
|
|
|
@ -117,9 +117,9 @@ func WithNetworkMapSource(v netmap.Source) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithWorkerPools(remote, local util.WorkerPool) Option {
|
func WithWorkerPools(remote util.WorkerPool) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.remotePool, c.localPool = remote, local
|
c.remotePool = remote
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue