forked from TrueCloudLab/frostfs-node
[#271] service/audit: Implement PoR
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
5f65ec0265
commit
a5320408a5
4 changed files with 115 additions and 4 deletions
|
@ -90,6 +90,7 @@ func (c *ClientCache) GetSG(task *audit.Task, id *object.ID) (*storagegroup.Stor
|
||||||
|
|
||||||
cctx, cancel := context.WithTimeout(task.AuditContext(), c.sgTimeout)
|
cctx, cancel := context.WithTimeout(task.AuditContext(), c.sgTimeout)
|
||||||
obj, err := cli.GetObject(cctx, getParams)
|
obj, err := cli.GetObject(cctx, getParams)
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -135,6 +136,7 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.
|
||||||
|
|
||||||
cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout)
|
cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout)
|
||||||
head, err := cli.GetObjectHeader(cctx, headParams, client.WithTTL(1))
|
head, err := cli.GetObjectHeader(cctx, headParams, client.WithTTL(1))
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -17,6 +17,12 @@ type Context struct {
|
||||||
task *audit.Task
|
task *audit.Task
|
||||||
|
|
||||||
report *audit.Report
|
report *audit.Report
|
||||||
|
|
||||||
|
// consider adding mutex to access caches
|
||||||
|
|
||||||
|
sgMembersCache map[int][]*object.ID
|
||||||
|
|
||||||
|
placementCache map[string][]netmap.Nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
// ContextPrm groups components required to conduct data audit checks.
|
// ContextPrm groups components required to conduct data audit checks.
|
||||||
|
@ -77,6 +83,10 @@ func (c *Context) containerID() *container.ID {
|
||||||
func (c *Context) init() {
|
func (c *Context) init() {
|
||||||
c.report = audit.NewReport(c.containerID())
|
c.report = audit.NewReport(c.containerID())
|
||||||
|
|
||||||
|
c.sgMembersCache = make(map[int][]*object.ID)
|
||||||
|
|
||||||
|
c.placementCache = make(map[string][]netmap.Nodes)
|
||||||
|
|
||||||
c.log = c.log.With(
|
c.log = c.log.With(
|
||||||
zap.Stringer("container ID", c.task.ContainerID()),
|
zap.Stringer("container ID", c.task.ContainerID()),
|
||||||
)
|
)
|
||||||
|
|
|
@ -34,10 +34,6 @@ func (c *Context) Execute() {
|
||||||
c.writeReport()
|
c.writeReport()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Context) executePoR() {
|
|
||||||
// TODO: implement me
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Context) executePoP() {
|
func (c *Context) executePoP() {
|
||||||
// TODO: implement me
|
// TODO: implement me
|
||||||
}
|
}
|
||||||
|
|
103
pkg/services/audit/auditor/por.go
Normal file
103
pkg/services/audit/auditor/por.go
Normal file
|
@ -0,0 +1,103 @@
|
||||||
|
package auditor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
|
"github.com/nspcc-dev/tzhash/tz"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *Context) executePoR() {
|
||||||
|
for i, sg := range c.task.StorageGroupList() {
|
||||||
|
c.checkStorageGroupPoR(i, sg) // consider parallel it
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) {
|
||||||
|
storageGroup, err := c.cnrCom.GetSG(c.task, sg) // get storage group
|
||||||
|
if err != nil {
|
||||||
|
c.log.Warn("can't get storage group",
|
||||||
|
zap.Stringer("sgid", sg),
|
||||||
|
zap.String("error", err.Error()))
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
members := storageGroup.Members()
|
||||||
|
c.sgMembersCache[ind] = members
|
||||||
|
|
||||||
|
var (
|
||||||
|
tzHash []byte
|
||||||
|
totalSize uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
for i := range members {
|
||||||
|
objectPlacement, err := placement.BuildObjectPlacement(
|
||||||
|
c.task.NetworkMap(),
|
||||||
|
c.task.ContainerNodes(),
|
||||||
|
members[i],
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Info("can't build placement for storage group member",
|
||||||
|
zap.Stringer("sg", sg),
|
||||||
|
zap.Stringer("member_id", members[i]),
|
||||||
|
)
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c.placementCache[members[i].String()] = objectPlacement
|
||||||
|
|
||||||
|
for _, node := range placement.FlattenNodes(objectPlacement) {
|
||||||
|
hdr, err := c.cnrCom.GetHeader(c.task, node, members[i])
|
||||||
|
if err != nil {
|
||||||
|
c.log.Debug("can't head object",
|
||||||
|
zap.String("remote_node", node.Address()),
|
||||||
|
zap.Stringer("oid", members[i]))
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(tzHash) == 0 {
|
||||||
|
tzHash = hdr.PayloadHomomorphicHash().Sum()
|
||||||
|
} else {
|
||||||
|
tzHash, err = tz.Concat([][]byte{
|
||||||
|
tzHash,
|
||||||
|
hdr.PayloadHomomorphicHash().Sum(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
c.log.Debug("can't concatenate tz hash",
|
||||||
|
zap.Stringer("oid", members[i]),
|
||||||
|
zap.String("error", err.Error()))
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
totalSize += hdr.PayloadSize()
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sizeCheck := storageGroup.ValidationDataSize() == totalSize
|
||||||
|
tzCheck := bytes.Equal(tzHash, storageGroup.ValidationDataHash().Sum())
|
||||||
|
|
||||||
|
if sizeCheck && tzCheck {
|
||||||
|
c.report.PassedPoR(sg) // write report
|
||||||
|
} else {
|
||||||
|
if !sizeCheck {
|
||||||
|
c.log.Debug("storage group size check failed",
|
||||||
|
zap.Uint64("expected", storageGroup.ValidationDataSize()),
|
||||||
|
zap.Uint64("got", totalSize))
|
||||||
|
}
|
||||||
|
|
||||||
|
if !tzCheck {
|
||||||
|
c.log.Debug("storage group tz hash check failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
c.report.FailedPoR(sg) // write report
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue