From 76d4e53ea0301b4f4ac6ccea04a2d97abf02d878 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 22 Dec 2020 14:27:39 +0300 Subject: [PATCH] [#255] services/audit: Skip all tasks from previous epoch in audit processor Implement Reset method on audit task manager that cleans task queue. Extended TaskManager interface with Reset method on IR side. Call Reset method in audit processor before new audit start. Signed-off-by: Leonard Lyubich --- pkg/innerring/processors/audit/process.go | 1 + pkg/innerring/processors/audit/processor.go | 1 + pkg/services/audit/taskmanager/reset.go | 8 ++++++++ 3 files changed, 10 insertions(+) create mode 100644 pkg/services/audit/taskmanager/reset.go diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index 82b48e3d2..339047f1d 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -21,6 +21,7 @@ func (ap *Processor) processStartAudit(epoch uint64) { log := ap.log.With(zap.Uint64("epoch", epoch)) ap.prevAuditCanceler() + ap.taskManager.Reset() containers, err := ap.selectContainersToAudit(epoch) if err != nil { diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index e38adb5a7..8a43379c2 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -30,6 +30,7 @@ type ( TaskManager interface { PushTask(*audit.Task) error + Reset() } // Processor of events related with data audit. diff --git a/pkg/services/audit/taskmanager/reset.go b/pkg/services/audit/taskmanager/reset.go new file mode 100644 index 000000000..bd48468db --- /dev/null +++ b/pkg/services/audit/taskmanager/reset.go @@ -0,0 +1,8 @@ +package audittask + +// Reset pops all tasks from the queue. +func (m *Manager) Reset() { + for len(m.ch) > 0 { + <-m.ch + } +}