diff --git a/pkg/services/audit/auditor/context.go b/pkg/services/audit/auditor/context.go index 198002967..8c2213d26 100644 --- a/pkg/services/audit/auditor/context.go +++ b/pkg/services/audit/auditor/context.go @@ -12,6 +12,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -23,13 +24,13 @@ type Context struct { report *audit.Report - // consider adding mutex to access caches - + sgMembersMtx sync.RWMutex sgMembersCache map[int][]*object.ID + placementMtx sync.Mutex placementCache map[string][]netmap.Nodes - porRequests, porRetries uint32 + porRequests, porRetries atomic.Uint32 pairs []gamePair @@ -42,6 +43,7 @@ type Context struct { cnrNodesNum int + headMtx sync.RWMutex headResponses map[string]shortHeader } @@ -130,7 +132,7 @@ func (c *Context) WithTask(t *audit.Task) *Context { return c } -// WithPDPWorkerPool sets worker pool for PDP pairs processing.. +// WithPDPWorkerPool sets worker pool for PDP pairs processing. func (c *Context) WithPDPWorkerPool(pool util.WorkerPool) *Context { if c != nil { c.pdpWorkerPool = pool @@ -198,6 +200,9 @@ func (c *Context) writeReport() { } func (c *Context) buildPlacement(id *object.ID) ([]netmap.Nodes, error) { + c.placementMtx.Lock() + defer c.placementMtx.Unlock() + strID := id.String() if nn, ok := c.placementCache[strID]; ok { @@ -219,6 +224,9 @@ func (c *Context) buildPlacement(id *object.ID) ([]netmap.Nodes, error) { } func (c *Context) objectSize(id *object.ID) uint64 { + c.headMtx.RLock() + defer c.headMtx.RUnlock() + strID := id.String() if hdr, ok := c.headResponses[strID]; ok { @@ -229,6 +237,9 @@ func (c *Context) objectSize(id *object.ID) uint64 { } func (c *Context) objectHomoHash(id *object.ID) []byte { + c.headMtx.RLock() + defer c.headMtx.RUnlock() + strID := id.String() if hdr, ok := c.headResponses[strID]; ok { @@ -239,6 +250,9 @@ func (c *Context) objectHomoHash(id *object.ID) []byte { } func (c *Context) updateHeadResponses(hdr *object.Object) { + c.headMtx.Lock() + defer c.headMtx.Unlock() + strID := hdr.ID().String() if _, ok := c.headResponses[strID]; !ok { @@ -248,3 +262,10 @@ func (c *Context) updateHeadResponses(hdr *object.Object) { } } } + +func (c *Context) updateSGInfo(ind int, members []*object.ID) { + c.sgMembersMtx.Lock() + defer c.sgMembersMtx.Unlock() + + c.sgMembersCache[ind] = members +} diff --git a/pkg/services/audit/auditor/pop.go b/pkg/services/audit/auditor/pop.go index bad3e9e6f..39e060443 100644 --- a/pkg/services/audit/auditor/pop.go +++ b/pkg/services/audit/auditor/pop.go @@ -162,6 +162,9 @@ func (c *Context) iterateSGMembersRand(f func(*object.ID) bool) { } func (c *Context) iterateSGInfo(f func([]*object.ID) bool) { + c.sgMembersMtx.RLock() + defer c.sgMembersMtx.RUnlock() + // we can add randomization like for SG members, // but list of storage groups is already expected // to be shuffled since it is a Search response diff --git a/pkg/services/audit/auditor/por.go b/pkg/services/audit/auditor/por.go index 715993135..0902e16e8 100644 --- a/pkg/services/audit/auditor/por.go +++ b/pkg/services/audit/auditor/por.go @@ -2,6 +2,7 @@ package auditor import ( "bytes" + "sync" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" @@ -11,10 +12,25 @@ import ( ) func (c *Context) executePoR() { - for i, sg := range c.task.StorageGroupList() { - c.checkStorageGroupPoR(i, sg) // consider parallel it + wg := new(sync.WaitGroup) + sgs := c.task.StorageGroupList() + + for i := range sgs { + wg.Add(1) + + sg := sgs[i] + + if err := c.porWorkerPool.Submit(func() { + c.checkStorageGroupPoR(i, sg) + wg.Done() + }); err != nil { + wg.Done() + } } - c.report.SetPoRCounters(c.porRequests, c.porRetries) + + wg.Wait() + + c.report.SetPoRCounters(c.porRequests.Load(), c.porRetries.Load()) } func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) { @@ -28,11 +44,13 @@ func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) { } members := storageGroup.Members() - c.sgMembersCache[ind] = members + c.updateSGInfo(ind, members) var ( tzHash []byte totalSize uint64 + + accRequests, accRetries uint32 ) for i := range members { @@ -54,9 +72,9 @@ func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) { }) for i := range flat { - c.porRequests++ + accRequests++ if i > 0 { // in best case audit get object header on first iteration - c.porRetries++ + accRetries++ } hdr, err := c.cnrCom.GetHeader(c.task, flat[i], members[i], true) @@ -93,6 +111,9 @@ func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) { } } + c.porRequests.Add(accRequests) + c.porRetries.Add(accRetries) + sizeCheck := storageGroup.ValidationDataSize() == totalSize tzCheck := bytes.Equal(tzHash, storageGroup.ValidationDataHash().Sum())