[#901] *: release worker pools where possible

Some of the pools are initialized during config initialization,
so it isn't possible currently to release them in one place.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2021-10-19 16:34:34 +03:00 committed by Alex Vanin
parent 10afd26354
commit 0057eeb0f7
5 changed files with 8 additions and 0 deletions

View file

@ -150,6 +150,9 @@ func (gc *gc) tickRemover() {
for { for {
select { select {
case <-gc.stopChannel: case <-gc.stopChannel:
if gc.workerPool != nil {
gc.workerPool.Release()
}
gc.log.Debug("GC is stopped") gc.log.Debug("GC is stopped")
return return
case <-timer.C: case <-timer.C:

View file

@ -33,6 +33,7 @@ func (c *Context) processPairs() {
} }
wg.Wait() wg.Wait()
c.pdpWorkerPool.Release()
} }
func (c *Context) processPair(p *gamePair) { func (c *Context) processPair(p *gamePair) {

View file

@ -30,6 +30,7 @@ func (c *Context) executePoR() {
} }
wg.Wait() wg.Wait()
c.porWorkerPool.Release()
c.report.SetPoRCounters(c.porRequests.Load(), c.porRetries.Load()) c.report.SetPoRCounters(c.porRequests.Load(), c.porRetries.Load())
} }

View file

@ -24,6 +24,7 @@ func (m *Manager) Listen(ctx context.Context) {
m.log.Warn("stop listener by context", m.log.Warn("stop listener by context",
zap.String("error", ctx.Err().Error()), zap.String("error", ctx.Err().Error()),
) )
m.workerPool.Release()
return return
case task, ok := <-m.ch: case task, ok := <-m.ch:

View file

@ -91,6 +91,7 @@ func WithMaxPDPSleepInterval(dur time.Duration) Option {
} }
// WithPDPWorkerPoolGenerator returns option to set worker pool for PDP pairs processing. // WithPDPWorkerPoolGenerator returns option to set worker pool for PDP pairs processing.
// Callback caller owns returned pool and must release it appropriately.
func WithPDPWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option { func WithPDPWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option {
return func(c *cfg) { return func(c *cfg) {
c.pdpPoolGenerator = f c.pdpPoolGenerator = f
@ -98,6 +99,7 @@ func WithPDPWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option {
} }
// WithPoRWorkerPoolGenerator returns option to set worker pool for PoR SG processing. // WithPoRWorkerPoolGenerator returns option to set worker pool for PoR SG processing.
// Callback caller owns returned pool and must release it appropriately.
func WithPoRWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option { func WithPoRWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option {
return func(c *cfg) { return func(c *cfg) {
c.porPoolGenerator = f c.porPoolGenerator = f