diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index 7f148e57e..ecfc407be 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -46,15 +46,21 @@ func (ap *Processor) processStartAudit(epoch uint64) { return } - var auditCtx context.Context - auditCtx, ap.prevAuditCanceler = context.WithCancel(context.Background()) + cancelChannel := make(chan struct{}) + ap.prevAuditCanceler = func() { + select { + case <-cancelChannel: // already closed + default: + close(cancelChannel) + } + } pivot := make([]byte, sha256.Size) - ap.startAuditTasksOnContainers(auditCtx, containers, log, pivot, nm, epoch) + ap.startAuditTasksOnContainers(cancelChannel, containers, log, pivot, nm, epoch) } -func (ap *Processor) startAuditTasksOnContainers(ctx context.Context, containers []cid.ID, log *zap.Logger, pivot []byte, nm *netmap.NetMap, epoch uint64) { +func (ap *Processor) startAuditTasksOnContainers(cancelChannel <-chan struct{}, containers []cid.ID, log *zap.Logger, pivot []byte, nm *netmap.NetMap, epoch uint64) { for i := range containers { cnr, err := cntClient.Get(ap.containerClient, containers[i]) // get container structure if err != nil { @@ -107,18 +113,14 @@ func (ap *Processor) startAuditTasksOnContainers(ctx context.Context, containers epoch: epoch, rep: ap.reporter, }). - WithAuditContext(ctx). + WithCancelChannel(cancelChannel). WithContainerID(containers[i]). WithStorageGroupList(storageGroups). WithContainerStructure(cnr.Value). WithContainerNodes(nodes). WithNetworkMap(nm) - if err := ap.taskManager.PushTask(auditTask); err != nil { - ap.log.Error("could not push audit task", - zap.String("error", err.Error()), - ) - } + ap.taskManager.PushTask(auditTask) } } diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index cb514b165..31e8a8c55 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -24,7 +24,7 @@ type ( } TaskManager interface { - PushTask(*audit.Task) error + PushTask(*audit.Task) // Must skip all tasks planned for execution and // return their number. diff --git a/pkg/services/audit/auditor/context.go b/pkg/services/audit/auditor/context.go index f2778fd65..bf720c330 100644 --- a/pkg/services/audit/auditor/context.go +++ b/pkg/services/audit/auditor/context.go @@ -194,9 +194,7 @@ func (c *Context) init() { )} } -func (c *Context) expired() bool { - ctx := c.task.AuditContext() - +func (c *Context) expired(ctx context.Context) bool { select { case <-ctx.Done(): c.log.Debug("audit context is done", diff --git a/pkg/services/audit/auditor/exec.go b/pkg/services/audit/auditor/exec.go index ceb6556e2..e603818b8 100644 --- a/pkg/services/audit/auditor/exec.go +++ b/pkg/services/audit/auditor/exec.go @@ -1,16 +1,18 @@ package auditor import ( + "context" "fmt" ) // Execute audits container data. -func (c *Context) Execute() { +func (c *Context) Execute(ctx context.Context, onCompleted func()) { + defer onCompleted() c.init() checks := []struct { name string - exec func() + exec func(context.Context) }{ {name: "PoR", exec: c.executePoR}, {name: "PoP", exec: c.executePoP}, @@ -20,11 +22,11 @@ func (c *Context) Execute() { for i := range checks { c.log.Debug(fmt.Sprintf("executing %s check...", checks[i].name)) - if c.expired() { + if c.expired(ctx) { break } - checks[i].exec() + checks[i].exec(ctx) if i == len(checks)-1 { c.complete() diff --git a/pkg/services/audit/auditor/pdp.go b/pkg/services/audit/auditor/pdp.go index 13b50e498..8a184eb7e 100644 --- a/pkg/services/audit/auditor/pdp.go +++ b/pkg/services/audit/auditor/pdp.go @@ -2,6 +2,7 @@ package auditor import ( "bytes" + "context" "sync" "time" @@ -13,12 +14,12 @@ import ( "go.uber.org/zap" ) -func (c *Context) executePDP() { - c.processPairs() +func (c *Context) executePDP(ctx context.Context) { + c.processPairs(ctx) c.writePairsResult() } -func (c *Context) processPairs() { +func (c *Context) processPairs(ctx context.Context) { wg := new(sync.WaitGroup) for i := range c.pairs { @@ -26,7 +27,7 @@ func (c *Context) processPairs() { wg.Add(1) if err := c.pdpWorkerPool.Submit(func() { - c.processPair(p) + c.processPair(ctx, p) wg.Done() }); err != nil { wg.Done() @@ -37,9 +38,9 @@ func (c *Context) processPairs() { c.pdpWorkerPool.Release() } -func (c *Context) processPair(p *gamePair) { +func (c *Context) processPair(ctx context.Context, p *gamePair) { c.distributeRanges(p) - c.collectHashes(p) + c.collectHashes(ctx, p) c.analyzeHashes(p) } @@ -106,7 +107,7 @@ func (c *Context) splitPayload(id oid.ID) []uint64 { return notches } -func (c *Context) collectHashes(p *gamePair) { +func (c *Context) collectHashes(ctx context.Context, p *gamePair) { fn := func(n netmap.NodeInfo, rngs []*object.Range) [][]byte { // Here we randomize the order a bit: the hypothesis is that this // makes it harder for an unscrupulous node to come up with a @@ -137,7 +138,7 @@ func (c *Context) collectHashes(p *gamePair) { getRangeHashPrm.Range = rngs[i] - h, err := c.cnrCom.GetRangeHash(c.task.AuditContext(), getRangeHashPrm) + h, err := c.cnrCom.GetRangeHash(ctx, getRangeHashPrm) if err != nil { c.log.Debug("could not get payload range hash", zap.Stringer("id", p.id), diff --git a/pkg/services/audit/auditor/pop.go b/pkg/services/audit/auditor/pop.go index 45afa7937..32b837794 100644 --- a/pkg/services/audit/auditor/pop.go +++ b/pkg/services/audit/auditor/pop.go @@ -1,6 +1,8 @@ package auditor import ( + "context" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/tzhash/tz" @@ -12,8 +14,8 @@ const ( minGamePayloadSize = hashRangeNumber * tz.Size ) -func (c *Context) executePoP() { - c.buildCoverage() +func (c *Context) executePoP(ctx context.Context) { + c.buildCoverage(ctx) c.report.SetPlacementCounters( c.counters.hit, @@ -22,13 +24,13 @@ func (c *Context) executePoP() { ) } -func (c *Context) buildCoverage() { +func (c *Context) buildCoverage(ctx context.Context) { policy := c.task.ContainerStructure().PlacementPolicy() // select random member from another storage group // and process all placement vectors c.iterateSGMembersPlacementRand(func(id oid.ID, ind int, nodes []netmap.NodeInfo) bool { - c.processObjectPlacement(id, nodes, policy.ReplicaNumberByIndex(ind)) + c.processObjectPlacement(ctx, id, nodes, policy.ReplicaNumberByIndex(ind)) return c.containerCovered() }) } @@ -38,7 +40,7 @@ func (c *Context) containerCovered() bool { return c.cnrNodesNum <= len(c.pairedNodes) } -func (c *Context) processObjectPlacement(id oid.ID, nodes []netmap.NodeInfo, replicas uint32) { +func (c *Context) processObjectPlacement(ctx context.Context, id oid.ID, nodes []netmap.NodeInfo, replicas uint32) { var ( ok uint32 optimal bool @@ -57,7 +59,7 @@ func (c *Context) processObjectPlacement(id oid.ID, nodes []netmap.NodeInfo, rep getHeaderPrm.Node = nodes[i] // try to get object header from node - hdr, err := c.cnrCom.GetHeader(c.task.AuditContext(), getHeaderPrm) + hdr, err := c.cnrCom.GetHeader(ctx, getHeaderPrm) if err != nil { c.log.Debug("could not get object header from candidate", zap.Stringer("id", id), diff --git a/pkg/services/audit/auditor/por.go b/pkg/services/audit/auditor/por.go index ff322d6e1..6011217f8 100644 --- a/pkg/services/audit/auditor/por.go +++ b/pkg/services/audit/auditor/por.go @@ -2,6 +2,7 @@ package auditor import ( "bytes" + "context" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" @@ -14,7 +15,7 @@ import ( "go.uber.org/zap" ) -func (c *Context) executePoR() { +func (c *Context) executePoR(ctx context.Context) { wg := new(sync.WaitGroup) sgs := c.task.StorageGroupList() @@ -22,7 +23,7 @@ func (c *Context) executePoR() { wg.Add(1) if err := c.porWorkerPool.Submit(func() { - c.checkStorageGroupPoR(sg.ID(), sg.StorageGroup()) + c.checkStorageGroupPoR(ctx, sg.ID(), sg.StorageGroup()) wg.Done() }); err != nil { wg.Done() @@ -36,7 +37,7 @@ func (c *Context) executePoR() { } // nolint: funlen -func (c *Context) checkStorageGroupPoR(sgID oid.ID, sg storagegroupSDK.StorageGroup) { +func (c *Context) checkStorageGroupPoR(ctx context.Context, sgID oid.ID, sg storagegroupSDK.StorageGroup) { members := sg.Members() c.updateSGInfo(sgID, members) @@ -80,7 +81,7 @@ func (c *Context) checkStorageGroupPoR(sgID oid.ID, sg storagegroupSDK.StorageGr getHeaderPrm.Node = flat[j] - hdr, err := c.cnrCom.GetHeader(c.task.AuditContext(), getHeaderPrm) + hdr, err := c.cnrCom.GetHeader(ctx, getHeaderPrm) if err != nil { c.log.Debug("can't head object", zap.String("remote_node", netmap.StringifyPublicKey(flat[j])), diff --git a/pkg/services/audit/task.go b/pkg/services/audit/task.go index 35932a69e..3de5ac2c6 100644 --- a/pkg/services/audit/task.go +++ b/pkg/services/audit/task.go @@ -1,8 +1,6 @@ package audit import ( - "context" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" @@ -10,11 +8,10 @@ import ( ) // Task groups groups the container audit parameters. -// nolint: containedctx type Task struct { - reporter Reporter + cancelCh <-chan struct{} - auditContext context.Context + reporter Reporter idCnr cid.ID @@ -41,18 +38,15 @@ func (t *Task) Reporter() Reporter { return t.reporter } -// WithAuditContext sets context of the audit of the current epoch. -func (t *Task) WithAuditContext(ctx context.Context) *Task { - if t != nil { - t.auditContext = ctx +func (t *Task) WithCancelChannel(ch <-chan struct{}) *Task { + if ch != nil { + t.cancelCh = ch } - return t } -// AuditContext returns context of the audit of the current epoch. -func (t *Task) AuditContext() context.Context { - return t.auditContext +func (t *Task) CancelChannel() <-chan struct{} { + return t.cancelCh } // WithContainerID sets identifier of the container under audit. diff --git a/pkg/services/audit/taskmanager/listen.go b/pkg/services/audit/taskmanager/listen.go index 4e8a3df68..a16052e13 100644 --- a/pkg/services/audit/taskmanager/listen.go +++ b/pkg/services/audit/taskmanager/listen.go @@ -33,18 +33,28 @@ func (m *Manager) Listen(ctx context.Context) { return } - m.handleTask(task) + tCtx, tCancel := context.WithCancel(ctx) // cancel task in case of listen cancel + go func() { + select { + case <-tCtx.Done(): // listen cancelled or task completed + return + case <-task.CancelChannel(): // new epoch + tCancel() + } + }() + + m.handleTask(tCtx, task, tCancel) } } } -func (m *Manager) handleTask(task *audit.Task) { +func (m *Manager) handleTask(ctx context.Context, task *audit.Task, onCompleted func()) { pdpPool, err := m.pdpPoolGenerator() if err != nil { m.log.Error("could not generate PDP worker pool", zap.String("error", err.Error()), ) - + onCompleted() return } @@ -53,7 +63,7 @@ func (m *Manager) handleTask(task *audit.Task) { m.log.Error("could not generate PoR worker pool", zap.String("error", err.Error()), ) - + onCompleted() return } @@ -61,9 +71,10 @@ func (m *Manager) handleTask(task *audit.Task) { WithPDPWorkerPool(pdpPool). WithPoRWorkerPool(porPool) - if err := m.workerPool.Submit(auditContext.Execute); err != nil { + if err := m.workerPool.Submit(func() { auditContext.Execute(ctx, onCompleted) }); err != nil { // may be we should report it m.log.Warn("could not submit audit task") + onCompleted() } } diff --git a/pkg/services/audit/taskmanager/push.go b/pkg/services/audit/taskmanager/push.go index 13f8fd12d..805897dbf 100644 --- a/pkg/services/audit/taskmanager/push.go +++ b/pkg/services/audit/taskmanager/push.go @@ -5,9 +5,6 @@ import ( ) // PushTask adds a task to the queue for processing. -// -// Returns error if task was not added to the queue. -func (m *Manager) PushTask(t *audit.Task) error { +func (m *Manager) PushTask(t *audit.Task) { m.ch <- t - return nil }