[#258] services/audit: Implement PoP check
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
a5320408a5
commit
9212864f42
6 changed files with 255 additions and 12 deletions
|
@ -6,6 +6,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/storagegroup"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -23,6 +24,28 @@ type Context struct {
|
|||
sgMembersCache map[int][]*object.ID
|
||||
|
||||
placementCache map[string][]netmap.Nodes
|
||||
|
||||
pairs []gamePair
|
||||
|
||||
pairedNodes map[uint64]pairMemberInfo
|
||||
|
||||
counters struct {
|
||||
hit, miss, fail uint32
|
||||
}
|
||||
|
||||
cnrNodesNum int
|
||||
}
|
||||
|
||||
type pairMemberInfo struct {
|
||||
failedPDP, passedPDP bool // at least one
|
||||
|
||||
node *netmap.Node
|
||||
}
|
||||
|
||||
type gamePair struct {
|
||||
n1, n2 *netmap.Node
|
||||
|
||||
id *object.ID
|
||||
}
|
||||
|
||||
// ContextPrm groups components required to conduct data audit checks.
|
||||
|
@ -87,6 +110,10 @@ func (c *Context) init() {
|
|||
|
||||
c.placementCache = make(map[string][]netmap.Nodes)
|
||||
|
||||
c.cnrNodesNum = len(c.task.ContainerNodes().Flatten())
|
||||
|
||||
c.pairedNodes = make(map[uint64]pairMemberInfo)
|
||||
|
||||
c.log = c.log.With(
|
||||
zap.Stringer("container ID", c.task.ContainerID()),
|
||||
)
|
||||
|
@ -118,3 +145,24 @@ func (c *Context) writeReport() {
|
|||
c.log.Error("could not write audit report")
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Context) buildPlacement(id *object.ID) ([]netmap.Nodes, error) {
|
||||
strID := id.String()
|
||||
|
||||
if nn, ok := c.placementCache[strID]; ok {
|
||||
return nn, nil
|
||||
}
|
||||
|
||||
nn, err := placement.BuildObjectPlacement(
|
||||
c.task.NetworkMap(),
|
||||
c.task.ContainerNodes(),
|
||||
id,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.placementCache[strID] = nn
|
||||
|
||||
return nn, nil
|
||||
}
|
||||
|
|
|
@ -2,6 +2,8 @@ package auditor
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Execute audits container data.
|
||||
|
@ -34,10 +36,15 @@ func (c *Context) Execute() {
|
|||
c.writeReport()
|
||||
}
|
||||
|
||||
func (c *Context) executePoP() {
|
||||
// TODO: implement me
|
||||
}
|
||||
|
||||
func (c *Context) executePDP() {
|
||||
// TODO: implement me
|
||||
// TODO: replace logging with real algorithm
|
||||
log := c.log.With(zap.Int("nodes in container", c.cnrNodesNum))
|
||||
|
||||
for i := range c.pairs {
|
||||
log.Debug("next pair for hash game",
|
||||
zap.String("node 1", c.pairs[i].n1.Address()),
|
||||
zap.String("node 2", c.pairs[i].n2.Address()),
|
||||
zap.Stringer("object", c.pairs[i].id),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
161
pkg/services/audit/auditor/pop.go
Normal file
161
pkg/services/audit/auditor/pop.go
Normal file
|
@ -0,0 +1,161 @@
|
|||
package auditor
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (c *Context) executePoP() {
|
||||
c.buildCoverage()
|
||||
|
||||
c.report.SetPlacementCounters(
|
||||
c.counters.hit,
|
||||
c.counters.miss,
|
||||
c.counters.fail,
|
||||
)
|
||||
}
|
||||
|
||||
func (c *Context) buildCoverage() {
|
||||
replicas := c.task.ContainerStructure().PlacementPolicy().Replicas()
|
||||
|
||||
// select random member from another storage group
|
||||
// and process all placement vectors
|
||||
c.iterateSGMembersPlacementRand(func(id *object.ID, ind int, nodes netmap.Nodes) bool {
|
||||
c.processObjectPlacement(id, nodes, replicas[ind].Count())
|
||||
return c.containerCovered()
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Context) containerCovered() bool {
|
||||
// number of container nodes can be calculated once
|
||||
return c.cnrNodesNum <= len(c.pairedNodes)
|
||||
}
|
||||
|
||||
func (c *Context) processObjectPlacement(id *object.ID, nodes netmap.Nodes, replicas uint32) {
|
||||
var (
|
||||
ok uint32
|
||||
optimal bool
|
||||
|
||||
unpairedCandidate1, unpairedCandidate2 = -1, -1
|
||||
|
||||
pairedCandidate = -1
|
||||
)
|
||||
|
||||
for i := 0; !optimal && ok < replicas && i < len(nodes); i++ {
|
||||
// try to get object header from node
|
||||
_, err := c.cnrCom.GetHeader(c.task, nodes[i], id)
|
||||
if err != nil {
|
||||
c.log.Debug("could not get object header from candidate",
|
||||
zap.Stringer("id", id),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// increment success counter
|
||||
ok++
|
||||
|
||||
// update optimal flag
|
||||
optimal = ok == replicas && uint32(i) < replicas
|
||||
|
||||
// update potential candidates to be paired
|
||||
if _, ok := c.pairedNodes[nodes[i].Hash()]; !ok {
|
||||
if unpairedCandidate1 < 0 {
|
||||
unpairedCandidate1 = i
|
||||
} else if unpairedCandidate2 < 0 {
|
||||
unpairedCandidate2 = i
|
||||
}
|
||||
} else if pairedCandidate < 0 {
|
||||
pairedCandidate = i
|
||||
}
|
||||
}
|
||||
|
||||
if optimal {
|
||||
c.counters.hit++
|
||||
} else if ok == replicas {
|
||||
c.counters.miss++
|
||||
} else {
|
||||
c.counters.fail++
|
||||
}
|
||||
|
||||
if unpairedCandidate1 >= 0 {
|
||||
if unpairedCandidate2 >= 0 {
|
||||
c.composePair(id, nodes[unpairedCandidate1], nodes[unpairedCandidate2])
|
||||
} else if pairedCandidate >= 0 {
|
||||
c.composePair(id, nodes[unpairedCandidate1], nodes[pairedCandidate])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Context) composePair(id *object.ID, n1, n2 *netmap.Node) {
|
||||
c.pairs = append(c.pairs, gamePair{
|
||||
n1: n1,
|
||||
n2: n2,
|
||||
id: id,
|
||||
})
|
||||
|
||||
c.pairedNodes[n1.Hash()] = pairMemberInfo{
|
||||
node: n1,
|
||||
}
|
||||
c.pairedNodes[n2.Hash()] = pairMemberInfo{
|
||||
node: n2,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Context) iterateSGMembersPlacementRand(f func(*object.ID, int, netmap.Nodes) bool) {
|
||||
// iterate over storage groups members for all storage groups (one by one)
|
||||
// with randomly shuffled members
|
||||
c.iterateSGMembersRand(func(id *object.ID) bool {
|
||||
// build placement vector for the current object
|
||||
nn, err := c.buildPlacement(id)
|
||||
if err != nil {
|
||||
c.log.Debug("could not build placement for object",
|
||||
zap.Stringer("id", id),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
for i, nodes := range nn {
|
||||
if f(id, i, nodes) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Context) iterateSGMembersRand(f func(*object.ID) bool) {
|
||||
c.iterateSGInfo(func(members []*object.ID) bool {
|
||||
ln := len(members)
|
||||
|
||||
processed := make(map[uint64]struct{}, ln-1)
|
||||
|
||||
for len(processed) < ln {
|
||||
ind := nextRandUint64(uint64(ln), processed)
|
||||
processed[ind] = struct{}{}
|
||||
|
||||
if f(members[ind]) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Context) iterateSGInfo(f func([]*object.ID) bool) {
|
||||
// we can add randomization like for SG members,
|
||||
// but list of storage groups is already expected
|
||||
// to be shuffled since it is a Search response
|
||||
// with unpredictable order
|
||||
for _, members := range c.sgMembersCache {
|
||||
if f(members) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
|
@ -34,11 +34,7 @@ func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) {
|
|||
)
|
||||
|
||||
for i := range members {
|
||||
objectPlacement, err := placement.BuildObjectPlacement(
|
||||
c.task.NetworkMap(),
|
||||
c.task.ContainerNodes(),
|
||||
members[i],
|
||||
)
|
||||
objectPlacement, err := c.buildPlacement(members[i])
|
||||
if err != nil {
|
||||
c.log.Info("can't build placement for storage group member",
|
||||
zap.Stringer("sg", sg),
|
||||
|
@ -48,8 +44,6 @@ func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) {
|
|||
continue
|
||||
}
|
||||
|
||||
c.placementCache[members[i].String()] = objectPlacement
|
||||
|
||||
for _, node := range placement.FlattenNodes(objectPlacement) {
|
||||
hdr, err := c.cnrCom.GetHeader(c.task, node, members[i])
|
||||
if err != nil {
|
||||
|
|
26
pkg/services/audit/auditor/util.go
Normal file
26
pkg/services/audit/auditor/util.go
Normal file
|
@ -0,0 +1,26 @@
|
|||
package auditor
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/rand"
|
||||
)
|
||||
|
||||
// returns random uint64 number [0; n) outside exclude map.
|
||||
// exclude must contain no more than n-1 elements [0; n)
|
||||
func nextRandUint64(n uint64, exclude map[uint64]struct{}) uint64 {
|
||||
ln := uint64(len(exclude))
|
||||
|
||||
ind := randUint64(n - ln)
|
||||
|
||||
for i := uint64(0); ; i++ {
|
||||
if i >= ind {
|
||||
if _, ok := exclude[i]; !ok {
|
||||
return i
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// returns random uint64 number [0, n).
|
||||
func randUint64(n uint64) uint64 {
|
||||
return rand.Uint64(rand.New(), int64(n))
|
||||
}
|
|
@ -47,3 +47,10 @@ func (r *Report) PassedPoR(sg *object.ID) {
|
|||
func (r *Report) FailedPoR(sg *object.ID) {
|
||||
r.res.SetFailSG(append(r.res.FailSG(), sg))
|
||||
}
|
||||
|
||||
// SetPlacementCounters sets counters of compliance with placement.
|
||||
func (r *Report) SetPlacementCounters(hit, miss, fail uint32) {
|
||||
r.res.SetHit(hit)
|
||||
r.res.SetMiss(miss)
|
||||
r.res.SetFail(fail)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue