Support reloading service pool sizes on SIGHUP #956
6 changed files with 43 additions and 35 deletions
|
@ -632,14 +632,8 @@ type cfgAccessPolicyEngine struct {
|
||||||
type cfgObjectRoutines struct {
|
type cfgObjectRoutines struct {
|
||||||
putRemote *ants.Pool
|
putRemote *ants.Pool
|
||||||
|
|
||||||
putRemoteCapacity int
|
|
||||||
|
|
||||||
putLocal *ants.Pool
|
putLocal *ants.Pool
|
||||||
|
|
||||||
putLocalCapacity int
|
|
||||||
|
|
||||||
replicatorPoolSize int
|
|
||||||
|
|
||||||
replication *ants.Pool
|
replication *ants.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1094,20 +1088,20 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
||||||
|
|
||||||
optNonBlocking := ants.WithNonblocking(true)
|
optNonBlocking := ants.WithNonblocking(true)
|
||||||
|
|
||||||
pool.putRemoteCapacity = objectconfig.Put(cfg).PoolSizeRemote()
|
putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote()
|
||||||
pool.putRemote, err = ants.NewPool(pool.putRemoteCapacity, optNonBlocking)
|
pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
pool.putLocalCapacity = objectconfig.Put(cfg).PoolSizeLocal()
|
putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal()
|
||||||
pool.putLocal, err = ants.NewPool(pool.putLocalCapacity, optNonBlocking)
|
pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
pool.replicatorPoolSize = replicatorconfig.PoolSize(cfg)
|
replicatorPoolSize := replicatorconfig.PoolSize(cfg)
|
||||||
if pool.replicatorPoolSize <= 0 {
|
if replicatorPoolSize <= 0 {
|
||||||
pool.replicatorPoolSize = pool.putRemoteCapacity
|
replicatorPoolSize = putRemoteCapacity
|
||||||
}
|
}
|
||||||
|
|
||||||
pool.replication, err = ants.NewPool(pool.replicatorPoolSize)
|
pool.replication, err = ants.NewPool(replicatorPoolSize)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
return pool
|
return pool
|
||||||
|
@ -1241,6 +1235,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
||||||
setRuntimeParameters(c)
|
setRuntimeParameters(c)
|
||||||
return nil
|
return nil
|
||||||
}})
|
}})
|
||||||
|
components = append(components, dCmp{"pools", c.reloadPools})
|
||||||
components = append(components, dCmp{"tracing", func() error {
|
components = append(components, dCmp{"tracing", func() error {
|
||||||
updated, err := tracing.Setup(ctx, *tracingconfig.ToTracingConfig(c.appCfg))
|
updated, err := tracing.Setup(ctx, *tracingconfig.ToTracingConfig(c.appCfg))
|
||||||
if updated {
|
if updated {
|
||||||
|
@ -1285,6 +1280,28 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
||||||
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cfg) reloadPools() error {
|
||||||
|
newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
|
||||||
|
c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")
|
||||||
|
|
||||||
|
newSize = objectconfig.Put(c.appCfg).PoolSizeRemote()
|
||||||
|
c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size")
|
||||||
|
|
||||||
|
newSize = replicatorconfig.PoolSize(c.appCfg)
|
||||||
|
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cfg) reloadPool(p *ants.Pool, newSize int, name string) {
|
||||||
|
|||||||
|
oldSize := p.Cap()
|
||||||
|
if oldSize != newSize {
|
||||||
|
c.log.Info(logs.FrostFSNodePoolConfigurationUpdate, zap.String("field", name),
|
||||||
|
zap.Int("old", oldSize), zap.Int("new", newSize))
|
||||||
|
p.Tune(newSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *cfg) reloadAppConfig() error {
|
func (c *cfg) reloadAppConfig() error {
|
||||||
unlock := c.LockAppConfigExclusive()
|
unlock := c.LockAppConfigExclusive()
|
||||||
defer unlock()
|
defer unlock()
|
||||||
|
|
|
@ -263,7 +263,6 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize),
|
|
||||||
policer.WithPool(c.cfgObject.pool.replication),
|
policer.WithPool(c.cfgObject.pool.replication),
|
||||||
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
|
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
|
||||||
)
|
)
|
||||||
|
|
|
@ -451,6 +451,7 @@ const (
|
||||||
FrostFSNodeLoggerConfigurationPreparation = "logger configuration preparation"
|
FrostFSNodeLoggerConfigurationPreparation = "logger configuration preparation"
|
||||||
FrostFSNodeTracingConfigationUpdated = "tracing configation updated"
|
FrostFSNodeTracingConfigationUpdated = "tracing configation updated"
|
||||||
FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update"
|
FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update"
|
||||||
|
FrostFSNodePoolConfigurationUpdate = "adjust pool configuration"
|
||||||
FrostFSNodeUpdatedConfigurationApplying = "updated configuration applying"
|
FrostFSNodeUpdatedConfigurationApplying = "updated configuration applying"
|
||||||
FrostFSNodeConfigurationHasBeenReloadedSuccessfully = "configuration has been reloaded successfully"
|
FrostFSNodeConfigurationHasBeenReloadedSuccessfully = "configuration has been reloaded successfully"
|
||||||
FrostFSNodeReadNewlyCreatedContainerAfterTheNotification = "read newly created container after the notification"
|
FrostFSNodeReadNewlyCreatedContainerAfterTheNotification = "read newly created container after the notification"
|
||||||
|
|
|
@ -64,8 +64,6 @@ type cfg struct {
|
||||||
|
|
||||||
taskPool *ants.Pool
|
taskPool *ants.Pool
|
||||||
|
|
||||||
maxCapacity int
|
|
||||||
|
|
||||||
batchSize, cacheSize uint32
|
batchSize, cacheSize uint32
|
||||||
|
|
||||||
rebalanceFreq, evictDuration, sleepDuration time.Duration
|
rebalanceFreq, evictDuration, sleepDuration time.Duration
|
||||||
|
@ -158,14 +156,6 @@ func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithMaxCapacity returns option to set max capacity
|
|
||||||
// that can be set to the pool.
|
|
||||||
func WithMaxCapacity(capacity int) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.maxCapacity = capacity
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithPool returns option to set pool for
|
// WithPool returns option to set pool for
|
||||||
// policy and replication operations.
|
// policy and replication operations.
|
||||||
func WithPool(p *ants.Pool) Option {
|
func WithPool(p *ants.Pool) Option {
|
||||||
|
|
|
@ -66,7 +66,7 @@ func New(opts ...Option) *Policer {
|
||||||
cfg: c,
|
cfg: c,
|
||||||
cache: cache,
|
cache: cache,
|
||||||
objsInWork: &objectsInWork{
|
objsInWork: &objectsInWork{
|
||||||
objs: make(map[oid.Address]struct{}, c.maxCapacity),
|
objs: make(map[oid.Address]struct{}, c.taskPool.Cap()),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,16 +48,12 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Task pool
|
|
||||||
pool, err := ants.NewPool(4)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Policer instance
|
// Policer instance
|
||||||
p := New(
|
p := New(
|
||||||
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
|
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
|
||||||
WithContainerSource(containerSrc),
|
WithContainerSource(containerSrc),
|
||||||
WithBuryFunc(buryFn),
|
WithBuryFunc(buryFn),
|
||||||
WithPool(pool),
|
WithPool(testPool(t)),
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
@ -239,6 +235,7 @@ func TestProcessObject(t *testing.T) {
|
||||||
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
|
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
|
||||||
}
|
}
|
||||||
})),
|
})),
|
||||||
|
WithPool(testPool(t)),
|
||||||
)
|
)
|
||||||
|
|
||||||
addrWithType := objectcore.AddressWithType{
|
addrWithType := objectcore.AddressWithType{
|
||||||
|
@ -276,6 +273,7 @@ func TestProcessObjectError(t *testing.T) {
|
||||||
p := New(
|
p := New(
|
||||||
WithContainerSource(source),
|
WithContainerSource(source),
|
||||||
WithBuryFunc(buryFn),
|
WithBuryFunc(buryFn),
|
||||||
|
WithPool(testPool(t)),
|
||||||
)
|
)
|
||||||
|
|
||||||
addrWithType := objectcore.AddressWithType{
|
addrWithType := objectcore.AddressWithType{
|
||||||
|
@ -296,9 +294,6 @@ func TestIteratorContract(t *testing.T) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
pool, err := ants.NewPool(4)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
it := &predefinedIterator{
|
it := &predefinedIterator{
|
||||||
scenario: []nextResult{
|
scenario: []nextResult{
|
||||||
{objs, nil},
|
{objs, nil},
|
||||||
|
@ -324,7 +319,7 @@ func TestIteratorContract(t *testing.T) {
|
||||||
WithKeySpaceIterator(it),
|
WithKeySpaceIterator(it),
|
||||||
WithContainerSource(containerSrc),
|
WithContainerSource(containerSrc),
|
||||||
WithBuryFunc(buryFn),
|
WithBuryFunc(buryFn),
|
||||||
WithPool(pool),
|
WithPool(testPool(t)),
|
||||||
func(c *cfg) {
|
func(c *cfg) {
|
||||||
c.sleepDuration = time.Millisecond
|
c.sleepDuration = time.Millisecond
|
||||||
},
|
},
|
||||||
|
@ -348,6 +343,12 @@ func TestIteratorContract(t *testing.T) {
|
||||||
}, it.calls)
|
}, it.calls)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testPool(t *testing.T) *ants.Pool {
|
||||||
|
pool, err := ants.NewPool(4)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return pool
|
||||||
|
}
|
||||||
|
|
||||||
type nextResult struct {
|
type nextResult struct {
|
||||||
objs []objectcore.AddressWithType
|
objs []objectcore.AddressWithType
|
||||||
err error
|
err error
|
||||||
|
|
Loading…
Add table
Reference in a new issue
I am not a fan of such small functions, but it helps to avoid forgetting to change something after copypaste