diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index 82b48e3d2d..339047f1d7 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -21,6 +21,7 @@ func (ap *Processor) processStartAudit(epoch uint64) { log := ap.log.With(zap.Uint64("epoch", epoch)) ap.prevAuditCanceler() + ap.taskManager.Reset() containers, err := ap.selectContainersToAudit(epoch) if err != nil { diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index e38adb5a7f..8a43379c2c 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -30,6 +30,7 @@ type ( TaskManager interface { PushTask(*audit.Task) error + Reset() } // Processor of events related with data audit. diff --git a/pkg/services/audit/taskmanager/reset.go b/pkg/services/audit/taskmanager/reset.go new file mode 100644 index 0000000000..bd48468db1 --- /dev/null +++ b/pkg/services/audit/taskmanager/reset.go @@ -0,0 +1,8 @@ +package audittask + +// Reset pops all tasks from the queue. +func (m *Manager) Reset() { + for len(m.ch) > 0 { + <-m.ch + } +}