From 03e3afb0e884d5fd4205c0251071b1a93db7b2a6 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 22 Dec 2020 15:14:50 +0300 Subject: [PATCH] [#255] services/audit: Define interface of container communicator Define interface of the container communicator which methods are going to be used in audit checks. Make innerring Server to implement this interface. Signed-off-by: Leonard Lyubich --- pkg/innerring/innerring.go | 5 ++++- pkg/innerring/rpc.go | 21 ++++++++++++++++++ pkg/services/audit/auditor/context.go | 26 +++++++++++++++++++++++ pkg/services/audit/taskmanager/manager.go | 8 +++++++ 4 files changed, 59 insertions(+), 1 deletion(-) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 3c3499697..1e708d0b8 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -216,10 +216,13 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } + clientCache := newClientCache(server.key) + auditTaskManager := audittask.New( audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), audittask.WithWorkerPool(auditPool), audittask.WithLogger(log), + audittask.WithContainerCommunicator(clientCache), ) server.workers = append(server.workers, auditTaskManager.Listen) @@ -232,7 +235,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error AuditContract: server.contracts.audit, MorphClient: server.morphClient, IRList: server, - ClientCache: newClientCache(server.key), + ClientCache: clientCache, TaskManager: auditTaskManager, Reporter: server, }) diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index 5c12ae5b5..834bcc3e5 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -4,7 +4,11 @@ import ( "crypto/ecdsa" "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/storagegroup" "github.com/nspcc-dev/neofs-node/pkg/network/cache" + "github.com/nspcc-dev/neofs-node/pkg/services/audit" ) type ClientCache struct { @@ -22,3 +26,20 @@ func newClientCache(key *ecdsa.PrivateKey) *ClientCache { func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client, error) { return c.cache.Get(c.key, address, opts...) } + +// GetSG polls the container from audit task to get the object by id. +// Returns storage groups structure from received object. +func (c *ClientCache) GetSG(task *audit.Task, id *object.ID) (*storagegroup.StorageGroup, error) { + panic("implement me") +} + +// GetHeader requests node from the container under audit to return object header by id. +func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.ID) (*object.Object, error) { + panic("implement me") +} + +// GetRangeHash requests node from the container under audit to return Tillich-Zemor hash of the +// payload range of the object with specified identifier. +func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *object.ID, rng *object.Range) ([]byte, error) { + panic("implement me") +} diff --git a/pkg/services/audit/auditor/context.go b/pkg/services/audit/auditor/context.go index 52b56e6e4..8b9bd53f3 100644 --- a/pkg/services/audit/auditor/context.go +++ b/pkg/services/audit/auditor/context.go @@ -2,6 +2,9 @@ package auditor import ( "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/storagegroup" "github.com/nspcc-dev/neofs-node/pkg/services/audit" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" @@ -19,6 +22,22 @@ type Context struct { // ContextPrm groups components required to conduct data audit checks. type ContextPrm struct { log *logger.Logger + + cnrCom ContainerCommunicator +} + +// ContainerCommunicator is an interface of +// component of communication with container nodes. +type ContainerCommunicator interface { + // Must return storage group structure stored in object from container. + GetSG(*audit.Task, *object.ID) (*storagegroup.StorageGroup, error) + + // Must return object header from the container node. + GetHeader(*audit.Task, *netmap.Node, *object.ID) (*object.Object, error) + + // Must return homomorphic Tillich-Zemor hash of payload range of the + // object stored in container node. + GetRangeHash(*audit.Task, *netmap.Node, *object.ID, *object.Range) ([]byte, error) } // NewContext creates, initializes and returns Context. @@ -35,6 +54,13 @@ func (p *ContextPrm) SetLogger(l *logger.Logger) { } } +// SetContainerCommunicator sets component of communication with container nodes. +func (p *ContextPrm) SetContainerCommunicator(cnrCom ContainerCommunicator) { + if p != nil { + p.cnrCom = cnrCom + } +} + // WithTask sets container audit parameters. func (c *Context) WithTask(t *audit.Task) *Context { if c != nil { diff --git a/pkg/services/audit/taskmanager/manager.go b/pkg/services/audit/taskmanager/manager.go index 85b707951..174282744 100644 --- a/pkg/services/audit/taskmanager/manager.go +++ b/pkg/services/audit/taskmanager/manager.go @@ -71,3 +71,11 @@ func WithQueueCapacity(cap uint32) Option { c.queueCap = cap } } + +// WithContainerCommunicator returns option to set component of communication +// with container nodes. +func WithContainerCommunicator(cnrCom auditor.ContainerCommunicator) Option { + return func(c *cfg) { + c.ctxPrm.SetContainerCommunicator(cnrCom) + } +}