forked from TrueCloudLab/frostfs-node
[#265] innerring: Select storage groups to audit
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
22cffbf529
commit
8dd7c689f2
7 changed files with 180 additions and 58 deletions
|
@ -192,6 +192,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
|
|||
// create audit processor
|
||||
auditProcessor, err := audit.New(&audit.Params{
|
||||
Log: log,
|
||||
NetmapContract: server.contracts.netmap,
|
||||
ContainerContract: server.contracts.container,
|
||||
AuditContract: server.contracts.audit,
|
||||
MorphClient: server.morphClient,
|
||||
|
|
|
@ -1,11 +1,8 @@
|
|||
package invoke
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
@ -57,47 +54,3 @@ func RemoveContainer(cli *client.Client, con util.Uint160, p *RemoveContainerPar
|
|||
p.Signature,
|
||||
)
|
||||
}
|
||||
|
||||
func ListContainers(cli *client.Client, con util.Uint160) ([]*container.ID, error) {
|
||||
if cli == nil {
|
||||
return nil, client.ErrNilClient
|
||||
}
|
||||
|
||||
item, err := cli.TestInvoke(con, listContainersMethod, []byte{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(item) < 1 {
|
||||
return nil, errors.Wrap(ErrParseTestInvoke, "nested array expected")
|
||||
}
|
||||
|
||||
rawIDs, err := client.ArrayFromStackItem(item[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make([]*container.ID, 0, len(rawIDs))
|
||||
|
||||
var bufHash [sha256.Size]byte
|
||||
|
||||
for i := range rawIDs {
|
||||
cid, err := client.BytesFromStackItem(rawIDs[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(cid) != sha256.Size {
|
||||
return nil, errors.Wrap(ErrParseTestInvoke, "invalid container ID size")
|
||||
}
|
||||
|
||||
copy(bufHash[:], cid)
|
||||
|
||||
containerID := container.NewID()
|
||||
containerID.SetSHA256(bufHash)
|
||||
|
||||
result = append(result, containerID)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
|
44
pkg/innerring/invoke/enhanced.go
Normal file
44
pkg/innerring/invoke/enhanced.go
Normal file
|
@ -0,0 +1,44 @@
|
|||
package invoke
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||
morphContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
|
||||
wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
|
||||
morphNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap"
|
||||
wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
|
||||
)
|
||||
|
||||
const readOnlyFee = 0
|
||||
|
||||
// NewNoFeeContainerClient creates wrapper to access data from container contract.
|
||||
func NewNoFeeContainerClient(cli *client.Client, contract util.Uint160) (*wrapContainer.Wrapper, error) {
|
||||
staticClient, err := client.NewStatic(cli, contract, readOnlyFee)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't create container static client: %w", err)
|
||||
}
|
||||
|
||||
enhancedContainerClient, err := morphContainer.New(staticClient)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't create container morph client: %w", err)
|
||||
}
|
||||
|
||||
return wrapContainer.New(enhancedContainerClient)
|
||||
}
|
||||
|
||||
// NewNoFeeNetmapClient creates wrapper to access data from netmap contract.
|
||||
func NewNoFeeNetmapClient(cli *client.Client, contract util.Uint160) (*wrapNetmap.Wrapper, error) {
|
||||
staticClient, err := client.NewStatic(cli, contract, readOnlyFee)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't create netmap static client: %w", err)
|
||||
}
|
||||
|
||||
enhancedNetmapClient, err := morphNetmap.New(staticClient)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't create netmap morph client: %w", err)
|
||||
}
|
||||
|
||||
return wrapNetmap.New(enhancedNetmapClient)
|
||||
}
|
|
@ -1,23 +1,129 @@
|
|||
package audit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var sgFilter = storagegroup.SearchQuery()
|
||||
|
||||
func (ap *Processor) processStartAudit(epoch uint64) {
|
||||
// todo: flush left audit results from audit result cache
|
||||
log := ap.log.With(zap.Uint64("epoch", epoch))
|
||||
|
||||
containers, err := ap.selectContainersToAudit(epoch)
|
||||
if err != nil {
|
||||
ap.log.Error("container selection failure",
|
||||
log.Error("container selection failure", zap.String("error", err.Error()))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("select containers for audit", zap.Int("amount", len(containers)))
|
||||
|
||||
nm, err := ap.netmapClient.GetNetMap(0)
|
||||
if err != nil {
|
||||
ap.log.Error("can't fetch network map",
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ap.log.Info("select containers for audit", zap.Int("amount", len(containers)))
|
||||
for i := range containers {
|
||||
cnr, err := ap.containerClient.Get(containers[i]) // get container structure
|
||||
if err != nil {
|
||||
log.Error("can't get container info, ignore",
|
||||
zap.Stringer("cid", containers[i]),
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
// todo: for each container get list of storage groups
|
||||
// todo: for each container create audit result template in audit result cache
|
||||
// todo: for each container push audit tasks into queue
|
||||
continue
|
||||
}
|
||||
|
||||
// find all container nodes for current epoch
|
||||
nodes, err := nm.GetContainerNodes(cnr.PlacementPolicy(), nil)
|
||||
if err != nil {
|
||||
log.Info("can't build placement for container, ignore",
|
||||
zap.Stringer("cid", containers[i]),
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// shuffle nodes to ask a random one
|
||||
n := nodes.Flatten()
|
||||
rand.Shuffle(len(n), func(i, j int) { // fixme: consider using crypto rand
|
||||
n[i], n[j] = n[j], n[i]
|
||||
})
|
||||
|
||||
// search storage groups
|
||||
storageGroups := ap.findStorageGroups(containers[i], n)
|
||||
log.Info("select storage groups for audit",
|
||||
zap.Stringer("cid", containers[i]),
|
||||
zap.Int("amount", len(storageGroups)))
|
||||
|
||||
// todo: for each container push audit tasks into queue
|
||||
}
|
||||
}
|
||||
|
||||
func (ap *Processor) findStorageGroups(cid *container.ID, shuffled netmap.Nodes) []*object.ID {
|
||||
var sg []*object.ID
|
||||
|
||||
ln := len(shuffled)
|
||||
|
||||
for i := range shuffled { // consider iterating over some part of container
|
||||
log := ap.log.With(
|
||||
zap.Stringer("cid", cid),
|
||||
zap.String("address", shuffled[0].Address()),
|
||||
zap.Int("try", i),
|
||||
zap.Int("total_tries", ln),
|
||||
)
|
||||
|
||||
addr, err := ipAddr(shuffled[i].Address())
|
||||
if err != nil {
|
||||
log.Warn("can't parse remote address", zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
cli, err := ap.clientCache.Get(addr)
|
||||
if err != nil {
|
||||
log.Warn("can't setup remote connection", zap.String("error", err.Error()))
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
sgSearchParams := &client.SearchObjectParams{}
|
||||
sgSearchParams.WithContainerID(cid)
|
||||
sgSearchParams.WithSearchFilters(sgFilter)
|
||||
|
||||
// fixme: timeout from config
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
result, err := cli.SearchObject(ctx, sgSearchParams)
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
log.Warn("error in storage group search", zap.String("error", err.Error()))
|
||||
continue
|
||||
}
|
||||
|
||||
sg = append(sg, result...)
|
||||
|
||||
break // we found storage groups, so break loop
|
||||
}
|
||||
|
||||
return sg
|
||||
}
|
||||
|
||||
func ipAddr(multiaddr string) (string, error) {
|
||||
address, err := network.AddressFromString(multiaddr)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return address.IPAddrString()
|
||||
}
|
||||
|
|
|
@ -3,7 +3,10 @@ package audit
|
|||
import (
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
SDKClient "github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/innerring/invoke"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||
wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
|
||||
wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -31,11 +34,15 @@ type (
|
|||
morphClient *client.Client
|
||||
irList Indexer
|
||||
clientCache NeoFSClientCache
|
||||
|
||||
containerClient *wrapContainer.Wrapper
|
||||
netmapClient *wrapNetmap.Wrapper
|
||||
}
|
||||
|
||||
// Params of the processor constructor.
|
||||
Params struct {
|
||||
Log *zap.Logger
|
||||
NetmapContract util.Uint160
|
||||
ContainerContract util.Uint160
|
||||
AuditContract util.Uint160
|
||||
MorphClient *client.Client
|
||||
|
@ -67,6 +74,18 @@ func New(p *Params) (*Processor, error) {
|
|||
return nil, errors.Wrap(err, "ir/audit: can't create worker pool")
|
||||
}
|
||||
|
||||
// creating enhanced client for getting network map
|
||||
netmapClient, err := invoke.NewNoFeeNetmapClient(p.MorphClient, p.NetmapContract)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// creating enhanced client for getting containers
|
||||
containerClient, err := invoke.NewNoFeeContainerClient(p.MorphClient, p.ContainerContract)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Processor{
|
||||
log: p.Log,
|
||||
pool: pool,
|
||||
|
@ -75,6 +94,8 @@ func New(p *Params) (*Processor, error) {
|
|||
morphClient: p.MorphClient,
|
||||
irList: p.IRList,
|
||||
clientCache: p.ClientCache,
|
||||
containerClient: containerClient,
|
||||
netmapClient: netmapClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -5,14 +5,13 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/innerring/invoke"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var ErrInvalidIRNode = errors.New("node is not in the inner ring list")
|
||||
|
||||
func (ap *Processor) selectContainersToAudit(epoch uint64) ([]*container.ID, error) {
|
||||
containers, err := invoke.ListContainers(ap.morphClient, ap.containerContract)
|
||||
containers, err := ap.containerClient.List(nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't get list of containers to start audit")
|
||||
}
|
||||
|
|
|
@ -34,9 +34,7 @@ func (l *ListValues) CIDList() [][]byte {
|
|||
func (c *Client) List(args ListArgs) (*ListValues, error) {
|
||||
invokeArgs := make([]interface{}, 0, 1)
|
||||
|
||||
if len(args.ownerID) > 0 {
|
||||
invokeArgs = append(invokeArgs, args.ownerID)
|
||||
}
|
||||
invokeArgs = append(invokeArgs, args.ownerID)
|
||||
|
||||
prms, err := c.client.TestInvoke(
|
||||
c.listMethod,
|
||||
|
|
Loading…
Reference in a new issue