From 782bcadd9283e81eee19b7764079f25b1e8f6d24 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Wed, 23 Dec 2020 11:44:29 +0300 Subject: [PATCH] [#271] innerring: Implement `GetSG` and `Head` of audit communicator Signed-off-by: Alex Vanin --- cmd/neofs-ir/defaults.go | 3 + pkg/innerring/innerring.go | 8 ++- pkg/innerring/rpc.go | 124 ++++++++++++++++++++++++++++++++++--- 3 files changed, 125 insertions(+), 10 deletions(-) diff --git a/cmd/neofs-ir/defaults.go b/cmd/neofs-ir/defaults.go index 96e5b035e..780080d2c 100644 --- a/cmd/neofs-ir/defaults.go +++ b/cmd/neofs-ir/defaults.go @@ -93,4 +93,7 @@ func defaultConfiguration(cfg *viper.Viper) { cfg.SetDefault("audit.task.exec_pool_size", 10) cfg.SetDefault("audit.task.queue_capacity", 100) + cfg.SetDefault("audit.timeout.get", "5s") + cfg.SetDefault("audit.timeout.head", "5s") + cfg.SetDefault("audit.timeout.rangehash", "5s") } diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 1e708d0b8..a6ff34953 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -216,7 +216,13 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } - clientCache := newClientCache(server.key) + clientCache := newClientCache(&clientCacheParams{ + Log: log, + Key: server.key, + SGTimeout: cfg.GetDuration("audit.timeout.get"), + HeadTimeout: cfg.GetDuration("audit.timeout.head"), + RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"), + }) auditTaskManager := audittask.New( audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index 834bcc3e5..ec730fd5c 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -1,25 +1,48 @@ package innerring import ( + "context" "crypto/ecdsa" + "fmt" + "time" "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/storagegroup" + coreObject "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/audit" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "go.uber.org/zap" ) -type ClientCache struct { - cache *cache.ClientCache - key *ecdsa.PrivateKey -} +type ( + ClientCache struct { + log *zap.Logger + cache *cache.ClientCache + key *ecdsa.PrivateKey -func newClientCache(key *ecdsa.PrivateKey) *ClientCache { + sgTimeout, headTimeout, rangeTimeout time.Duration + } + + clientCacheParams struct { + Log *zap.Logger + Key *ecdsa.PrivateKey + + SGTimeout, HeadTimeout, RangeTimeout time.Duration + } +) + +func newClientCache(p *clientCacheParams) *ClientCache { return &ClientCache{ - cache: cache.NewSDKClientCache(), - key: key, + log: p.Log, + cache: cache.NewSDKClientCache(), + key: p.Key, + sgTimeout: p.SGTimeout, + headTimeout: p.HeadTimeout, + rangeTimeout: p.RangeTimeout, } } @@ -30,12 +53,95 @@ func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client // GetSG polls the container from audit task to get the object by id. // Returns storage groups structure from received object. func (c *ClientCache) GetSG(task *audit.Task, id *object.ID) (*storagegroup.StorageGroup, error) { - panic("implement me") + nodes, err := placement.BuildObjectPlacement( // shuffle nodes + task.NetworkMap(), + task.ContainerNodes(), + id, + ) + if err != nil { + return nil, fmt.Errorf("can't build object placement: %w", err) + } + + sgAddress := new(object.Address) + sgAddress.SetContainerID(task.ContainerID()) + sgAddress.SetObjectID(id) + + getParams := new(client.GetObjectParams) + getParams.WithAddress(sgAddress) + + for _, node := range placement.FlattenNodes(nodes) { + addr, err := network.IPAddrFromMultiaddr(node.Address()) + if err != nil { + c.log.Warn("can't parse remote address", + zap.String("address", node.Address()), + zap.String("error", err.Error())) + + continue + } + + cli, err := c.Get(addr) + if err != nil { + c.log.Warn("can't setup remote connection", + zap.String("address", addr), + zap.String("error", err.Error())) + + continue + } + + cctx, cancel := context.WithTimeout(task.AuditContext(), c.sgTimeout) + obj, err := cli.GetObject(cctx, getParams) + cancel() + + if err != nil { + c.log.Warn("can't get storage group object", + zap.String("error", err.Error())) + + continue + } + + sg := storagegroup.New() + + err = sg.Unmarshal(obj.Payload()) + if err != nil { + return nil, fmt.Errorf("can't parse storage group payload: %w", err) + } + + return sg, nil + } + + return nil, coreObject.ErrNotFound } // GetHeader requests node from the container under audit to return object header by id. func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.ID) (*object.Object, error) { - panic("implement me") + objAddress := new(object.Address) + objAddress.SetContainerID(task.ContainerID()) + objAddress.SetObjectID(id) + + headParams := new(client.ObjectHeaderParams) + headParams.WithRawFlag(true) + headParams.WithMainFields() + headParams.WithAddress(objAddress) + + addr, err := network.IPAddrFromMultiaddr(node.Address()) + if err != nil { + return nil, fmt.Errorf("can't parse remote address %s: %w", node.Address(), err) + } + + cli, err := c.Get(addr) + if err != nil { + return nil, fmt.Errorf("can't setup remote connection with %s: %w", addr, err) + } + + cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout) + head, err := cli.GetObjectHeader(cctx, headParams, client.WithTTL(1)) + cancel() + + if err != nil { + return nil, fmt.Errorf("object head error: %w", err) + } + + return head, nil } // GetRangeHash requests node from the container under audit to return Tillich-Zemor hash of the