diff --git a/pkg/services/audit/taskmanager/listen.go b/pkg/services/audit/taskmanager/listen.go new file mode 100644 index 000000000..f070a9946 --- /dev/null +++ b/pkg/services/audit/taskmanager/listen.go @@ -0,0 +1,52 @@ +package audittask + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/services/audit" + "github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor" + "go.uber.org/zap" +) + +// Listen starts the process of processing tasks from the queue. +// +// The listener is terminated by context. +func (m *Manager) Listen(ctx context.Context) { + m.log.Info("process routine", + zap.Uint32("queue capacity", m.queueCap), + ) + + m.ch = make(chan *audit.Task, m.queueCap) + + for { + select { + case <-ctx.Done(): + m.log.Warn("stop listener by context", + zap.String("error", ctx.Err().Error()), + ) + + return + case task, ok := <-m.ch: + if !ok { + m.log.Warn("queue channel is closed") + return + } + + m.handleTask(task) + } + } +} + +func (m *Manager) handleTask(task *audit.Task) { + if err := m.workerPool.Submit(func() { + m.generateContext(task).Execute() + }); err != nil { + // may be we should report it + m.log.Warn("could not submit audit task") + } +} + +func (m *Manager) generateContext(task *audit.Task) *auditor.Context { + return auditor.NewContext(m.ctxPrm). + WithTask(task) +} diff --git a/pkg/services/audit/taskmanager/manager.go b/pkg/services/audit/taskmanager/manager.go new file mode 100644 index 000000000..85b707951 --- /dev/null +++ b/pkg/services/audit/taskmanager/manager.go @@ -0,0 +1,73 @@ +package audittask + +import ( + "github.com/nspcc-dev/neofs-node/pkg/services/audit" + "github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor" + "github.com/nspcc-dev/neofs-node/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Manager represents an entity performing data audit tasks. +type Manager struct { + *cfg + + ch chan *audit.Task +} + +// Option is a Manager's constructor option. +type Option func(*cfg) + +type cfg struct { + queueCap uint32 + + log *logger.Logger + + ctxPrm auditor.ContextPrm + + reporter audit.Reporter + + workerPool util.WorkerPool +} + +func defaultCfg() *cfg { + return &cfg{ + log: zap.L(), + } +} + +// New creates, initializes and returns new Manager instance. +func New(opts ...Option) *Manager { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &Manager{ + cfg: c, + } +} + +// WithLogger returns option to specify Manager's logger. +func WithLogger(l *logger.Logger) Option { + return func(c *cfg) { + c.log = l.With(zap.String("component", "Audit task manager")) + c.ctxPrm.SetLogger(l) + } +} + +// WithWorkerPool returns option to set worker pool +// for task execution. +func WithWorkerPool(p util.WorkerPool) Option { + return func(c *cfg) { + c.workerPool = p + } +} + +// WithQueueCapacity returns option to set task queue capacity. +func WithQueueCapacity(cap uint32) Option { + return func(c *cfg) { + c.queueCap = cap + } +} diff --git a/pkg/services/audit/taskmanager/push.go b/pkg/services/audit/taskmanager/push.go new file mode 100644 index 000000000..e52ac6215 --- /dev/null +++ b/pkg/services/audit/taskmanager/push.go @@ -0,0 +1,13 @@ +package audittask + +import ( + "github.com/nspcc-dev/neofs-node/pkg/services/audit" +) + +// PushTask adds a task to the queue for processing. +// +// Returns error if task was not added to the queue. +func (m *Manager) PushTask(t *audit.Task) error { + m.ch <- t + return nil +}