Evgenii Stratonikov
0e31c12e63
Drop duplicate entities. Format entities. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com> Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
217 lines
5.6 KiB
Go
217 lines
5.6 KiB
Go
package audit
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup"
|
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func (ap *Processor) processStartAudit(epoch uint64) {
|
|
log := ap.log.With(zap.Uint64("epoch", epoch))
|
|
|
|
ap.prevAuditCanceler()
|
|
|
|
skipped := ap.taskManager.Reset()
|
|
if skipped > 0 {
|
|
ap.log.Info(logs.AuditSomeTasksFromPreviousEpochAreSkipped,
|
|
zap.Int("amount", skipped),
|
|
)
|
|
}
|
|
|
|
containers, err := ap.selectContainersToAudit(epoch)
|
|
if err != nil {
|
|
log.Error(logs.AuditContainerSelectionFailure, zap.String("error", err.Error()))
|
|
|
|
return
|
|
}
|
|
|
|
log.Info(logs.AuditSelectContainersForAudit, zap.Int("amount", len(containers)))
|
|
|
|
nm, err := ap.netmapClient.GetNetMap(0)
|
|
if err != nil {
|
|
ap.log.Error(logs.AuditCantFetchNetworkMap,
|
|
zap.String("error", err.Error()))
|
|
|
|
return
|
|
}
|
|
|
|
cancelChannel := make(chan struct{})
|
|
ap.prevAuditCanceler = func() {
|
|
select {
|
|
case <-cancelChannel: // already closed
|
|
default:
|
|
close(cancelChannel)
|
|
}
|
|
}
|
|
|
|
pivot := make([]byte, sha256.Size)
|
|
|
|
ap.startAuditTasksOnContainers(cancelChannel, containers, log, pivot, nm, epoch)
|
|
}
|
|
|
|
func (ap *Processor) startAuditTasksOnContainers(cancelChannel <-chan struct{}, containers []cid.ID, log *zap.Logger, pivot []byte, nm *netmap.NetMap, epoch uint64) {
|
|
for i := range containers {
|
|
cnr, err := cntClient.Get(ap.containerClient, containers[i]) // get container structure
|
|
if err != nil {
|
|
log.Error(logs.AuditCantGetContainerInfoIgnore,
|
|
zap.Stringer("cid", containers[i]),
|
|
zap.String("error", err.Error()))
|
|
|
|
continue
|
|
}
|
|
|
|
containers[i].Encode(pivot)
|
|
|
|
// find all container nodes for current epoch
|
|
nodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), pivot)
|
|
if err != nil {
|
|
log.Info(logs.AuditCantBuildPlacementForContainerIgnore,
|
|
zap.Stringer("cid", containers[i]),
|
|
zap.String("error", err.Error()))
|
|
|
|
continue
|
|
}
|
|
|
|
n := placement.FlattenNodes(nodes)
|
|
|
|
// shuffle nodes to ask a random one
|
|
rand.Shuffle(len(n), func(i, j int) {
|
|
n[i], n[j] = n[j], n[i]
|
|
})
|
|
|
|
// search storage groups
|
|
storageGroupsIDs := ap.findStorageGroups(containers[i], n)
|
|
log.Info(logs.AuditSelectStorageGroupsForAudit,
|
|
zap.Stringer("cid", containers[i]),
|
|
zap.Int("amount", len(storageGroupsIDs)))
|
|
|
|
// filter expired storage groups
|
|
storageGroups := ap.filterExpiredSG(containers[i], storageGroupsIDs, nodes, *nm)
|
|
log.Info(logs.AuditFilterExpiredStorageGroupsForAudit,
|
|
zap.Stringer("cid", containers[i]),
|
|
zap.Int("amount", len(storageGroups)))
|
|
|
|
// skip audit for containers without
|
|
// non-expired storage groups
|
|
if len(storageGroupsIDs) == 0 {
|
|
continue
|
|
}
|
|
|
|
auditTask := new(audit.Task).
|
|
WithReporter(&epochAuditReporter{
|
|
epoch: epoch,
|
|
rep: ap.reporter,
|
|
}).
|
|
WithCancelChannel(cancelChannel).
|
|
WithContainerID(containers[i]).
|
|
WithStorageGroupList(storageGroups).
|
|
WithContainerStructure(cnr.Value).
|
|
WithContainerNodes(nodes).
|
|
WithNetworkMap(nm)
|
|
|
|
ap.taskManager.PushTask(auditTask)
|
|
}
|
|
}
|
|
|
|
func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) []oid.ID {
|
|
var sg []oid.ID
|
|
|
|
ln := len(shuffled)
|
|
|
|
var (
|
|
info clientcore.NodeInfo
|
|
prm storagegroup.SearchSGPrm
|
|
)
|
|
|
|
prm.Container = cnr
|
|
|
|
for i := range shuffled { // consider iterating over some part of container
|
|
log := ap.log.With(
|
|
zap.Stringer("cid", cnr),
|
|
zap.String("key", netmap.StringifyPublicKey(shuffled[0])),
|
|
zap.Int("try", i),
|
|
zap.Int("total_tries", ln),
|
|
)
|
|
|
|
err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(shuffled[i]))
|
|
if err != nil {
|
|
log.Warn(logs.AuditParseClientNodeInfo, zap.String("error", err.Error()))
|
|
|
|
continue
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout)
|
|
|
|
prm.NodeInfo = info
|
|
|
|
var dst storagegroup.SearchSGDst
|
|
|
|
err = ap.sgSrc.ListSG(ctx, &dst, prm)
|
|
|
|
cancel()
|
|
|
|
if err != nil {
|
|
log.Warn(logs.AuditErrorInStorageGroupSearch, zap.String("error", err.Error()))
|
|
continue
|
|
}
|
|
|
|
sg = append(sg, dst.Objects...)
|
|
|
|
break // we found storage groups, so break loop
|
|
}
|
|
|
|
return sg
|
|
}
|
|
|
|
func (ap *Processor) filterExpiredSG(cid cid.ID, sgIDs []oid.ID,
|
|
cnr [][]netmap.NodeInfo, nm netmap.NetMap) []storagegroup.StorageGroup {
|
|
sgs := make([]storagegroup.StorageGroup, 0, len(sgIDs))
|
|
var coreSG storagegroup.StorageGroup
|
|
|
|
var getSGPrm storagegroup.GetSGPrm
|
|
getSGPrm.CID = cid
|
|
getSGPrm.Container = cnr
|
|
getSGPrm.NetMap = nm
|
|
|
|
for _, sgID := range sgIDs {
|
|
ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout)
|
|
|
|
getSGPrm.OID = sgID
|
|
|
|
sg, err := ap.sgSrc.GetSG(ctx, getSGPrm)
|
|
|
|
cancel()
|
|
|
|
if err != nil {
|
|
ap.log.Error(
|
|
"could not get storage group object for audit, skipping",
|
|
zap.Stringer("cid", cid),
|
|
zap.Stringer("oid", sgID),
|
|
zap.Error(err),
|
|
)
|
|
continue
|
|
}
|
|
|
|
// filter expired epochs
|
|
if sg.ExpirationEpoch() >= ap.epochSrc.EpochCounter() {
|
|
coreSG.SetID(sgID)
|
|
coreSG.SetStorageGroup(*sg)
|
|
|
|
sgs = append(sgs, coreSG)
|
|
}
|
|
}
|
|
|
|
return sgs
|
|
}
|