From a5320408a5dc80e7ebd15757287789ac3d17ddfd Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Wed, 23 Dec 2020 12:54:34 +0300 Subject: [PATCH] [#271] service/audit: Implement PoR Signed-off-by: Alex Vanin --- pkg/innerring/rpc.go | 2 + pkg/services/audit/auditor/context.go | 10 +++ pkg/services/audit/auditor/exec.go | 4 - pkg/services/audit/auditor/por.go | 103 ++++++++++++++++++++++++++ 4 files changed, 115 insertions(+), 4 deletions(-) create mode 100644 pkg/services/audit/auditor/por.go diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index ec730fd5c..42b342629 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -90,6 +90,7 @@ func (c *ClientCache) GetSG(task *audit.Task, id *object.ID) (*storagegroup.Stor cctx, cancel := context.WithTimeout(task.AuditContext(), c.sgTimeout) obj, err := cli.GetObject(cctx, getParams) + cancel() if err != nil { @@ -135,6 +136,7 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object. cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout) head, err := cli.GetObjectHeader(cctx, headParams, client.WithTTL(1)) + cancel() if err != nil { diff --git a/pkg/services/audit/auditor/context.go b/pkg/services/audit/auditor/context.go index 8b9bd53f3..de49e82ea 100644 --- a/pkg/services/audit/auditor/context.go +++ b/pkg/services/audit/auditor/context.go @@ -17,6 +17,12 @@ type Context struct { task *audit.Task report *audit.Report + + // consider adding mutex to access caches + + sgMembersCache map[int][]*object.ID + + placementCache map[string][]netmap.Nodes } // ContextPrm groups components required to conduct data audit checks. @@ -77,6 +83,10 @@ func (c *Context) containerID() *container.ID { func (c *Context) init() { c.report = audit.NewReport(c.containerID()) + c.sgMembersCache = make(map[int][]*object.ID) + + c.placementCache = make(map[string][]netmap.Nodes) + c.log = c.log.With( zap.Stringer("container ID", c.task.ContainerID()), ) diff --git a/pkg/services/audit/auditor/exec.go b/pkg/services/audit/auditor/exec.go index 5ed605b74..81c1cdbb2 100644 --- a/pkg/services/audit/auditor/exec.go +++ b/pkg/services/audit/auditor/exec.go @@ -34,10 +34,6 @@ func (c *Context) Execute() { c.writeReport() } -func (c *Context) executePoR() { - // TODO: implement me -} - func (c *Context) executePoP() { // TODO: implement me } diff --git a/pkg/services/audit/auditor/por.go b/pkg/services/audit/auditor/por.go new file mode 100644 index 000000000..f2a703616 --- /dev/null +++ b/pkg/services/audit/auditor/por.go @@ -0,0 +1,103 @@ +package auditor + +import ( + "bytes" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/tzhash/tz" + "go.uber.org/zap" +) + +func (c *Context) executePoR() { + for i, sg := range c.task.StorageGroupList() { + c.checkStorageGroupPoR(i, sg) // consider parallel it + } +} + +func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) { + storageGroup, err := c.cnrCom.GetSG(c.task, sg) // get storage group + if err != nil { + c.log.Warn("can't get storage group", + zap.Stringer("sgid", sg), + zap.String("error", err.Error())) + + return + } + + members := storageGroup.Members() + c.sgMembersCache[ind] = members + + var ( + tzHash []byte + totalSize uint64 + ) + + for i := range members { + objectPlacement, err := placement.BuildObjectPlacement( + c.task.NetworkMap(), + c.task.ContainerNodes(), + members[i], + ) + if err != nil { + c.log.Info("can't build placement for storage group member", + zap.Stringer("sg", sg), + zap.Stringer("member_id", members[i]), + ) + + continue + } + + c.placementCache[members[i].String()] = objectPlacement + + for _, node := range placement.FlattenNodes(objectPlacement) { + hdr, err := c.cnrCom.GetHeader(c.task, node, members[i]) + if err != nil { + c.log.Debug("can't head object", + zap.String("remote_node", node.Address()), + zap.Stringer("oid", members[i])) + + continue + } + + if len(tzHash) == 0 { + tzHash = hdr.PayloadHomomorphicHash().Sum() + } else { + tzHash, err = tz.Concat([][]byte{ + tzHash, + hdr.PayloadHomomorphicHash().Sum(), + }) + if err != nil { + c.log.Debug("can't concatenate tz hash", + zap.Stringer("oid", members[i]), + zap.String("error", err.Error())) + + break + } + } + + totalSize += hdr.PayloadSize() + + break + } + } + + sizeCheck := storageGroup.ValidationDataSize() == totalSize + tzCheck := bytes.Equal(tzHash, storageGroup.ValidationDataHash().Sum()) + + if sizeCheck && tzCheck { + c.report.PassedPoR(sg) // write report + } else { + if !sizeCheck { + c.log.Debug("storage group size check failed", + zap.Uint64("expected", storageGroup.ValidationDataSize()), + zap.Uint64("got", totalSize)) + } + + if !tzCheck { + c.log.Debug("storage group tz hash check failed") + } + + c.report.FailedPoR(sg) // write report + } +}