package audit

import (
	"context"
	"crypto/sha256"

	clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
	netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup"
	cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.uber.org/zap"
)

// processStartAudit cancels the previous audit round, selects containers
// for the given epoch and queues new audit tasks for them.
func (ap *Processor) processStartAudit(epoch uint64) {
	log := ap.log.With(zap.Uint64("epoch", epoch))

	ap.prevAuditCanceler()

	skipped := ap.taskManager.Reset()
	if skipped > 0 {
		ap.log.Info("some tasks from previous epoch are skipped",
			zap.Int("amount", skipped),
		)
	}

	containers, err := ap.selectContainersToAudit(epoch)
	if err != nil {
		log.Error("container selection failure", zap.String("error", err.Error()))

		return
	}

	log.Info("select containers for audit", zap.Int("amount", len(containers)))

	nm, err := ap.netmapClient.GetNetMap(0)
	if err != nil {
		ap.log.Error("can't fetch network map",
			zap.String("error", err.Error()))

		return
	}

	cancelChannel := make(chan struct{})
	ap.prevAuditCanceler = func() {
		select {
		case <-cancelChannel: // already closed
		default:
			close(cancelChannel)
		}
	}

	pivot := make([]byte, sha256.Size)

	ap.startAuditTasksOnContainers(cancelChannel, containers, log, pivot, nm, epoch)
}

// startAuditTasksOnContainers builds placement for each selected container,
// collects its non-expired storage groups and pushes an audit task
// to the task manager.
func (ap *Processor) startAuditTasksOnContainers(cancelChannel <-chan struct{}, containers []cid.ID, log *zap.Logger, pivot []byte, nm *netmap.NetMap, epoch uint64) {
	for i := range containers {
		cnr, err := cntClient.Get(ap.containerClient, containers[i]) // get container structure
		if err != nil {
			log.Error("can't get container info, ignore",
				zap.Stringer("cid", containers[i]),
				zap.String("error", err.Error()))

			continue
		}

		containers[i].Encode(pivot)

		// find all container nodes for current epoch
		nodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), pivot)
		if err != nil {
			log.Info("can't build placement for container, ignore",
				zap.Stringer("cid", containers[i]),
				zap.String("error", err.Error()))

			continue
		}

		n := placement.FlattenNodes(nodes)

		// shuffle nodes to ask a random one
		rand.Shuffle(len(n), func(i, j int) {
			n[i], n[j] = n[j], n[i]
		})

		// search storage groups
		storageGroupsIDs := ap.findStorageGroups(containers[i], n)
		log.Info("select storage groups for audit",
			zap.Stringer("cid", containers[i]),
			zap.Int("amount", len(storageGroupsIDs)))

		// filter expired storage groups
		storageGroups := ap.filterExpiredSG(containers[i], storageGroupsIDs, nodes, *nm)
		log.Info("filter expired storage groups for audit",
			zap.Stringer("cid", containers[i]),
			zap.Int("amount", len(storageGroups)))

		// skip audit for containers without
		// non-expired storage groups
		if len(storageGroups) == 0 {
			continue
		}

		auditTask := new(audit.Task).
			WithReporter(&epochAuditReporter{
				epoch: epoch,
				rep:   ap.reporter,
			}).
			WithCancelChannel(cancelChannel).
			WithContainerID(containers[i]).
			WithStorageGroupList(storageGroups).
			WithContainerStructure(cnr.Value).
			WithContainerNodes(nodes).
			WithNetworkMap(nm)

		ap.taskManager.PushTask(auditTask)
	}
}

// findStorageGroups asks container nodes (in the given shuffled order) for
// storage group objects and returns the IDs from the first successful response.
func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) []oid.ID {
	var sg []oid.ID

	ln := len(shuffled)

	var (
		info clientcore.NodeInfo
		prm  storagegroup.SearchSGPrm
	)

	prm.Container = cnr

	for i := range shuffled { // consider iterating over some part of container
		log := ap.log.With(
			zap.Stringer("cid", cnr),
			zap.String("key", netmap.StringifyPublicKey(shuffled[i])),
			zap.Int("try", i),
			zap.Int("total_tries", ln),
		)

		err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(shuffled[i]))
		if err != nil {
			log.Warn("parse client node info", zap.String("error", err.Error()))

			continue
		}

		ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout)

		prm.NodeInfo = info

		var dst storagegroup.SearchSGDst

		err = ap.sgSrc.ListSG(ctx, &dst, prm)

		cancel()

		if err != nil {
			log.Warn("error in storage group search", zap.String("error", err.Error()))

			continue
		}

		sg = append(sg, dst.Objects...)

		break // we found storage groups, so break loop
	}

	return sg
}

// filterExpiredSG fetches each storage group object and keeps only those
// whose expiration epoch has not yet passed.
func (ap *Processor) filterExpiredSG(cid cid.ID, sgIDs []oid.ID,
	cnr [][]netmap.NodeInfo, nm netmap.NetMap) []storagegroup.StorageGroup {
	sgs := make([]storagegroup.StorageGroup, 0, len(sgIDs))

	var coreSG storagegroup.StorageGroup

	var getSGPrm storagegroup.GetSGPrm
	getSGPrm.CID = cid
	getSGPrm.Container = cnr
	getSGPrm.NetMap = nm

	for _, sgID := range sgIDs {
		ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout)

		getSGPrm.OID = sgID

		sg, err := ap.sgSrc.GetSG(ctx, getSGPrm)

		cancel()

		if err != nil {
			ap.log.Error(
				"could not get storage group object for audit, skipping",
				zap.Stringer("cid", cid),
				zap.Stringer("oid", sgID),
				zap.Error(err),
			)

			continue
		}

		// filter expired epochs
		if sg.ExpirationEpoch() >= ap.epochSrc.EpochCounter() {
			coreSG.SetID(sgID)
			coreSG.SetStorageGroup(*sg)

			sgs = append(sgs, coreSG)
		}
	}

	return sgs
}