[#281] service/audit: Run each SG check in separate routine at PoR

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
Alex Vanin 2020-12-24 14:26:07 +03:00 committed by Alex Vanin
parent 77cf97c236
commit e5108cf135
3 changed files with 55 additions and 10 deletions

File 1 of 3:

@@ -12,6 +12,7 @@ import (
     "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
     "github.com/nspcc-dev/neofs-node/pkg/util"
     "github.com/nspcc-dev/neofs-node/pkg/util/logger"
+    "go.uber.org/atomic"
     "go.uber.org/zap"
 )
@@ -23,13 +24,13 @@ type Context struct {
     report *audit.Report

-    // consider adding mutex to access caches
+    sgMembersMtx   sync.RWMutex
     sgMembersCache map[int][]*object.ID

+    placementMtx   sync.Mutex
     placementCache map[string][]netmap.Nodes

-    porRequests, porRetries uint32
+    porRequests, porRetries atomic.Uint32

     pairs []gamePair
@@ -42,6 +43,7 @@ type Context struct {
     cnrNodesNum int

+    headMtx       sync.RWMutex
     headResponses map[string]shortHeader
 }
@@ -130,7 +132,7 @@ func (c *Context) WithTask(t *audit.Task) *Context {
     return c
 }

-// WithPDPWorkerPool sets worker pool for PDP pairs processing..
+// WithPDPWorkerPool sets worker pool for PDP pairs processing.
 func (c *Context) WithPDPWorkerPool(pool util.WorkerPool) *Context {
     if c != nil {
         c.pdpWorkerPool = pool
@@ -198,6 +200,9 @@ func (c *Context) writeReport() {
 }

 func (c *Context) buildPlacement(id *object.ID) ([]netmap.Nodes, error) {
+    c.placementMtx.Lock()
+    defer c.placementMtx.Unlock()
+
     strID := id.String()

     if nn, ok := c.placementCache[strID]; ok {
@@ -219,6 +224,9 @@ func (c *Context) buildPlacement(id *object.ID) ([]netmap.Nodes, error) {
 }

 func (c *Context) objectSize(id *object.ID) uint64 {
+    c.headMtx.RLock()
+    defer c.headMtx.RUnlock()
+
     strID := id.String()

     if hdr, ok := c.headResponses[strID]; ok {
@@ -229,6 +237,9 @@ func (c *Context) objectSize(id *object.ID) uint64 {
 }

 func (c *Context) objectHomoHash(id *object.ID) []byte {
+    c.headMtx.RLock()
+    defer c.headMtx.RUnlock()
+
     strID := id.String()

     if hdr, ok := c.headResponses[strID]; ok {
@@ -239,6 +250,9 @@ func (c *Context) objectHomoHash(id *object.ID) []byte {
 }

 func (c *Context) updateHeadResponses(hdr *object.Object) {
+    c.headMtx.Lock()
+    defer c.headMtx.Unlock()
+
     strID := hdr.ID().String()

     if _, ok := c.headResponses[strID]; !ok {
@@ -248,3 +262,10 @@ func (c *Context) updateHeadResponses(hdr *object.Object) {
         }
     }
 }
+
+func (c *Context) updateSGInfo(ind int, members []*object.ID) {
+    c.sgMembersMtx.Lock()
+    defer c.sgMembersMtx.Unlock()
+
+    c.sgMembersCache[ind] = members
+}
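A note on the locking pattern above: the read paths (objectSize, objectHomoHash) take headMtx.RLock, the write path (updateHeadResponses) takes the exclusive lock, and buildPlacement uses a plain sync.Mutex because it both reads and fills placementCache in one critical section. Below is a minimal standalone sketch of the same RWMutex-guarded-cache idea; the cache type, method names, and keys are illustrative, not from the patch:

```go
package main

import (
	"fmt"
	"sync"
)

// cache is a hypothetical stand-in for the auditor's sgMembersCache /
// headResponses maps: shared state guarded by a sync.RWMutex.
type cache struct {
	mtx  sync.RWMutex
	data map[string][]string
}

// get takes the read lock, so parallel checks can look up entries
// without blocking each other.
func (c *cache) get(key string) ([]string, bool) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()

	v, ok := c.data[key]
	return v, ok
}

// put takes the exclusive lock for the rarer updates.
func (c *cache) put(key string, v []string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	c.data[key] = v
}

func main() {
	c := &cache{data: make(map[string][]string)}

	c.put("sg-0", []string{"member-1", "member-2"})

	if members, ok := c.get("sg-0"); ok {
		fmt.Println(members) // [member-1 member-2]
	}
}
```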

File 2 of 3:

@@ -162,6 +162,9 @@ func (c *Context) iterateSGMembersRand(f func(*object.ID) bool) {
 }

 func (c *Context) iterateSGInfo(f func([]*object.ID) bool) {
+    c.sgMembersMtx.RLock()
+    defer c.sgMembersMtx.RUnlock()
+
     // we can add randomization like for SG members,
     // but list of storage groups is already expected
     // to be shuffled since it is a Search response

File 3 of 3:

@@ -2,6 +2,7 @@ package auditor

 import (
     "bytes"
+    "sync"

     "github.com/nspcc-dev/neofs-api-go/pkg/object"
     "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
@@ -11,10 +12,25 @@ import (
 )

 func (c *Context) executePoR() {
-    for i, sg := range c.task.StorageGroupList() {
-        c.checkStorageGroupPoR(i, sg) // consider parallel it
+    wg := new(sync.WaitGroup)
+    sgs := c.task.StorageGroupList()
+
+    for i := range sgs {
+        wg.Add(1)
+
+        sg := sgs[i]
+
+        if err := c.porWorkerPool.Submit(func() {
+            c.checkStorageGroupPoR(i, sg)
+            wg.Done()
+        }); err != nil {
+            wg.Done()
+        }
     }
-    c.report.SetPoRCounters(c.porRequests, c.porRetries)
+
+    wg.Wait()
+
+    c.report.SetPoRCounters(c.porRequests.Load(), c.porRetries.Load())
 }

 func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) {
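Two details in the new executePoR are worth calling out: sg := sgs[i] copies the loop variable before the closure is submitted (pre-Go 1.22, range variables are reused across iterations), and the error branch of Submit also calls wg.Done(), so a rejected task cannot leave wg.Wait() hanging. Here is a self-contained sketch of the same pattern; the bounded pool is a toy standing in for neofs-node's util.WorkerPool (only the Submit(func()) error shape is assumed to match):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// pool is a toy bounded worker pool, not neofs-node's implementation.
type pool struct{ sem chan struct{} }

// Submit runs f on a fresh goroutine, or reports an error when the
// pool is already at capacity.
func (p *pool) Submit(f func()) error {
	select {
	case p.sem <- struct{}{}:
		go func() {
			defer func() { <-p.sem }()
			f()
		}()
		return nil
	default:
		return errors.New("pool is full")
	}
}

func main() {
	p := &pool{sem: make(chan struct{}, 2)}
	wg := new(sync.WaitGroup)

	sgs := []string{"sg-0", "sg-1", "sg-2"}

	for i := range sgs {
		wg.Add(1)

		sg := sgs[i] // copy before the closure captures it

		if err := p.Submit(func() {
			fmt.Println("checking", sg)
			wg.Done()
		}); err != nil {
			wg.Done() // keep the WaitGroup balanced on rejection
		}
	}

	wg.Wait() // every submitted or rejected check is accounted for
}
```

As in the patch, a rejected submission simply skips that storage group's check; the extra Done() only guarantees that executePoR cannot deadlock on Wait().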
@@ -28,11 +44,13 @@ func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) {
     }

     members := storageGroup.Members()
-    c.sgMembersCache[ind] = members
+    c.updateSGInfo(ind, members)

     var (
         tzHash    []byte
         totalSize uint64
+
+        accRequests, accRetries uint32
     )

     for i := range members {
@@ -54,9 +72,9 @@ func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) {
     })

     for i := range flat {
-        c.porRequests++
+        accRequests++
         if i > 0 { // in best case audit get object header on first iteration
-            c.porRetries++
+            accRetries++
         }

         hdr, err := c.cnrCom.GetHeader(c.task, flat[i], members[i], true)
@@ -93,6 +111,9 @@ func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) {
         }
     }

+    c.porRequests.Add(accRequests)
+    c.porRetries.Add(accRetries)
+
     sizeCheck := storageGroup.ValidationDataSize() == totalSize
     tzCheck := bytes.Equal(tzHash, storageGroup.ValidationDataHash().Sum())
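Since porRequests and porRetries are now shared across goroutines, each check accumulates into local accRequests/accRetries and publishes the totals with two atomic Adds at the end, rather than contending on the shared counters once per header request. A minimal sketch of that accumulate-then-add pattern follows; it uses the standard library's sync/atomic.Uint32 (Go 1.19+), which mirrors the go.uber.org/atomic API the patch imports:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counters mirrors the Context fields: shared totals, updated atomically.
type counters struct {
	requests, retries atomic.Uint32
}

// check simulates one PoR check: it counts locally and publishes once.
func (c *counters) check(attempts int) {
	var accRequests, accRetries uint32

	for i := 0; i < attempts; i++ {
		accRequests++
		if i > 0 { // everything after the first attempt is a retry
			accRetries++
		}
	}

	// One atomic Add per counter per check, instead of one per attempt.
	c.requests.Add(accRequests)
	c.retries.Add(accRetries)
}

func main() {
	c := new(counters)
	wg := new(sync.WaitGroup)

	for _, attempts := range []int{1, 3, 2} {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			c.check(n)
		}(attempts)
	}

	wg.Wait()
	fmt.Println(c.requests.Load(), c.retries.Load()) // 6 3
}
```

Flushing once per check keeps contention on the shared counters independent of the number of SG members a check has to fetch headers for.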