forked from TrueCloudLab/frostfs-node
16f13bc0a5
To enable TLS support we can't operate with IP addresses directly. Certificates are issued with host names so it is required to pass them into RPC client. DNS resolving should be done by transport layer and not be a part of node. Therefore `IPAddrString` usage is removed from code. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
153 lines
3.9 KiB
Go
153 lines
3.9 KiB
Go
package audit
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup"
|
|
"github.com/nspcc-dev/neofs-node/pkg/util/rand"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var sgFilter = storagegroup.SearchQuery()
|
|
|
|
func (ap *Processor) processStartAudit(epoch uint64) {
|
|
log := ap.log.With(zap.Uint64("epoch", epoch))
|
|
|
|
ap.prevAuditCanceler()
|
|
|
|
skipped := ap.taskManager.Reset()
|
|
if skipped > 0 {
|
|
ap.log.Info("some tasks from previous epoch are skipped",
|
|
zap.Int("amount", skipped),
|
|
)
|
|
}
|
|
|
|
containers, err := ap.selectContainersToAudit(epoch)
|
|
if err != nil {
|
|
log.Error("container selection failure", zap.String("error", err.Error()))
|
|
|
|
return
|
|
}
|
|
|
|
log.Info("select containers for audit", zap.Int("amount", len(containers)))
|
|
|
|
nm, err := ap.netmapClient.GetNetMap(0)
|
|
if err != nil {
|
|
ap.log.Error("can't fetch network map",
|
|
zap.String("error", err.Error()))
|
|
|
|
return
|
|
}
|
|
|
|
var auditCtx context.Context
|
|
auditCtx, ap.prevAuditCanceler = context.WithCancel(context.Background())
|
|
|
|
for i := range containers {
|
|
cnr, err := ap.containerClient.Get(containers[i]) // get container structure
|
|
if err != nil {
|
|
log.Error("can't get container info, ignore",
|
|
zap.Stringer("cid", containers[i]),
|
|
zap.String("error", err.Error()))
|
|
|
|
continue
|
|
}
|
|
|
|
pivot := containers[i].ToV2().GetValue()
|
|
|
|
// find all container nodes for current epoch
|
|
nodes, err := nm.GetContainerNodes(cnr.PlacementPolicy(), pivot)
|
|
if err != nil {
|
|
log.Info("can't build placement for container, ignore",
|
|
zap.Stringer("cid", containers[i]),
|
|
zap.String("error", err.Error()))
|
|
|
|
continue
|
|
}
|
|
|
|
n := nodes.Flatten()
|
|
crand := rand.New() // math/rand with cryptographic source
|
|
|
|
// shuffle nodes to ask a random one
|
|
crand.Shuffle(len(n), func(i, j int) {
|
|
n[i], n[j] = n[j], n[i]
|
|
})
|
|
|
|
// search storage groups
|
|
storageGroups := ap.findStorageGroups(containers[i], n)
|
|
log.Info("select storage groups for audit",
|
|
zap.Stringer("cid", containers[i]),
|
|
zap.Int("amount", len(storageGroups)))
|
|
|
|
auditTask := new(audit.Task).
|
|
WithReporter(&epochAuditReporter{
|
|
epoch: epoch,
|
|
rep: ap.reporter,
|
|
}).
|
|
WithAuditContext(auditCtx).
|
|
WithContainerID(containers[i]).
|
|
WithStorageGroupList(storageGroups).
|
|
WithContainerStructure(cnr).
|
|
WithContainerNodes(nodes).
|
|
WithNetworkMap(nm)
|
|
|
|
if err := ap.taskManager.PushTask(auditTask); err != nil {
|
|
ap.log.Error("could not push audit task",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ap *Processor) findStorageGroups(cid *container.ID, shuffled netmap.Nodes) []*object.ID {
|
|
var sg []*object.ID
|
|
|
|
ln := len(shuffled)
|
|
|
|
for i := range shuffled { // consider iterating over some part of container
|
|
log := ap.log.With(
|
|
zap.Stringer("cid", cid),
|
|
zap.String("address", shuffled[0].Address()),
|
|
zap.Int("try", i),
|
|
zap.Int("total_tries", ln),
|
|
)
|
|
|
|
addr, err := network.HostAddrFromMultiaddr(shuffled[i].Address())
|
|
if err != nil {
|
|
log.Warn("can't parse remote address", zap.String("error", err.Error()))
|
|
|
|
continue
|
|
}
|
|
|
|
cli, err := ap.clientCache.Get(addr)
|
|
if err != nil {
|
|
log.Warn("can't setup remote connection", zap.String("error", err.Error()))
|
|
|
|
continue
|
|
}
|
|
|
|
sgSearchParams := &client.SearchObjectParams{}
|
|
sgSearchParams.WithContainerID(cid)
|
|
sgSearchParams.WithSearchFilters(sgFilter)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout)
|
|
result, err := cli.SearchObject(ctx, sgSearchParams, client.WithKey(ap.key))
|
|
cancel()
|
|
|
|
if err != nil {
|
|
log.Warn("error in storage group search", zap.String("error", err.Error()))
|
|
continue
|
|
}
|
|
|
|
sg = append(sg, result...)
|
|
|
|
break // we found storage groups, so break loop
|
|
}
|
|
|
|
return sg
|
|
}
|