From 8dd7c689f2bb4c1e13341073c9b458e6aa13ecbe Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Mon, 21 Dec 2020 11:40:30 +0300 Subject: [PATCH] [#265] innerring: Select storage groups to audit Signed-off-by: Alex Vanin --- pkg/innerring/innerring.go | 1 + pkg/innerring/invoke/container.go | 47 -------- pkg/innerring/invoke/enhanced.go | 44 ++++++++ pkg/innerring/processors/audit/process.go | 118 +++++++++++++++++++- pkg/innerring/processors/audit/processor.go | 21 ++++ pkg/innerring/processors/audit/scheduler.go | 3 +- pkg/morph/client/container/list.go | 4 +- 7 files changed, 180 insertions(+), 58 deletions(-) create mode 100644 pkg/innerring/invoke/enhanced.go diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 7c3ebb787..703052195 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -192,6 +192,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error // create audit processor auditProcessor, err := audit.New(&audit.Params{ Log: log, + NetmapContract: server.contracts.netmap, ContainerContract: server.contracts.container, AuditContract: server.contracts.audit, MorphClient: server.morphClient, diff --git a/pkg/innerring/invoke/container.go b/pkg/innerring/invoke/container.go index 770c8698d..1d4aeb36f 100644 --- a/pkg/innerring/invoke/container.go +++ b/pkg/innerring/invoke/container.go @@ -1,11 +1,8 @@ package invoke import ( - "crypto/sha256" - "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" - "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/pkg/errors" ) @@ -57,47 +54,3 @@ func RemoveContainer(cli *client.Client, con util.Uint160, p *RemoveContainerPar p.Signature, ) } - -func ListContainers(cli *client.Client, con util.Uint160) ([]*container.ID, error) { - if cli == nil { - return nil, client.ErrNilClient - } - - item, err := cli.TestInvoke(con, listContainersMethod, []byte{}) - if err != nil { - return nil, err - } - - if len(item) < 1 { - return nil, errors.Wrap(ErrParseTestInvoke, "nested array expected") - } - - rawIDs, err := client.ArrayFromStackItem(item[0]) - if err != nil { - return nil, err - } - - result := make([]*container.ID, 0, len(rawIDs)) - - var bufHash [sha256.Size]byte - - for i := range rawIDs { - cid, err := client.BytesFromStackItem(rawIDs[i]) - if err != nil { - return nil, err - } - - if len(cid) != sha256.Size { - return nil, errors.Wrap(ErrParseTestInvoke, "invalid container ID size") - } - - copy(bufHash[:], cid) - - containerID := container.NewID() - containerID.SetSHA256(bufHash) - - result = append(result, containerID) - } - - return result, nil -} diff --git a/pkg/innerring/invoke/enhanced.go b/pkg/innerring/invoke/enhanced.go new file mode 100644 index 000000000..8c3c63f85 --- /dev/null +++ b/pkg/innerring/invoke/enhanced.go @@ -0,0 +1,44 @@ +package invoke + +import ( + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/morph/client" + morphContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" + wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" + morphNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" + wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" +) + +const readOnlyFee = 0 + +// NewNoFeeContainerClient creates wrapper to access data from container contract. +func NewNoFeeContainerClient(cli *client.Client, contract util.Uint160) (*wrapContainer.Wrapper, error) { + staticClient, err := client.NewStatic(cli, contract, readOnlyFee) + if err != nil { + return nil, fmt.Errorf("can't create container static client: %w", err) + } + + enhancedContainerClient, err := morphContainer.New(staticClient) + if err != nil { + return nil, fmt.Errorf("can't create container morph client: %w", err) + } + + return wrapContainer.New(enhancedContainerClient) +} + +// NewNoFeeNetmapClient creates wrapper to access data from netmap contract. +func NewNoFeeNetmapClient(cli *client.Client, contract util.Uint160) (*wrapNetmap.Wrapper, error) { + staticClient, err := client.NewStatic(cli, contract, readOnlyFee) + if err != nil { + return nil, fmt.Errorf("can't create netmap static client: %w", err) + } + + enhancedNetmapClient, err := morphNetmap.New(staticClient) + if err != nil { + return nil, fmt.Errorf("can't create netmap morph client: %w", err) + } + + return wrapNetmap.New(enhancedNetmapClient) +} diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index b2f1e5c42..36b2012cb 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -1,23 +1,129 @@ package audit import ( + "context" + "math/rand" + "time" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup" "go.uber.org/zap" ) +var sgFilter = storagegroup.SearchQuery() + func (ap *Processor) processStartAudit(epoch uint64) { - // todo: flush left audit results from audit result cache + log := ap.log.With(zap.Uint64("epoch", epoch)) containers, err := ap.selectContainersToAudit(epoch) if err != nil { - ap.log.Error("container selection failure", + log.Error("container selection failure", zap.String("error", err.Error())) + + return + } + + log.Info("select containers for audit", zap.Int("amount", len(containers))) + + nm, err := ap.netmapClient.GetNetMap(0) + if err != nil { + ap.log.Error("can't fetch network map", zap.String("error", err.Error())) return } - ap.log.Info("select containers for audit", zap.Int("amount", len(containers))) + for i := range containers { + cnr, err := ap.containerClient.Get(containers[i]) // get container structure + if err != nil { + log.Error("can't get container info, ignore", + zap.Stringer("cid", containers[i]), + zap.String("error", err.Error())) - // todo: for each container get list of storage groups - // todo: for each container create audit result template in audit result cache - // todo: for each container push audit tasks into queue + continue + } + + // find all container nodes for current epoch + nodes, err := nm.GetContainerNodes(cnr.PlacementPolicy(), nil) + if err != nil { + log.Info("can't build placement for container, ignore", + zap.Stringer("cid", containers[i]), + zap.String("error", err.Error())) + + continue + } + + // shuffle nodes to ask a random one + n := nodes.Flatten() + rand.Shuffle(len(n), func(i, j int) { // fixme: consider using crypto rand + n[i], n[j] = n[j], n[i] + }) + + // search storage groups + storageGroups := ap.findStorageGroups(containers[i], n) + log.Info("select storage groups for audit", + zap.Stringer("cid", containers[i]), + zap.Int("amount", len(storageGroups))) + + // todo: for each container push audit tasks into queue + } +} + +func (ap *Processor) findStorageGroups(cid *container.ID, shuffled netmap.Nodes) []*object.ID { + var sg []*object.ID + + ln := len(shuffled) + + for i := range shuffled { // consider iterating over some part of container + log := ap.log.With( + zap.Stringer("cid", cid), + zap.String("address", shuffled[0].Address()), + zap.Int("try", i), + zap.Int("total_tries", ln), + ) + + addr, err := ipAddr(shuffled[i].Address()) + if err != nil { + log.Warn("can't parse remote address", zap.String("error", err.Error())) + } + + cli, err := ap.clientCache.Get(addr) + if err != nil { + log.Warn("can't setup remote connection", zap.String("error", err.Error())) + + continue + } + + sgSearchParams := &client.SearchObjectParams{} + sgSearchParams.WithContainerID(cid) + sgSearchParams.WithSearchFilters(sgFilter) + + // fixme: timeout from config + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + result, err := cli.SearchObject(ctx, sgSearchParams) + cancel() + + if err != nil { + log.Warn("error in storage group search", zap.String("error", err.Error())) + continue + } + + sg = append(sg, result...) + + break // we found storage groups, so break loop + } + + return sg +} + +func ipAddr(multiaddr string) (string, error) { + address, err := network.AddressFromString(multiaddr) + if err != nil { + return "", err + } + + return address.IPAddrString() } diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index d018d08f0..949402331 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -3,7 +3,10 @@ package audit import ( "github.com/nspcc-dev/neo-go/pkg/util" SDKClient "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-node/pkg/innerring/invoke" "github.com/nspcc-dev/neofs-node/pkg/morph/client" + wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" + wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/panjf2000/ants/v2" "github.com/pkg/errors" @@ -31,11 +34,15 @@ type ( morphClient *client.Client irList Indexer clientCache NeoFSClientCache + + containerClient *wrapContainer.Wrapper + netmapClient *wrapNetmap.Wrapper } // Params of the processor constructor. Params struct { Log *zap.Logger + NetmapContract util.Uint160 ContainerContract util.Uint160 AuditContract util.Uint160 MorphClient *client.Client @@ -67,6 +74,18 @@ func New(p *Params) (*Processor, error) { return nil, errors.Wrap(err, "ir/audit: can't create worker pool") } + // creating enhanced client for getting network map + netmapClient, err := invoke.NewNoFeeNetmapClient(p.MorphClient, p.NetmapContract) + if err != nil { + return nil, err + } + + // creating enhanced client for getting containers + containerClient, err := invoke.NewNoFeeContainerClient(p.MorphClient, p.ContainerContract) + if err != nil { + return nil, err + } + return &Processor{ log: p.Log, pool: pool, @@ -75,6 +94,8 @@ func New(p *Params) (*Processor, error) { morphClient: p.MorphClient, irList: p.IRList, clientCache: p.ClientCache, + containerClient: containerClient, + netmapClient: netmapClient, }, nil } diff --git a/pkg/innerring/processors/audit/scheduler.go b/pkg/innerring/processors/audit/scheduler.go index 20a0fa05c..701308bf2 100644 --- a/pkg/innerring/processors/audit/scheduler.go +++ b/pkg/innerring/processors/audit/scheduler.go @@ -5,14 +5,13 @@ import ( "strings" "github.com/nspcc-dev/neofs-api-go/pkg/container" - "github.com/nspcc-dev/neofs-node/pkg/innerring/invoke" "github.com/pkg/errors" ) var ErrInvalidIRNode = errors.New("node is not in the inner ring list") func (ap *Processor) selectContainersToAudit(epoch uint64) ([]*container.ID, error) { - containers, err := invoke.ListContainers(ap.morphClient, ap.containerContract) + containers, err := ap.containerClient.List(nil) if err != nil { return nil, errors.Wrap(err, "can't get list of containers to start audit") } diff --git a/pkg/morph/client/container/list.go b/pkg/morph/client/container/list.go index bc3a9ae2a..ce1e1e0f6 100644 --- a/pkg/morph/client/container/list.go +++ b/pkg/morph/client/container/list.go @@ -34,9 +34,7 @@ func (l *ListValues) CIDList() [][]byte { func (c *Client) List(args ListArgs) (*ListValues, error) { invokeArgs := make([]interface{}, 0, 1) - if len(args.ownerID) > 0 { - invokeArgs = append(invokeArgs, args.ownerID) - } + invokeArgs = append(invokeArgs, args.ownerID) prms, err := c.client.TestInvoke( c.listMethod,