forked from TrueCloudLab/frostfs-node
[#255] services/audit: Implement task manager
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
0f0be2377b
commit
580c9c974a
3 changed files with 138 additions and 0 deletions
52
pkg/services/audit/taskmanager/listen.go
Normal file
52
pkg/services/audit/taskmanager/listen.go
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
package audittask
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Listen starts the process of processing tasks from the queue.
|
||||||
|
//
|
||||||
|
// The listener is terminated by context.
|
||||||
|
func (m *Manager) Listen(ctx context.Context) {
|
||||||
|
m.log.Info("process routine",
|
||||||
|
zap.Uint32("queue capacity", m.queueCap),
|
||||||
|
)
|
||||||
|
|
||||||
|
m.ch = make(chan *audit.Task, m.queueCap)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
m.log.Warn("stop listener by context",
|
||||||
|
zap.String("error", ctx.Err().Error()),
|
||||||
|
)
|
||||||
|
|
||||||
|
return
|
||||||
|
case task, ok := <-m.ch:
|
||||||
|
if !ok {
|
||||||
|
m.log.Warn("queue channel is closed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
m.handleTask(task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) handleTask(task *audit.Task) {
|
||||||
|
if err := m.workerPool.Submit(func() {
|
||||||
|
m.generateContext(task).Execute()
|
||||||
|
}); err != nil {
|
||||||
|
// may be we should report it
|
||||||
|
m.log.Warn("could not submit audit task")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) generateContext(task *audit.Task) *auditor.Context {
|
||||||
|
return auditor.NewContext(m.ctxPrm).
|
||||||
|
WithTask(task)
|
||||||
|
}
|
73
pkg/services/audit/taskmanager/manager.go
Normal file
73
pkg/services/audit/taskmanager/manager.go
Normal file
|
@ -0,0 +1,73 @@
|
||||||
|
package audittask
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Manager represents an entity performing data audit tasks.
|
||||||
|
type Manager struct {
|
||||||
|
*cfg
|
||||||
|
|
||||||
|
ch chan *audit.Task
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option is a Manager's constructor option.
|
||||||
|
type Option func(*cfg)
|
||||||
|
|
||||||
|
type cfg struct {
|
||||||
|
queueCap uint32
|
||||||
|
|
||||||
|
log *logger.Logger
|
||||||
|
|
||||||
|
ctxPrm auditor.ContextPrm
|
||||||
|
|
||||||
|
reporter audit.Reporter
|
||||||
|
|
||||||
|
workerPool util.WorkerPool
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultCfg() *cfg {
|
||||||
|
return &cfg{
|
||||||
|
log: zap.L(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates, initializes and returns new Manager instance.
|
||||||
|
func New(opts ...Option) *Manager {
|
||||||
|
c := defaultCfg()
|
||||||
|
|
||||||
|
for i := range opts {
|
||||||
|
opts[i](c)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Manager{
|
||||||
|
cfg: c,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithLogger returns option to specify Manager's logger.
|
||||||
|
func WithLogger(l *logger.Logger) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.log = l.With(zap.String("component", "Audit task manager"))
|
||||||
|
c.ctxPrm.SetLogger(l)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithWorkerPool returns option to set worker pool
|
||||||
|
// for task execution.
|
||||||
|
func WithWorkerPool(p util.WorkerPool) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.workerPool = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithQueueCapacity returns option to set task queue capacity.
|
||||||
|
func WithQueueCapacity(cap uint32) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.queueCap = cap
|
||||||
|
}
|
||||||
|
}
|
13
pkg/services/audit/taskmanager/push.go
Normal file
13
pkg/services/audit/taskmanager/push.go
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
package audittask
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PushTask adds a task to the queue for processing.
|
||||||
|
//
|
||||||
|
// Returns error if task was not added to the queue.
|
||||||
|
func (m *Manager) PushTask(t *audit.Task) error {
|
||||||
|
m.ch <- t
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in a new issue