From 6370c2e160bb5d06aaaab7d0373ebcca631e5e6a Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Thu, 19 May 2022 21:26:27 +0300
Subject: [PATCH] [#1402] ir: Make `ClientCache` not depend on `audit.Task`

This allows using `ClientCache` for storage group searching before the
task context is initialized. It also makes `ClientCache` more
general-purpose.

Signed-off-by: Pavel Karpy
---
 pkg/innerring/rpc.go                  | 36 ++++++++++-----------
 pkg/services/audit/auditor/context.go | 45 +++++++++++++++++++++++----
 pkg/services/audit/auditor/pdp.go     | 10 +++++-
 pkg/services/audit/auditor/pop.go     | 10 +++++-
 pkg/services/audit/auditor/por.go     | 21 +++++++++++--
 5 files changed, 94 insertions(+), 28 deletions(-)
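
For reviewers, a minimal sketch of what the decoupling enables: polling a
storage group with plain values, before any audit.Task exists. The helper
name, its variables, and the SDK import paths are assumptions made for
illustration; the parameter types and their fields come from the diff below.

	import (
		"context"

		"github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor"
		cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
		"github.com/nspcc-dev/neofs-sdk-go/netmap"
		oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
		"github.com/nspcc-dev/neofs-sdk-go/storagegroup"
	)

	// fetchSG asks any auditor.ContainerCommunicator implementation
	// (e.g. the inner ring ClientCache) for a storage group object,
	// no audit.Task required.
	func fetchSG(ctx context.Context, com auditor.ContainerCommunicator,
		cnr cid.ID, sgID oid.ID, nm *netmap.NetMap,
		cnrNodes [][]netmap.NodeInfo,
	) (*storagegroup.StorageGroup, error) {
		var prm auditor.GetSGPrm
		prm.Context = ctx        // plain context instead of task.AuditContext()
		prm.CID = cnr            // container to search in
		prm.OID = sgID           // ID of the storage group object
		prm.NetMap = nm          // network map to resolve node addresses
		prm.Container = cnrNodes // container placement vectors

		return com.GetSG(prm)
	}
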
diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go
index 3e758af6..92bc9ae7 100644
--- a/pkg/innerring/rpc.go
+++ b/pkg/innerring/rpc.go
@@ -11,7 +11,7 @@ import (
 	neofsapiclient "github.com/nspcc-dev/neofs-node/pkg/innerring/internal/client"
 	auditproc "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit"
 	"github.com/nspcc-dev/neofs-node/pkg/network/cache"
-	"github.com/nspcc-dev/neofs-node/pkg/services/audit"
+	"github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor"
 	"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
 	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
 	"github.com/nspcc-dev/neofs-sdk-go/netmap"
@@ -58,16 +58,16 @@ func (c *ClientCache) Get(info clientcore.NodeInfo) (clientcore.Client, error) {
 	return c.cache.Get(info)
 }
 
-// GetSG polls the container from audit task to get the object by id.
+// GetSG polls the container to get the object by id.
 // Returns storage groups structure from received object.
 //
 // Returns an error of type apistatus.ObjectNotFound if storage group is missing.
-func (c *ClientCache) GetSG(task *audit.Task, id oid.ID) (*storagegroup.StorageGroup, error) {
+func (c *ClientCache) GetSG(prm auditor.GetSGPrm) (*storagegroup.StorageGroup, error) {
 	var sgAddress oid.Address
-	sgAddress.SetContainer(task.ContainerID())
-	sgAddress.SetObject(id)
+	sgAddress.SetContainer(prm.CID)
+	sgAddress.SetObject(prm.OID)
 
-	return c.getSG(task.AuditContext(), sgAddress, task.NetworkMap(), task.ContainerNodes())
+	return c.getSG(prm.Context, sgAddress, prm.NetMap, prm.Container)
 }
 
 func (c *ClientCache) getSG(ctx context.Context, addr oid.Address, nm *netmap.NetMap, cn [][]netmap.NodeInfo) (*storagegroup.StorageGroup, error) {
@@ -129,14 +129,14 @@ func (c *ClientCache) getSG(ctx context.Context, addr oid.Address, nm *netmap.Ne
 }
 
 // GetHeader requests node from the container under audit to return object header by id.
-func (c *ClientCache) GetHeader(task *audit.Task, node netmap.NodeInfo, id oid.ID, relay bool) (*object.Object, error) {
+func (c *ClientCache) GetHeader(prm auditor.GetHeaderPrm) (*object.Object, error) {
 	var objAddress oid.Address
-	objAddress.SetContainer(task.ContainerID())
-	objAddress.SetObject(id)
+	objAddress.SetContainer(prm.CID)
+	objAddress.SetObject(prm.OID)
 
 	var info clientcore.NodeInfo
-	err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(node))
+	err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(prm.Node))
 	if err != nil {
 		return nil, fmt.Errorf("parse client node info: %w", err)
 	}
@@ -146,11 +146,11 @@ func (c *ClientCache) GetHeader(task *audit.Task, node netmap.NodeInfo, id oid.I
 		return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err)
 	}
 
-	cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout)
+	cctx, cancel := context.WithTimeout(prm.Context, c.headTimeout)
 
 	var obj *object.Object
 
-	if relay {
+	if prm.NodeIsRelay {
 		obj, err = neofsapiclient.GetObjectHeaderFromContainer(cctx, cli, objAddress)
 	} else {
 		obj, err = neofsapiclient.GetRawObjectHeaderLocally(cctx, cli, objAddress)
 	}
@@ -167,14 +167,14 @@
 
 // GetRangeHash requests node from the container under audit to return Tillich-Zemor hash of the
 // payload range of the object with specified identifier.
-func (c *ClientCache) GetRangeHash(task *audit.Task, node netmap.NodeInfo, id oid.ID, rng *object.Range) ([]byte, error) {
+func (c *ClientCache) GetRangeHash(prm auditor.GetRangeHashPrm) ([]byte, error) {
 	var objAddress oid.Address
-	objAddress.SetContainer(task.ContainerID())
-	objAddress.SetObject(id)
+	objAddress.SetContainer(prm.CID)
+	objAddress.SetObject(prm.OID)
 
 	var info clientcore.NodeInfo
-	err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(node))
+	err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(prm.Node))
 	if err != nil {
 		return nil, fmt.Errorf("parse client node info: %w", err)
 	}
@@ -184,9 +184,9 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node netmap.NodeInfo, id oi
 		return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err)
 	}
 
-	cctx, cancel := context.WithTimeout(task.AuditContext(), c.rangeTimeout)
+	cctx, cancel := context.WithTimeout(prm.Context, c.rangeTimeout)
 
-	h, err := neofsapiclient.HashObjectRange(cctx, cli, objAddress, rng)
+	h, err := neofsapiclient.HashObjectRange(cctx, cli, objAddress, prm.Range)
 
 	cancel()
diff --git a/pkg/services/audit/auditor/context.go b/pkg/services/audit/auditor/context.go
index 5d296c01..87c54521 100644
--- a/pkg/services/audit/auditor/context.go
+++ b/pkg/services/audit/auditor/context.go
@@ -1,6 +1,7 @@
 package auditor
 
 import (
+	"context"
 	"sync"
 	"time"
 
@@ -81,18 +82,50 @@ type ContextPrm struct {
 	pdpWorkerPool, porWorkerPool util.WorkerPool
 }
 
+type commonCommunicatorPrm struct {
+	Context context.Context
+
+	OID oid.ID
+	CID cid.ID
+}
+
+// GetSGPrm groups parameters of GetSG operation.
+type GetSGPrm struct {
+	commonCommunicatorPrm
+
+	NetMap    *netmap.NetMap
+	Container [][]netmap.NodeInfo
+}
+
+// GetHeaderPrm groups parameters of GetHeader operation.
+type GetHeaderPrm struct {
+	commonCommunicatorPrm
+
+	Node netmap.NodeInfo
+
+	NodeIsRelay bool
+}
+
+// GetRangeHashPrm groups parameters of GetRangeHash operation.
+type GetRangeHashPrm struct {
+	commonCommunicatorPrm
+
+	Node  netmap.NodeInfo
+	Range *object.Range
+}
+
 // ContainerCommunicator is an interface of
 // component of communication with container nodes.
 type ContainerCommunicator interface {
-	// Must return storage group structure stored in object from container.
-	GetSG(*audit.Task, oid.ID) (*storagegroup.StorageGroup, error)
+	// GetSG must return storage group structure stored in object from container.
+	GetSG(GetSGPrm) (*storagegroup.StorageGroup, error)
 
-	// Must return object header from the container node.
-	GetHeader(*audit.Task, netmap.NodeInfo, oid.ID, bool) (*object.Object, error)
+	// GetHeader must return object header from the container node.
+	GetHeader(GetHeaderPrm) (*object.Object, error)
 
-	// Must return homomorphic Tillich-Zemor hash of payload range of the
+	// GetRangeHash must return homomorphic Tillich-Zemor hash of payload range of the
 	// object stored in container node.
-	GetRangeHash(*audit.Task, netmap.NodeInfo, oid.ID, *object.Range) ([]byte, error)
+	GetRangeHash(GetRangeHashPrm) ([]byte, error)
 }
 
 // NewContext creates, initializes and returns Context.
diff --git a/pkg/services/audit/auditor/pdp.go b/pkg/services/audit/auditor/pdp.go
index afec36c2..d3d88b75 100644
--- a/pkg/services/audit/auditor/pdp.go
+++ b/pkg/services/audit/auditor/pdp.go
@@ -118,6 +118,12 @@ func (c *Context) collectHashes(p *gamePair) {
 	}
 	rand.Shuffle(len(order), func(i, j int) { order[i], order[j] = order[j], order[i] })
 
+	var getRangeHashPrm GetRangeHashPrm
+	getRangeHashPrm.Context = c.task.AuditContext()
+	getRangeHashPrm.CID = c.task.ContainerID()
+	getRangeHashPrm.OID = p.id
+	getRangeHashPrm.Node = n
+
 	res := make([][]byte, len(rngs))
 	for _, i := range order {
 		var sleepDur time.Duration
@@ -131,7 +137,9 @@ func (c *Context) collectHashes(p *gamePair) {
 
 		time.Sleep(sleepDur)
 
-		h, err := c.cnrCom.GetRangeHash(c.task, n, p.id, rngs[i])
+		getRangeHashPrm.Range = rngs[i]
+
+		h, err := c.cnrCom.GetRangeHash(getRangeHashPrm)
 		if err != nil {
 			c.log.Debug("could not get payload range hash",
 				zap.Stringer("id", p.id),
diff --git a/pkg/services/audit/auditor/pop.go b/pkg/services/audit/auditor/pop.go
index bc7d3689..a5b18c07 100644
--- a/pkg/services/audit/auditor/pop.go
+++ b/pkg/services/audit/auditor/pop.go
@@ -48,9 +48,17 @@ func (c *Context) processObjectPlacement(id oid.ID, nodes []netmap.NodeInfo, rep
 		pairedCandidate = -1
 	)
 
+	var getHeaderPrm GetHeaderPrm
+	getHeaderPrm.Context = c.task.AuditContext()
+	getHeaderPrm.OID = id
+	getHeaderPrm.CID = c.task.ContainerID()
+	getHeaderPrm.NodeIsRelay = false
+
 	for i := 0; ok < replicas && i < len(nodes); i++ {
+		getHeaderPrm.Node = nodes[i]
+
 		// try to get object header from node
-		hdr, err := c.cnrCom.GetHeader(c.task, nodes[i], id, false)
+		hdr, err := c.cnrCom.GetHeader(getHeaderPrm)
 		if err != nil {
 			c.log.Debug("could not get object header from candidate",
 				zap.Stringer("id", id),
diff --git a/pkg/services/audit/auditor/por.go b/pkg/services/audit/auditor/por.go
index 7b9c1a8c..625cc4bb 100644
--- a/pkg/services/audit/auditor/por.go
+++ b/pkg/services/audit/auditor/por.go
@@ -36,7 +36,15 @@ func (c *Context) executePoR() {
 }
 
 func (c *Context) checkStorageGroupPoR(ind int, sg oid.ID) {
-	storageGroup, err := c.cnrCom.GetSG(c.task, sg) // get storage group
+	var getSgPrm GetSGPrm
+
+	getSgPrm.Context = c.task.AuditContext()
+	getSgPrm.CID = c.task.ContainerID()
+	getSgPrm.OID = sg
+	getSgPrm.NetMap = c.task.NetworkMap()
+	getSgPrm.Container = c.task.ContainerNodes()
+
+	storageGroup, err := c.cnrCom.GetSG(getSgPrm) // get storage group
 	if err != nil {
 		c.log.Warn("can't get storage group",
 			zap.Stringer("sgid", sg),
@@ -55,6 +63,11 @@ func (c *Context) checkStorageGroupPoR(ind int, sg oid.ID) {
 		accRequests, accRetries uint32
 	)
 
+	var getHeaderPrm GetHeaderPrm
+	getHeaderPrm.Context = c.task.AuditContext()
+	getHeaderPrm.CID = c.task.ContainerID()
+	getHeaderPrm.NodeIsRelay = true
+
 	for i := range members {
 		objectPlacement, err := c.buildPlacement(members[i])
 		if err != nil {
@@ -72,13 +85,17 @@ func (c *Context) checkStorageGroupPoR(ind int, sg oid.ID) {
 			flat[i], flat[j] = flat[j], flat[i]
 		})
 
+		getHeaderPrm.OID = members[i]
+
 		for j := range flat {
 			accRequests++
 			if j > 0 { // in best case audit get object header on first iteration
 				accRetries++
 			}
 
-			hdr, err := c.cnrCom.GetHeader(c.task, flat[j], members[i], true)
+			getHeaderPrm.Node = flat[j]
+
+			hdr, err := c.cnrCom.GetHeader(getHeaderPrm)
 			if err != nil {
 				c.log.Debug("can't head object",
 					zap.String("remote_node", hex.EncodeToString(flat[j].PublicKey())),
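
A follow-up illustration of the same point (a sketch, not part of the change;
names and import paths are assumed, as above): since every
ContainerCommunicator method now takes a plain parameter struct, the
communicator can be stubbed in tests without constructing an audit.Task.

	import (
		"github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor"
		"github.com/nspcc-dev/neofs-sdk-go/object"
		"github.com/nspcc-dev/neofs-sdk-go/storagegroup"
	)

	// stubCommunicator is a hypothetical test double that satisfies
	// auditor.ContainerCommunicator with canned results.
	type stubCommunicator struct {
		sg  *storagegroup.StorageGroup
		hdr *object.Object
	}

	func (s stubCommunicator) GetSG(auditor.GetSGPrm) (*storagegroup.StorageGroup, error) {
		return s.sg, nil
	}

	func (s stubCommunicator) GetHeader(auditor.GetHeaderPrm) (*object.Object, error) {
		return s.hdr, nil
	}

	func (s stubCommunicator) GetRangeHash(auditor.GetRangeHashPrm) ([]byte, error) {
		return []byte{0}, nil // placeholder, not a real Tillich-Zemor hash
	}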