Support reloading service pool sizes on SIGHUP #956

Merged
fyrchik merged 4 commits from fyrchik/frostfs-node:reload-pools into master 2024-02-08 15:49:45 +00:00
6 changed files with 43 additions and 35 deletions

View file

@ -632,14 +632,8 @@ type cfgAccessPolicyEngine struct {
type cfgObjectRoutines struct { type cfgObjectRoutines struct {
putRemote *ants.Pool putRemote *ants.Pool
putRemoteCapacity int
putLocal *ants.Pool putLocal *ants.Pool
putLocalCapacity int
replicatorPoolSize int
replication *ants.Pool replication *ants.Pool
} }
@ -1094,20 +1088,20 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
optNonBlocking := ants.WithNonblocking(true) optNonBlocking := ants.WithNonblocking(true)
pool.putRemoteCapacity = objectconfig.Put(cfg).PoolSizeRemote() putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote()
pool.putRemote, err = ants.NewPool(pool.putRemoteCapacity, optNonBlocking) pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking)
fatalOnErr(err) fatalOnErr(err)
pool.putLocalCapacity = objectconfig.Put(cfg).PoolSizeLocal() putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal()
pool.putLocal, err = ants.NewPool(pool.putLocalCapacity, optNonBlocking) pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking)
fatalOnErr(err) fatalOnErr(err)
pool.replicatorPoolSize = replicatorconfig.PoolSize(cfg) replicatorPoolSize := replicatorconfig.PoolSize(cfg)
if pool.replicatorPoolSize <= 0 { if replicatorPoolSize <= 0 {
pool.replicatorPoolSize = pool.putRemoteCapacity replicatorPoolSize = putRemoteCapacity
} }
pool.replication, err = ants.NewPool(pool.replicatorPoolSize) pool.replication, err = ants.NewPool(replicatorPoolSize)
fatalOnErr(err) fatalOnErr(err)
return pool return pool
@ -1241,6 +1235,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
setRuntimeParameters(c) setRuntimeParameters(c)
return nil return nil
}}) }})
components = append(components, dCmp{"pools", c.reloadPools})
components = append(components, dCmp{"tracing", func() error { components = append(components, dCmp{"tracing", func() error {
updated, err := tracing.Setup(ctx, *tracingconfig.ToTracingConfig(c.appCfg)) updated, err := tracing.Setup(ctx, *tracingconfig.ToTracingConfig(c.appCfg))
if updated { if updated {
@ -1285,6 +1280,28 @@ func (c *cfg) reloadConfig(ctx context.Context) {
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully) c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
} }
func (c *cfg) reloadPools() error {
newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")
newSize = objectconfig.Put(c.appCfg).PoolSizeRemote()
c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size")
newSize = replicatorconfig.PoolSize(c.appCfg)
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
return nil
}
func (c *cfg) reloadPool(p *ants.Pool, newSize int, name string) {

I am not a fan of such small functions, but it helps to avoid forgetting to change something after copypaste

I am not a fan of such small functions, but it helps to avoid forgetting to change something after copypaste
oldSize := p.Cap()
if oldSize != newSize {
c.log.Info(logs.FrostFSNodePoolConfigurationUpdate, zap.String("field", name),
zap.Int("old", oldSize), zap.Int("new", newSize))
p.Tune(newSize)
}
}
func (c *cfg) reloadAppConfig() error { func (c *cfg) reloadAppConfig() error {
unlock := c.LockAppConfigExclusive() unlock := c.LockAppConfigExclusive()
defer unlock() defer unlock()

View file

@ -263,7 +263,6 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
) )
} }
}), }),
policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize),
policer.WithPool(c.cfgObject.pool.replication), policer.WithPool(c.cfgObject.pool.replication),
policer.WithMetrics(c.metricsCollector.PolicerMetrics()), policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
) )

View file

@ -451,6 +451,7 @@ const (
FrostFSNodeLoggerConfigurationPreparation = "logger configuration preparation" FrostFSNodeLoggerConfigurationPreparation = "logger configuration preparation"
FrostFSNodeTracingConfigationUpdated = "tracing configation updated" FrostFSNodeTracingConfigationUpdated = "tracing configation updated"
FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update" FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update"
FrostFSNodePoolConfigurationUpdate = "adjust pool configuration"
FrostFSNodeUpdatedConfigurationApplying = "updated configuration applying" FrostFSNodeUpdatedConfigurationApplying = "updated configuration applying"
FrostFSNodeConfigurationHasBeenReloadedSuccessfully = "configuration has been reloaded successfully" FrostFSNodeConfigurationHasBeenReloadedSuccessfully = "configuration has been reloaded successfully"
FrostFSNodeReadNewlyCreatedContainerAfterTheNotification = "read newly created container after the notification" FrostFSNodeReadNewlyCreatedContainerAfterTheNotification = "read newly created container after the notification"

View file

@ -64,8 +64,6 @@ type cfg struct {
taskPool *ants.Pool taskPool *ants.Pool
maxCapacity int
batchSize, cacheSize uint32 batchSize, cacheSize uint32
rebalanceFreq, evictDuration, sleepDuration time.Duration rebalanceFreq, evictDuration, sleepDuration time.Duration
@ -158,14 +156,6 @@ func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
} }
} }
// WithMaxCapacity returns option to set max capacity
// that can be set to the pool.
func WithMaxCapacity(capacity int) Option {
return func(c *cfg) {
c.maxCapacity = capacity
}
}
// WithPool returns option to set pool for // WithPool returns option to set pool for
// policy and replication operations. // policy and replication operations.
func WithPool(p *ants.Pool) Option { func WithPool(p *ants.Pool) Option {

View file

@ -66,7 +66,7 @@ func New(opts ...Option) *Policer {
cfg: c, cfg: c,
cache: cache, cache: cache,
objsInWork: &objectsInWork{ objsInWork: &objectsInWork{
objs: make(map[oid.Address]struct{}, c.maxCapacity), objs: make(map[oid.Address]struct{}, c.taskPool.Cap()),
}, },
} }
} }

View file

@ -48,16 +48,12 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
return nil return nil
} }
// Task pool
pool, err := ants.NewPool(4)
require.NoError(t, err)
// Policer instance // Policer instance
p := New( p := New(
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}), WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
WithContainerSource(containerSrc), WithContainerSource(containerSrc),
WithBuryFunc(buryFn), WithBuryFunc(buryFn),
WithPool(pool), WithPool(testPool(t)),
) )
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -239,6 +235,7 @@ func TestProcessObject(t *testing.T) {
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0])) gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
} }
})), })),
WithPool(testPool(t)),
) )
addrWithType := objectcore.AddressWithType{ addrWithType := objectcore.AddressWithType{
@ -276,6 +273,7 @@ func TestProcessObjectError(t *testing.T) {
p := New( p := New(
WithContainerSource(source), WithContainerSource(source),
WithBuryFunc(buryFn), WithBuryFunc(buryFn),
WithPool(testPool(t)),
) )
addrWithType := objectcore.AddressWithType{ addrWithType := objectcore.AddressWithType{
@ -296,9 +294,6 @@ func TestIteratorContract(t *testing.T) {
return nil return nil
} }
pool, err := ants.NewPool(4)
require.NoError(t, err)
it := &predefinedIterator{ it := &predefinedIterator{
scenario: []nextResult{ scenario: []nextResult{
{objs, nil}, {objs, nil},
@ -324,7 +319,7 @@ func TestIteratorContract(t *testing.T) {
WithKeySpaceIterator(it), WithKeySpaceIterator(it),
WithContainerSource(containerSrc), WithContainerSource(containerSrc),
WithBuryFunc(buryFn), WithBuryFunc(buryFn),
WithPool(pool), WithPool(testPool(t)),
func(c *cfg) { func(c *cfg) {
c.sleepDuration = time.Millisecond c.sleepDuration = time.Millisecond
}, },
@ -348,6 +343,12 @@ func TestIteratorContract(t *testing.T) {
}, it.calls) }, it.calls)
} }
func testPool(t *testing.T) *ants.Pool {
pool, err := ants.NewPool(4)
require.NoError(t, err)
return pool
}
type nextResult struct { type nextResult struct {
objs []objectcore.AddressWithType objs []objectcore.AddressWithType
err error err error