From 0057eeb0f7f79be6abb4e41ba04b9bce365017fa Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 19 Oct 2021 16:34:34 +0300 Subject: [PATCH] [#901] *: release worker pools where possible Some of the pools are initialized during config initialization, so it isn't possible currently to release them in one place. Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/shard/gc.go | 3 +++ pkg/services/audit/auditor/pdp.go | 1 + pkg/services/audit/auditor/por.go | 1 + pkg/services/audit/taskmanager/listen.go | 1 + pkg/services/audit/taskmanager/manager.go | 2 ++ 5 files changed, 8 insertions(+) diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 79a3e151ef..f1c832f89d 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -150,6 +150,9 @@ func (gc *gc) tickRemover() { for { select { case <-gc.stopChannel: + if gc.workerPool != nil { + gc.workerPool.Release() + } gc.log.Debug("GC is stopped") return case <-timer.C: diff --git a/pkg/services/audit/auditor/pdp.go b/pkg/services/audit/auditor/pdp.go index 49e481c119..d9d082b0cc 100644 --- a/pkg/services/audit/auditor/pdp.go +++ b/pkg/services/audit/auditor/pdp.go @@ -33,6 +33,7 @@ func (c *Context) processPairs() { } wg.Wait() + c.pdpWorkerPool.Release() } func (c *Context) processPair(p *gamePair) { diff --git a/pkg/services/audit/auditor/por.go b/pkg/services/audit/auditor/por.go index a0c65a1ee9..9ff298c254 100644 --- a/pkg/services/audit/auditor/por.go +++ b/pkg/services/audit/auditor/por.go @@ -30,6 +30,7 @@ func (c *Context) executePoR() { } wg.Wait() + c.porWorkerPool.Release() c.report.SetPoRCounters(c.porRequests.Load(), c.porRetries.Load()) } diff --git a/pkg/services/audit/taskmanager/listen.go b/pkg/services/audit/taskmanager/listen.go index 480d501ad2..1f5ef3f966 100644 --- a/pkg/services/audit/taskmanager/listen.go +++ b/pkg/services/audit/taskmanager/listen.go @@ -24,6 +24,7 @@ func (m *Manager) Listen(ctx context.Context) { m.log.Warn("stop listener by context", zap.String("error", ctx.Err().Error()), ) + m.workerPool.Release() return case task, ok := <-m.ch: diff --git a/pkg/services/audit/taskmanager/manager.go b/pkg/services/audit/taskmanager/manager.go index 8df658d0ec..b04bf4dfd3 100644 --- a/pkg/services/audit/taskmanager/manager.go +++ b/pkg/services/audit/taskmanager/manager.go @@ -91,6 +91,7 @@ func WithMaxPDPSleepInterval(dur time.Duration) Option { } // WithPDPWorkerPoolGenerator returns option to set worker pool for PDP pairs processing. +// Callback caller owns returned pool and must release it appropriately. func WithPDPWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option { return func(c *cfg) { c.pdpPoolGenerator = f @@ -98,6 +99,7 @@ func WithPDPWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option { } // WithPoRWorkerPoolGenerator returns option to set worker pool for PoR SG processing. +// Callback caller owns returned pool and must release it appropriately. func WithPoRWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option { return func(c *cfg) { c.porPoolGenerator = f