diff --git a/cmd/neofs-ir/defaults.go b/cmd/neofs-ir/defaults.go index 497bec119..08228236d 100644 --- a/cmd/neofs-ir/defaults.go +++ b/cmd/neofs-ir/defaults.go @@ -99,4 +99,5 @@ func defaultConfiguration(cfg *viper.Viper) { cfg.SetDefault("audit.timeout.search", "10s") cfg.SetDefault("audit.pdp.max_sleep_interval", "5s") cfg.SetDefault("audit.pdp.pairs_pool_size", "10") + cfg.SetDefault("audit.por.pairs_pool_size", "10") } diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 6a287062b..a7578e894 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -226,6 +226,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error }) pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size") + porPoolSize := cfg.GetInt("audit.por.pairs_pool_size") auditTaskManager := audittask.New( audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), @@ -236,6 +237,9 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) { return ants.NewPool(pdpPoolSize) }), + audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) { + return ants.NewPool(porPoolSize) + }), ) server.workers = append(server.workers, auditTaskManager.Listen) diff --git a/pkg/services/audit/auditor/context.go b/pkg/services/audit/auditor/context.go index 2ac379c86..198002967 100644 --- a/pkg/services/audit/auditor/context.go +++ b/pkg/services/audit/auditor/context.go @@ -75,7 +75,7 @@ type ContextPrm struct { cnrCom ContainerCommunicator - pdpWorkerPool util.WorkerPool + pdpWorkerPool, porWorkerPool util.WorkerPool } // ContainerCommunicator is an interface of @@ -139,6 +139,15 @@ func (c *Context) WithPDPWorkerPool(pool util.WorkerPool) *Context { return c } +// WithPoRWorkerPool sets worker pool for PoR SG processing. +func (c *Context) WithPoRWorkerPool(pool util.WorkerPool) *Context { + if c != nil { + c.porWorkerPool = pool + } + + return c +} + func (c *Context) containerID() *container.ID { return c.task.ContainerID() } diff --git a/pkg/services/audit/taskmanager/listen.go b/pkg/services/audit/taskmanager/listen.go index 638ffd001..fc992360d 100644 --- a/pkg/services/audit/taskmanager/listen.go +++ b/pkg/services/audit/taskmanager/listen.go @@ -47,8 +47,18 @@ func (m *Manager) handleTask(task *audit.Task) { return } + porPool, err := m.pdpPoolGenerator() + if err != nil { + m.log.Error("could not generate PoR worker pool", + zap.String("error", err.Error()), + ) + + return + } + auditContext := m.generateContext(task). - WithPDPWorkerPool(pdpPool) + WithPDPWorkerPool(pdpPool). + WithPoRWorkerPool(porPool) if err := m.workerPool.Submit(auditContext.Execute); err != nil { // may be we should report it diff --git a/pkg/services/audit/taskmanager/manager.go b/pkg/services/audit/taskmanager/manager.go index 7ebe34be0..22d64f9f1 100644 --- a/pkg/services/audit/taskmanager/manager.go +++ b/pkg/services/audit/taskmanager/manager.go @@ -31,7 +31,7 @@ type cfg struct { workerPool util.WorkerPool - pdpPoolGenerator func() (util.WorkerPool, error) + pdpPoolGenerator, porPoolGenerator func() (util.WorkerPool, error) } func defaultCfg() *cfg { @@ -98,3 +98,10 @@ func WithPDPWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option { c.pdpPoolGenerator = f } } + +// WithPoRWorkerPool returns option to set worker pool for PoR SG processing. +func WithPoRWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option { + return func(c *cfg) { + c.porPoolGenerator = f + } +}