diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index 339047f1d..17e6138e5 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -21,7 +21,13 @@ func (ap *Processor) processStartAudit(epoch uint64) { log := ap.log.With(zap.Uint64("epoch", epoch)) ap.prevAuditCanceler() - ap.taskManager.Reset() + + skipped := ap.taskManager.Reset() + if skipped > 0 { + ap.log.Info("some tasks from previous epoch are skipped", + zap.Int("amount", skipped), + ) + } containers, err := ap.selectContainersToAudit(epoch) if err != nil { diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index 8a43379c2..d764cea46 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -30,7 +30,10 @@ type ( TaskManager interface { PushTask(*audit.Task) error - Reset() + + // Must skip all tasks planned for execution and + // return their number. + Reset() int } // Processor of events related with data audit. diff --git a/pkg/services/audit/taskmanager/reset.go b/pkg/services/audit/taskmanager/reset.go index bd48468db..86f2538cf 100644 --- a/pkg/services/audit/taskmanager/reset.go +++ b/pkg/services/audit/taskmanager/reset.go @@ -1,8 +1,11 @@ package audittask // Reset pops all tasks from the queue. -func (m *Manager) Reset() { - for len(m.ch) > 0 { +// Returns amount of popped elements. +func (m *Manager) Reset() (popped int) { + for ; len(m.ch) > 0; popped++ { <-m.ch } + + return }