From 9212864f4291422304150b41aa19618e0b53c0bb Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 23 Dec 2020 11:56:49 +0300 Subject: [PATCH] [#258] services/audit: Implement PoP check Signed-off-by: Leonard Lyubich --- pkg/services/audit/auditor/context.go | 48 ++++++++ pkg/services/audit/auditor/exec.go | 17 ++- pkg/services/audit/auditor/pop.go | 161 ++++++++++++++++++++++++++ pkg/services/audit/auditor/por.go | 8 +- pkg/services/audit/auditor/util.go | 26 +++++ pkg/services/audit/report.go | 7 ++ 6 files changed, 255 insertions(+), 12 deletions(-) create mode 100644 pkg/services/audit/auditor/pop.go create mode 100644 pkg/services/audit/auditor/util.go diff --git a/pkg/services/audit/auditor/context.go b/pkg/services/audit/auditor/context.go index de49e82ea..4097537ae 100644 --- a/pkg/services/audit/auditor/context.go +++ b/pkg/services/audit/auditor/context.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/storagegroup" "github.com/nspcc-dev/neofs-node/pkg/services/audit" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -23,6 +24,28 @@ type Context struct { sgMembersCache map[int][]*object.ID placementCache map[string][]netmap.Nodes + + pairs []gamePair + + pairedNodes map[uint64]pairMemberInfo + + counters struct { + hit, miss, fail uint32 + } + + cnrNodesNum int +} + +type pairMemberInfo struct { + failedPDP, passedPDP bool // at least one + + node *netmap.Node +} + +type gamePair struct { + n1, n2 *netmap.Node + + id *object.ID } // ContextPrm groups components required to conduct data audit checks. @@ -87,6 +110,10 @@ func (c *Context) init() { c.placementCache = make(map[string][]netmap.Nodes) + c.cnrNodesNum = len(c.task.ContainerNodes().Flatten()) + + c.pairedNodes = make(map[uint64]pairMemberInfo) + c.log = c.log.With( zap.Stringer("container ID", c.task.ContainerID()), ) @@ -118,3 +145,24 @@ func (c *Context) writeReport() { c.log.Error("could not write audit report") } } + +func (c *Context) buildPlacement(id *object.ID) ([]netmap.Nodes, error) { + strID := id.String() + + if nn, ok := c.placementCache[strID]; ok { + return nn, nil + } + + nn, err := placement.BuildObjectPlacement( + c.task.NetworkMap(), + c.task.ContainerNodes(), + id, + ) + if err != nil { + return nil, err + } + + c.placementCache[strID] = nn + + return nn, nil +} diff --git a/pkg/services/audit/auditor/exec.go b/pkg/services/audit/auditor/exec.go index 81c1cdbb2..1390bf54b 100644 --- a/pkg/services/audit/auditor/exec.go +++ b/pkg/services/audit/auditor/exec.go @@ -2,6 +2,8 @@ package auditor import ( "fmt" + + "go.uber.org/zap" ) // Execute audits container data. @@ -34,10 +36,15 @@ func (c *Context) Execute() { c.writeReport() } -func (c *Context) executePoP() { - // TODO: implement me -} - func (c *Context) executePDP() { - // TODO: implement me + // TODO: replace logging with real algorithm + log := c.log.With(zap.Int("nodes in container", c.cnrNodesNum)) + + for i := range c.pairs { + log.Debug("next pair for hash game", + zap.String("node 1", c.pairs[i].n1.Address()), + zap.String("node 2", c.pairs[i].n2.Address()), + zap.Stringer("object", c.pairs[i].id), + ) + } } diff --git a/pkg/services/audit/auditor/pop.go b/pkg/services/audit/auditor/pop.go new file mode 100644 index 000000000..d6005b023 --- /dev/null +++ b/pkg/services/audit/auditor/pop.go @@ -0,0 +1,161 @@ +package auditor + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "go.uber.org/zap" +) + +func (c *Context) executePoP() { + c.buildCoverage() + + c.report.SetPlacementCounters( + c.counters.hit, + c.counters.miss, + c.counters.fail, + ) +} + +func (c *Context) buildCoverage() { + replicas := c.task.ContainerStructure().PlacementPolicy().Replicas() + + // select random member from another storage group + // and process all placement vectors + c.iterateSGMembersPlacementRand(func(id *object.ID, ind int, nodes netmap.Nodes) bool { + c.processObjectPlacement(id, nodes, replicas[ind].Count()) + return c.containerCovered() + }) +} + +func (c *Context) containerCovered() bool { + // number of container nodes can be calculated once + return c.cnrNodesNum <= len(c.pairedNodes) +} + +func (c *Context) processObjectPlacement(id *object.ID, nodes netmap.Nodes, replicas uint32) { + var ( + ok uint32 + optimal bool + + unpairedCandidate1, unpairedCandidate2 = -1, -1 + + pairedCandidate = -1 + ) + + for i := 0; !optimal && ok < replicas && i < len(nodes); i++ { + // try to get object header from node + _, err := c.cnrCom.GetHeader(c.task, nodes[i], id) + if err != nil { + c.log.Debug("could not get object header from candidate", + zap.Stringer("id", id), + zap.String("error", err.Error()), + ) + + continue + } + + // increment success counter + ok++ + + // update optimal flag + optimal = ok == replicas && uint32(i) < replicas + + // update potential candidates to be paired + if _, ok := c.pairedNodes[nodes[i].Hash()]; !ok { + if unpairedCandidate1 < 0 { + unpairedCandidate1 = i + } else if unpairedCandidate2 < 0 { + unpairedCandidate2 = i + } + } else if pairedCandidate < 0 { + pairedCandidate = i + } + } + + if optimal { + c.counters.hit++ + } else if ok == replicas { + c.counters.miss++ + } else { + c.counters.fail++ + } + + if unpairedCandidate1 >= 0 { + if unpairedCandidate2 >= 0 { + c.composePair(id, nodes[unpairedCandidate1], nodes[unpairedCandidate2]) + } else if pairedCandidate >= 0 { + c.composePair(id, nodes[unpairedCandidate1], nodes[pairedCandidate]) + } + } +} + +func (c *Context) composePair(id *object.ID, n1, n2 *netmap.Node) { + c.pairs = append(c.pairs, gamePair{ + n1: n1, + n2: n2, + id: id, + }) + + c.pairedNodes[n1.Hash()] = pairMemberInfo{ + node: n1, + } + c.pairedNodes[n2.Hash()] = pairMemberInfo{ + node: n2, + } +} + +func (c *Context) iterateSGMembersPlacementRand(f func(*object.ID, int, netmap.Nodes) bool) { + // iterate over storage groups members for all storage groups (one by one) + // with randomly shuffled members + c.iterateSGMembersRand(func(id *object.ID) bool { + // build placement vector for the current object + nn, err := c.buildPlacement(id) + if err != nil { + c.log.Debug("could not build placement for object", + zap.Stringer("id", id), + zap.String("error", err.Error()), + ) + + return false + } + + for i, nodes := range nn { + if f(id, i, nodes) { + return true + } + } + + return false + }) +} + +func (c *Context) iterateSGMembersRand(f func(*object.ID) bool) { + c.iterateSGInfo(func(members []*object.ID) bool { + ln := len(members) + + processed := make(map[uint64]struct{}, ln-1) + + for len(processed) < ln { + ind := nextRandUint64(uint64(ln), processed) + processed[ind] = struct{}{} + + if f(members[ind]) { + return true + } + } + + return false + }) +} + +func (c *Context) iterateSGInfo(f func([]*object.ID) bool) { + // we can add randomization like for SG members, + // but list of storage groups is already expected + // to be shuffled since it is a Search response + // with unpredictable order + for _, members := range c.sgMembersCache { + if f(members) { + return + } + } +} diff --git a/pkg/services/audit/auditor/por.go b/pkg/services/audit/auditor/por.go index f2a703616..acd9a0643 100644 --- a/pkg/services/audit/auditor/por.go +++ b/pkg/services/audit/auditor/por.go @@ -34,11 +34,7 @@ func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) { ) for i := range members { - objectPlacement, err := placement.BuildObjectPlacement( - c.task.NetworkMap(), - c.task.ContainerNodes(), - members[i], - ) + objectPlacement, err := c.buildPlacement(members[i]) if err != nil { c.log.Info("can't build placement for storage group member", zap.Stringer("sg", sg), @@ -48,8 +44,6 @@ func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) { continue } - c.placementCache[members[i].String()] = objectPlacement - for _, node := range placement.FlattenNodes(objectPlacement) { hdr, err := c.cnrCom.GetHeader(c.task, node, members[i]) if err != nil { diff --git a/pkg/services/audit/auditor/util.go b/pkg/services/audit/auditor/util.go new file mode 100644 index 000000000..d521cb96c --- /dev/null +++ b/pkg/services/audit/auditor/util.go @@ -0,0 +1,26 @@ +package auditor + +import ( + "github.com/nspcc-dev/neofs-node/pkg/util/rand" +) + +// returns random uint64 number [0; n) outside exclude map. +// exclude must contain no more than n-1 elements [0; n) +func nextRandUint64(n uint64, exclude map[uint64]struct{}) uint64 { + ln := uint64(len(exclude)) + + ind := randUint64(n - ln) + + for i := uint64(0); ; i++ { + if i >= ind { + if _, ok := exclude[i]; !ok { + return i + } + } + } +} + +// returns random uint64 number [0, n). +func randUint64(n uint64) uint64 { + return rand.Uint64(rand.New(), int64(n)) +} diff --git a/pkg/services/audit/report.go b/pkg/services/audit/report.go index bf23ab14f..210df1751 100644 --- a/pkg/services/audit/report.go +++ b/pkg/services/audit/report.go @@ -47,3 +47,10 @@ func (r *Report) PassedPoR(sg *object.ID) { func (r *Report) FailedPoR(sg *object.ID) { r.res.SetFailSG(append(r.res.FailSG(), sg)) } + +// SetPlacementCounters sets counters of compliance with placement. +func (r *Report) SetPlacementCounters(hit, miss, fail uint32) { + r.res.SetHit(hit) + r.res.SetMiss(miss) + r.res.SetFail(fail) +}