[#281] service/audit: Add separate pool for SG checks in PoR

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-12-24 14:24:25 +03:00 committed by Alex Vanin
parent 8e72abaab7
commit 0d7832f5e9
5 changed files with 34 additions and 3 deletions

View file

@ -99,4 +99,5 @@ func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("audit.timeout.search", "10s") cfg.SetDefault("audit.timeout.search", "10s")
cfg.SetDefault("audit.pdp.max_sleep_interval", "5s") cfg.SetDefault("audit.pdp.max_sleep_interval", "5s")
cfg.SetDefault("audit.pdp.pairs_pool_size", "10") cfg.SetDefault("audit.pdp.pairs_pool_size", "10")
cfg.SetDefault("audit.por.pairs_pool_size", "10")
} }

View file

@ -226,6 +226,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
}) })
pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size") pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size")
porPoolSize := cfg.GetInt("audit.por.pairs_pool_size")
auditTaskManager := audittask.New( auditTaskManager := audittask.New(
audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")),
@ -236,6 +237,9 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) { audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) {
return ants.NewPool(pdpPoolSize) return ants.NewPool(pdpPoolSize)
}), }),
audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) {
return ants.NewPool(porPoolSize)
}),
) )
server.workers = append(server.workers, auditTaskManager.Listen) server.workers = append(server.workers, auditTaskManager.Listen)

View file

@ -75,7 +75,7 @@ type ContextPrm struct {
cnrCom ContainerCommunicator cnrCom ContainerCommunicator
pdpWorkerPool util.WorkerPool pdpWorkerPool, porWorkerPool util.WorkerPool
} }
// ContainerCommunicator is an interface of // ContainerCommunicator is an interface of
@ -139,6 +139,15 @@ func (c *Context) WithPDPWorkerPool(pool util.WorkerPool) *Context {
return c return c
} }
// WithPoRWorkerPool sets worker pool for PoR SG processing.
func (c *Context) WithPoRWorkerPool(pool util.WorkerPool) *Context {
if c != nil {
c.porWorkerPool = pool
}
return c
}
func (c *Context) containerID() *container.ID { func (c *Context) containerID() *container.ID {
return c.task.ContainerID() return c.task.ContainerID()
} }

View file

@ -47,8 +47,18 @@ func (m *Manager) handleTask(task *audit.Task) {
return return
} }
porPool, err := m.pdpPoolGenerator()
if err != nil {
m.log.Error("could not generate PoR worker pool",
zap.String("error", err.Error()),
)
return
}
auditContext := m.generateContext(task). auditContext := m.generateContext(task).
WithPDPWorkerPool(pdpPool) WithPDPWorkerPool(pdpPool).
WithPoRWorkerPool(porPool)
if err := m.workerPool.Submit(auditContext.Execute); err != nil { if err := m.workerPool.Submit(auditContext.Execute); err != nil {
// may be we should report it // may be we should report it

View file

@ -31,7 +31,7 @@ type cfg struct {
workerPool util.WorkerPool workerPool util.WorkerPool
pdpPoolGenerator func() (util.WorkerPool, error) pdpPoolGenerator, porPoolGenerator func() (util.WorkerPool, error)
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
@ -98,3 +98,10 @@ func WithPDPWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option {
c.pdpPoolGenerator = f c.pdpPoolGenerator = f
} }
} }
// WithPoRWorkerPool returns option to set worker pool for PoR SG processing.
func WithPoRWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option {
return func(c *cfg) {
c.porPoolGenerator = f
}
}