diff --git a/cmd/neofs-ir/defaults.go b/cmd/neofs-ir/defaults.go index 715dce1ee..497bec119 100644 --- a/cmd/neofs-ir/defaults.go +++ b/cmd/neofs-ir/defaults.go @@ -97,4 +97,6 @@ func defaultConfiguration(cfg *viper.Viper) { cfg.SetDefault("audit.timeout.head", "5s") cfg.SetDefault("audit.timeout.rangehash", "5s") cfg.SetDefault("audit.timeout.search", "10s") + cfg.SetDefault("audit.pdp.max_sleep_interval", "5s") + cfg.SetDefault("audit.pdp.pairs_pool_size", "10") } diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 51ea3cb87..346f1b788 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -21,6 +21,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager" + util2 "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/precision" "github.com/panjf2000/ants/v2" "github.com/pkg/errors" @@ -224,11 +225,17 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"), }) + pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size") + auditTaskManager := audittask.New( audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), audittask.WithWorkerPool(auditPool), audittask.WithLogger(log), audittask.WithContainerCommunicator(clientCache), + audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")), + audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) { + return ants.NewPool(pdpPoolSize, ants.WithNonblocking(true)) + }), ) server.workers = append(server.workers, auditTaskManager.Listen) diff --git a/pkg/services/audit/auditor/context.go b/pkg/services/audit/auditor/context.go index f0ebbf1b8..eaef4ad90 100644 --- a/pkg/services/audit/auditor/context.go +++ b/pkg/services/audit/auditor/context.go @@ -1,12 +1,16 @@ package auditor 
import ( + "sync" + "time" + "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/storagegroup" "github.com/nspcc-dev/neofs-node/pkg/services/audit" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -27,7 +31,8 @@ type Context struct { pairs []gamePair - pairedNodes map[uint64]pairMemberInfo + pairedMtx sync.Mutex + pairedNodes map[uint64]*pairMemberInfo counters struct { hit, miss, fail uint32 @@ -48,6 +53,10 @@ type gamePair struct { n1, n2 *netmap.Node id *object.ID + + rn1, rn2 []*object.Range + + hh1, hh2 [][]byte } type shortHeader struct { @@ -58,9 +67,13 @@ type shortHeader struct { // ContextPrm groups components required to conduct data audit checks. type ContextPrm struct { + maxPDPSleep uint64 + log *logger.Logger cnrCom ContainerCommunicator + + pdpWorkerPool util.WorkerPool } // ContainerCommunicator is an interface of @@ -98,6 +111,14 @@ func (p *ContextPrm) SetContainerCommunicator(cnrCom ContainerCommunicator) { } } +// SetMaxPDPSleep sets maximum sleep interval between range hash requests +// as part of PDP check. +func (p *ContextPrm) SetMaxPDPSleep(dur time.Duration) { + if p != nil { + p.maxPDPSleep = uint64(dur) + } +} + // WithTask sets container audit parameters. func (c *Context) WithTask(t *audit.Task) *Context { if c != nil { @@ -107,6 +128,15 @@ func (c *Context) WithTask(t *audit.Task) *Context { return c } +// WithPDPWorkerPool sets worker pool for PDP pairs processing. 
+func (c *Context) WithPDPWorkerPool(pool util.WorkerPool) *Context { + if c != nil { + c.pdpWorkerPool = pool + } + + return c +} + func (c *Context) containerID() *container.ID { return c.task.ContainerID() } @@ -120,7 +150,7 @@ func (c *Context) init() { c.cnrNodesNum = len(c.task.ContainerNodes().Flatten()) - c.pairedNodes = make(map[uint64]pairMemberInfo) + c.pairedNodes = make(map[uint64]*pairMemberInfo) c.headResponses = make(map[string]shortHeader) diff --git a/pkg/services/audit/auditor/exec.go b/pkg/services/audit/auditor/exec.go index 1390bf54b..ceb6556e2 100644 --- a/pkg/services/audit/auditor/exec.go +++ b/pkg/services/audit/auditor/exec.go @@ -2,8 +2,6 @@ package auditor import ( "fmt" - - "go.uber.org/zap" ) // Execute audits container data. @@ -35,16 +33,3 @@ func (c *Context) Execute() { c.writeReport() } - -func (c *Context) executePDP() { - // TODO: replace logging with real algorithm - log := c.log.With(zap.Int("nodes in container", c.cnrNodesNum)) - - for i := range c.pairs { - log.Debug("next pair for hash game", - zap.String("node 1", c.pairs[i].n1.Address()), - zap.String("node 2", c.pairs[i].n2.Address()), - zap.Stringer("object", c.pairs[i].id), - ) - } -} diff --git a/pkg/services/audit/auditor/pdp.go b/pkg/services/audit/auditor/pdp.go new file mode 100644 index 000000000..39d2e6bb6 --- /dev/null +++ b/pkg/services/audit/auditor/pdp.go @@ -0,0 +1,223 @@ +package auditor + +import ( + "bytes" + "sync" + "time" + + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/tzhash/tz" + "go.uber.org/zap" +) + +func (c *Context) executePDP() { + c.processPairs() + c.writePairsResult() +} + +func (c *Context) processPairs() { + wg := new(sync.WaitGroup) + + for i := range c.pairs { + p := &c.pairs[i] + wg.Add(1) + + if err := c.pdpWorkerPool.Submit(func() { + c.processPair(p) + wg.Done() + }); err != nil { + wg.Done() + } + } + + wg.Wait() +} + +func (c *Context) processPair(p 
*gamePair) { + c.distributeRanges(p) + c.collectHashes(p) + c.analyzeHashes(p) +} + +func (c *Context) distributeRanges(p *gamePair) { + p.rn1 = make([]*object.Range, hashRangeNumber-1) + p.rn2 = make([]*object.Range, hashRangeNumber-1) + + for i := 0; i < hashRangeNumber-1; i++ { + p.rn1[i] = object.NewRange() + p.rn2[i] = object.NewRange() + } + + notches := c.splitPayload(p.id) + + { // node 1 + // [0:n2] + p.rn1[0].SetLength(notches[1]) + + // [n2:n3] + p.rn1[1].SetOffset(notches[1]) + p.rn1[1].SetLength(notches[2] - notches[1]) + + // [n3:full] + p.rn1[2].SetOffset(notches[2]) + p.rn1[2].SetLength(notches[3] - notches[2]) + } + + { // node 2 + // [0:n1] + p.rn2[0].SetLength(notches[0]) + + // [n1:n2] + p.rn2[1].SetOffset(notches[0]) + p.rn2[1].SetLength(notches[1] - notches[0]) + + // [n2:full] + p.rn2[2].SetOffset(notches[1]) + p.rn2[2].SetLength(notches[3] - notches[1]) + } +} + +func (c *Context) splitPayload(id *object.ID) []uint64 { + var ( + prev uint64 + size = c.objectSize(id) + notches = make([]uint64, 0, hashRangeNumber) + ) + + for i := uint64(0); i < hashRangeNumber; i++ { + var nextLn uint64 + if i < hashRangeNumber-1 { + nextLn = randUint64(size-prev-(hashRangeNumber-i)) + 1 + } else { + nextLn = size - prev + } + + notches = append(notches, prev+nextLn) + + prev += nextLn + } + + return notches +} + +func (c *Context) collectHashes(p *gamePair) { + fn := func(n *netmap.Node, rngs []*object.Range, hashWriter func([]byte)) { + // TODO: add order randomization + for i := range rngs { + sleepDur := time.Duration(randUint64(c.maxPDPSleep)) + + c.log.Debug("sleep before get range hash", + zap.Stringer("interval", sleepDur), + ) + + time.Sleep(time.Duration(sleepDur)) + + h, err := c.cnrCom.GetRangeHash(c.task, n, p.id, rngs[i]) + if err != nil { + c.log.Debug("could not get payload range hash", + zap.Stringer("id", p.id), + zap.String("node", n.Address()), + zap.String("error", err.Error()), + ) + return + } + + hashWriter(h) + } + } + + p.hh1 = 
make([][]byte, 0, len(p.rn1)) + + fn(p.n1, p.rn1, func(h []byte) { + p.hh1 = append(p.hh1, h) + }) + + p.hh2 = make([][]byte, 0, len(p.rn2)) + + fn(p.n2, p.rn2, func(h []byte) { + p.hh2 = append(p.hh2, h) + }) +} + +func (c *Context) analyzeHashes(p *gamePair) { + if len(p.hh1) != hashRangeNumber-1 || len(p.hh2) != hashRangeNumber-1 { + c.failNodesPDP(p.n1, p.n2) + return + } + + h1, err := tz.Concat([][]byte{p.hh2[0], p.hh2[1]}) + if err != nil || !bytes.Equal(p.hh1[0], h1) { + c.failNodesPDP(p.n1, p.n2) + return + } + + h2, err := tz.Concat([][]byte{p.hh1[1], p.hh1[2]}) + if err != nil || !bytes.Equal(p.hh2[2], h2) { + c.failNodesPDP(p.n1, p.n2) + return + } + + fh, err := tz.Concat([][]byte{h1, h2}) + if err != nil || !bytes.Equal(fh, c.objectHomoHash(p.id)) { + c.failNodesPDP(p.n1, p.n2) + return + } + + c.passNodesPDP(p.n1, p.n2) +} + +func (c *Context) failNodesPDP(ns ...*netmap.Node) { + c.pairedMtx.Lock() + + for i := range ns { + c.pairedNodes[ns[i].Hash()].failedPDP = true + } + + c.pairedMtx.Unlock() +} + +func (c *Context) passNodesPDP(ns ...*netmap.Node) { + c.pairedMtx.Lock() + + for i := range ns { + c.pairedNodes[ns[i].Hash()].passedPDP = true + } + + c.pairedMtx.Unlock() +} + +func (c *Context) writePairsResult() { + var failCount, okCount int + + c.iteratePairedNodes( + func(*netmap.Node) { failCount++ }, + func(*netmap.Node) { okCount++ }, + ) + + failedNodes := make([][]byte, 0, failCount) + passedNodes := make([][]byte, 0, okCount) + + c.iteratePairedNodes( + func(n *netmap.Node) { + failedNodes = append(failedNodes, n.PublicKey()) + }, + func(n *netmap.Node) { + passedNodes = append(passedNodes, n.PublicKey()) + }, + ) + + c.report.SetPDPResults(passedNodes, failedNodes) +} + +func (c *Context) iteratePairedNodes(onFail, onPass func(*netmap.Node)) { + for _, pairedNode := range c.pairedNodes { + if pairedNode.failedPDP { + onFail(pairedNode.node) + } + + if pairedNode.passedPDP { + onPass(pairedNode.node) + } + } +} diff --git 
a/pkg/services/audit/auditor/pop.go b/pkg/services/audit/auditor/pop.go index 4d73385c4..c46351d1a 100644 --- a/pkg/services/audit/auditor/pop.go +++ b/pkg/services/audit/auditor/pop.go @@ -48,7 +48,7 @@ func (c *Context) processObjectPlacement(id *object.ID, nodes netmap.Nodes, repl pairedCandidate = -1 ) - for i := 0; !optimal && ok < replicas && i < len(nodes); i++ { + for i := 0; ok < replicas && i < len(nodes); i++ { // try to get object header from node hdr, err := c.cnrCom.GetHeader(c.task, nodes[i], id) if err != nil { @@ -109,10 +109,10 @@ func (c *Context) composePair(id *object.ID, n1, n2 *netmap.Node) { id: id, }) - c.pairedNodes[n1.Hash()] = pairMemberInfo{ + c.pairedNodes[n1.Hash()] = &pairMemberInfo{ node: n1, } - c.pairedNodes[n2.Hash()] = pairMemberInfo{ + c.pairedNodes[n2.Hash()] = &pairMemberInfo{ node: n2, } } diff --git a/pkg/services/audit/report.go b/pkg/services/audit/report.go index 210df1751..2a5083837 100644 --- a/pkg/services/audit/report.go +++ b/pkg/services/audit/report.go @@ -54,3 +54,9 @@ func (r *Report) SetPlacementCounters(hit, miss, fail uint32) { r.res.SetMiss(miss) r.res.SetFail(fail) } + +// SetPDPResults sets lists of nodes according to their PDP results. +func (r *Report) SetPDPResults(passed, failed [][]byte) { + r.res.SetPassNodes(passed) + r.res.SetFailNodes(failed) +} diff --git a/pkg/services/audit/taskmanager/listen.go b/pkg/services/audit/taskmanager/listen.go index f070a9946..638ffd001 100644 --- a/pkg/services/audit/taskmanager/listen.go +++ b/pkg/services/audit/taskmanager/listen.go @@ -38,9 +38,19 @@ func (m *Manager) Listen(ctx context.Context) { } func (m *Manager) handleTask(task *audit.Task) { - if err := m.workerPool.Submit(func() { - m.generateContext(task).Execute() - }); err != nil { + pdpPool, err := m.pdpPoolGenerator() + if err != nil { + m.log.Error("could not generate PDP worker pool", + zap.String("error", err.Error()), + ) + + return + } + + auditContext := m.generateContext(task). 
+ WithPDPWorkerPool(pdpPool) + + if err := m.workerPool.Submit(auditContext.Execute); err != nil { // may be we should report it m.log.Warn("could not submit audit task") } diff --git a/pkg/services/audit/taskmanager/manager.go b/pkg/services/audit/taskmanager/manager.go index 174282744..7ebe34be0 100644 --- a/pkg/services/audit/taskmanager/manager.go +++ b/pkg/services/audit/taskmanager/manager.go @@ -1,6 +1,8 @@ package audittask import ( + "time" + "github.com/nspcc-dev/neofs-node/pkg/services/audit" "github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor" "github.com/nspcc-dev/neofs-node/pkg/util" @@ -28,6 +30,8 @@ type cfg struct { reporter audit.Reporter workerPool util.WorkerPool + + pdpPoolGenerator func() (util.WorkerPool, error) } func defaultCfg() *cfg { @@ -79,3 +83,18 @@ func WithContainerCommunicator(cnrCom auditor.ContainerCommunicator) Option { c.ctxPrm.SetContainerCommunicator(cnrCom) } } + +// WithMaxPDPSleepInterval returns option to set maximum sleep interval +// between range hash requests as part of PDP check. +func WithMaxPDPSleepInterval(dur time.Duration) Option { + return func(c *cfg) { + c.ctxPrm.SetMaxPDPSleep(dur) + } +} + +// WithPDPWorkerPoolGenerator returns option to set generator of worker pools for PDP pairs processing. +func WithPDPWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option { + return func(c *cfg) { + c.pdpPoolGenerator = f + } +}