Use pool_size_local and separate pool for local puts #71
10 changed files with 31 additions and 5 deletions
|
@ -43,6 +43,7 @@ Changelog for FrostFS Node
|
||||||
- Possible panic during write-cache initialization (#2234)
|
- Possible panic during write-cache initialization (#2234)
|
||||||
- Do not fetch an object if `meta` is missing it (#61)
|
- Do not fetch an object if `meta` is missing it (#61)
|
||||||
- Create contract wallet only by `init` and `update-config` command (#63)
|
- Create contract wallet only by `init` and `update-config` command (#63)
|
||||||
|
- Actually use `object.put.pool_size_local` and independent pool for local puts (#64).
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
### Updated
|
### Updated
|
||||||
|
|
|
@ -496,6 +496,10 @@ type cfgObjectRoutines struct {
|
||||||
|
|
||||||
putRemoteCapacity int
|
putRemoteCapacity int
|
||||||
|
|
||||||
|
putLocal *ants.Pool
|
||||||
|
|
||||||
|
putLocalCapacity int
|
||||||
|
|
||||||
replicatorPoolSize int
|
replicatorPoolSize int
|
||||||
|
|
||||||
replication *ants.Pool
|
replication *ants.Pool
|
||||||
|
@ -834,10 +838,13 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
||||||
optNonBlocking := ants.WithNonblocking(true)
|
optNonBlocking := ants.WithNonblocking(true)
|
||||||
|
|
||||||
pool.putRemoteCapacity = objectconfig.Put(cfg).PoolSizeRemote()
|
pool.putRemoteCapacity = objectconfig.Put(cfg).PoolSizeRemote()
|
||||||
|
|
||||||
pool.putRemote, err = ants.NewPool(pool.putRemoteCapacity, optNonBlocking)
|
pool.putRemote, err = ants.NewPool(pool.putRemoteCapacity, optNonBlocking)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
pool.putLocalCapacity = objectconfig.Put(cfg).PoolSizeLocal()
|
||||||
|
pool.putLocal, err = ants.NewPool(pool.putLocalCapacity, optNonBlocking)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
pool.replicatorPoolSize = replicatorconfig.PoolSize(cfg)
|
pool.replicatorPoolSize = replicatorconfig.PoolSize(cfg)
|
||||||
if pool.replicatorPoolSize <= 0 {
|
if pool.replicatorPoolSize <= 0 {
|
||||||
pool.replicatorPoolSize = pool.putRemoteCapacity
|
pool.replicatorPoolSize = pool.putRemoteCapacity
|
||||||
|
|
|
@ -39,3 +39,15 @@ func (g PutConfig) PoolSizeRemote() int {
|
||||||
|
|
||||||
return PutPoolSizeDefault
|
return PutPoolSizeDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PoolSizeLocal returns the value of "pool_size_local" config parameter.
|
||||||
|
//
|
||||||
|
// Returns PutPoolSizeDefault if the value is not a positive number.
|
||||||
|
func (g PutConfig) PoolSizeLocal() int {
|
||||||
|
v := config.Int(g.cfg, "pool_size_local")
|
||||||
|
if v > 0 {
|
||||||
|
return int(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return PutPoolSizeDefault
|
||||||
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ func TestObjectSection(t *testing.T) {
|
||||||
empty := configtest.EmptyConfig()
|
empty := configtest.EmptyConfig()
|
||||||
|
|
||||||
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote())
|
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote())
|
||||||
|
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeLocal())
|
||||||
require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty))
|
require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -21,6 +22,7 @@ func TestObjectSection(t *testing.T) {
|
||||||
|
|
||||||
var fileConfigTest = func(c *config.Config) {
|
var fileConfigTest = func(c *config.Config) {
|
||||||
require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote())
|
require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote())
|
||||||
|
require.Equal(t, 200, objectconfig.Put(c).PoolSizeLocal())
|
||||||
require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c))
|
require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -275,7 +275,7 @@ func initObjectService(c *cfg) {
|
||||||
putsvc.WithNetworkMapSource(c.netMapSource),
|
putsvc.WithNetworkMapSource(c.netMapSource),
|
||||||
putsvc.WithNetmapKeys(c),
|
putsvc.WithNetmapKeys(c),
|
||||||
putsvc.WithNetworkState(c.cfgNetmap.state),
|
putsvc.WithNetworkState(c.cfgNetmap.state),
|
||||||
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote),
|
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
||||||
putsvc.WithLogger(c.log),
|
putsvc.WithLogger(c.log),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -85,6 +85,7 @@ FROSTFS_REPLICATOR_POOL_SIZE=10
|
||||||
|
|
||||||
# Object service section
|
# Object service section
|
||||||
FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE=100
|
FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE=100
|
||||||
|
FROSTFS_OBJECT_PUT_POOL_SIZE_LOCAL=200
|
||||||
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
||||||
|
|
||||||
# Storage engine section
|
# Storage engine section
|
||||||
|
|
|
@ -132,7 +132,8 @@
|
||||||
"tombstone_lifetime": 10
|
"tombstone_lifetime": 10
|
||||||
},
|
},
|
||||||
"put": {
|
"put": {
|
||||||
"pool_size_remote": 100
|
"pool_size_remote": 100,
|
||||||
|
"pool_size_local": 200
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"storage": {
|
"storage": {
|
||||||
|
|
|
@ -111,6 +111,7 @@ object:
|
||||||
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
|
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
|
||||||
put:
|
put:
|
||||||
pool_size_remote: 100 # number of async workers for remote PUT operations
|
pool_size_remote: 100 # number of async workers for remote PUT operations
|
||||||
|
pool_size_local: 200 # number of async workers for local PUT operations
|
||||||
|
|
||||||
storage:
|
storage:
|
||||||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||||
|
|
|
@ -427,3 +427,4 @@ object:
|
||||||
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
|
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
|
||||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||||
| `put.pool_size_remote` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
| `put.pool_size_remote` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||||
|
| `put.pool_size_local` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||||
|
|
|
@ -116,9 +116,9 @@ func WithNetworkMapSource(v netmap.Source) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithWorkerPools(remote util.WorkerPool) Option {
|
func WithWorkerPools(remote, local util.WorkerPool) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.remotePool = remote
|
c.remotePool, c.localPool = remote, local
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue