package zombie

import (
	"context"
	"crypto/ecdsa"
	"crypto/sha256"
	"errors"
	"fmt"
	"sync"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
	apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient"
	commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
	clientCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
	netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
	clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/spf13/cobra"
	"golang.org/x/sync/errgroup"
)

// scan walks the local storage engine, checks every listed object against the
// cluster and reports (or quarantines) objects that are not found anywhere.
func scan(cmd *cobra.Command, _ []string) {
	configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag)
	configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag)
	appCfg := config.New(configFile, configDir, config.EnvPrefix)
	batchSize, _ := cmd.Flags().GetUint32(flagBatchSize)
	if batchSize == 0 {
		commonCmd.ExitOnErr(cmd, "invalid batch size: %w", errors.New("batch size must be positive value"))
	}
	move, _ := cmd.Flags().GetBool(moveFlag)

	storageEngine := newEngine(cmd, appCfg)
	morphClient := createMorphClient(cmd, appCfg)
	cnrCli := createContainerClient(cmd, morphClient)
	nmCli := createNetmapClient(cmd, morphClient)
	q := createQuarantine(cmd, storageEngine.DumpInfo())
	pk := getPrivateKey(cmd, appCfg)

	epoch, err := nmCli.Epoch(cmd.Context())
	commonCmd.ExitOnErr(cmd, "read epoch from morph: %w", err)

	nm, err := nmCli.GetNetMapByEpoch(cmd.Context(), epoch)
	commonCmd.ExitOnErr(cmd, "read netmap from morph: %w", err)

	cmd.Printf("Epoch: %d\n", nm.Epoch())
	cmd.Printf("Nodes in the netmap: %d\n", len(nm.Nodes()))

	ps := &processStatus{
		statusCount: make(map[status]uint64),
	}

	stopCh := make(chan struct{})
	start := time.Now()
	var wg sync.WaitGroup
	wg.Add(2)

	// Progress reporter: prints the number of processed objects every second
	// until the scan finishes or the command context is cancelled.
	go func() {
		defer wg.Done()

		tick := time.NewTicker(time.Second)
		defer tick.Stop()
		for {
			select {
			case <-cmd.Context().Done():
				return
			case <-stopCh:
				return
			case <-tick.C:
				fmt.Printf("Objects processed: %d; Time elapsed: %s\n", ps.total(), time.Since(start))
			}
		}
	}()

	// Scanner: does the actual work and stops the progress reporter when done.
	go func() {
		defer wg.Done()

		err = scanStorageEngine(cmd, batchSize, storageEngine, ps, appCfg, cnrCli, nmCli, q, pk, move)
		close(stopCh)
	}()
	wg.Wait()
	commonCmd.ExitOnErr(cmd, "scan storage engine for zombie objects: %w", err)

	cmd.Println()
	cmd.Println("Status description:")
	cmd.Println("undefined -- nothing is clear")
	cmd.Println("found -- object is found in cluster")
	cmd.Println("quarantine -- object is not found in cluster")
	cmd.Println()
	for status, count := range ps.statusCount {
		cmd.Printf("Status: %s, Count: %d\n", status, count)
	}
}

type status string

const (
	statusUndefined  status = "undefined"
	statusFound      status = "found"
	statusQuarantine status = "quarantine"
)

// checkAddr determines whether the object is available in the cluster:
// it resolves the container placement and sends HEAD requests to the
// corresponding nodes.
func checkAddr(ctx context.Context, cnrCli *cntClient.Client, nmCli *netmap.Client, cc *cache.ClientCache, obj object.Info) (status, error) {
	rawCID := make([]byte, sha256.Size)
	cid := obj.Address.Container()
	cid.Encode(rawCID)

	cnr, err := cnrCli.Get(ctx, rawCID)
	if err != nil {
		var errContainerNotFound *apistatus.ContainerNotFound
		if errors.As(err, &errContainerNotFound) {
			// Policer will deal with this object.
			return statusFound, nil
		}
		return statusUndefined, fmt.Errorf("read container %s from morph: %w", cid, err)
	}
	nm, err := nmCli.NetMap(ctx)
	if err != nil {
		return statusUndefined, fmt.Errorf("read netmap from morph: %w", err)
	}

	nodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), rawCID)
	if err != nil {
		// Not enough nodes, check all netmap nodes.
		nodes = append([][]netmapSDK.NodeInfo{}, nm.Nodes())
	}

	objID := obj.Address.Object()
	cnrID := obj.Address.Container()
	local := true
	raw := false
	if obj.ECInfo != nil {
		// For EC chunks check the parent object instead of the chunk itself.
		objID = obj.ECInfo.ParentID
		local = false
		raw = true
	}
	prm := clientSDK.PrmObjectHead{
		ObjectID:    &objID,
		ContainerID: &cnrID,
		Local:       local,
		Raw:         raw,
	}

	var ni clientCore.NodeInfo
	for i := range nodes {
		for j := range nodes[i] {
			if err := clientCore.NodeInfoFromRawNetmapElement(&ni, netmapCore.Node(nodes[i][j])); err != nil {
				return statusUndefined, fmt.Errorf("parse node info: %w", err)
			}
			c, err := cc.Get(ni)
			if err != nil {
				continue
			}
			res, err := c.ObjectHead(ctx, prm)
			if err != nil {
				var errECInfo *objectSDK.ECInfoError
				if raw && errors.As(err, &errECInfo) {
					return statusFound, nil
				}
				continue
			}
			if err := apistatus.ErrFromStatus(res.Status()); err != nil {
				continue
			}
			return statusFound, nil
		}
	}

	// A policy with a single replica means the local object may be the only
	// copy in the cluster, so it must not be quarantined.
	if cnr.Value.PlacementPolicy().NumberOfReplicas() == 1 && cnr.Value.PlacementPolicy().ReplicaDescriptor(0).NumberOfObjects() == 1 {
		return statusFound, nil
	}

	return statusQuarantine, nil
}

// scanStorageEngine lists objects from the local storage engine in batches
// and checks each of them with checkAddr. Quarantine candidates are either
// printed or moved to quarantine, depending on the move flag.
func scanStorageEngine(cmd *cobra.Command, batchSize uint32, storageEngine *engine.StorageEngine, ps *processStatus,
	appCfg *config.Config, cnrCli *cntClient.Client, nmCli *netmap.Client, q *quarantine, pk *ecdsa.PrivateKey,
	move bool,
) error {
	cc := cache.NewSDKClientCache(cache.ClientCacheOpts{
		DialTimeout:      apiclientconfig.DialTimeout(appCfg),
		StreamTimeout:    apiclientconfig.StreamTimeout(appCfg),
		ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
		Key:              pk,
		AllowExternal:    apiclientconfig.AllowExternal(appCfg),
	})
	ctx := cmd.Context()

	var cursor *engine.Cursor
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		var prm engine.ListWithCursorPrm
		prm.WithCursor(cursor)
		prm.WithCount(batchSize)

		res, err := storageEngine.ListWithCursor(ctx, prm)
		if err != nil {
			if errors.Is(err, engine.ErrEndOfListing) {
				return nil
			}
			return fmt.Errorf("list with cursor: %w", err)
		}
		cursor = res.Cursor()
		addrList := res.AddressList()
		eg, egCtx := errgroup.WithContext(ctx)
		eg.SetLimit(int(batchSize))

		for i := range addrList {
			addr := addrList[i]
			eg.Go(func() error {
				result, err := checkAddr(egCtx, cnrCli, nmCli, cc, addr)
				if err != nil {
					return fmt.Errorf("check object %s status: %w", addr.Address, err)
				}
				ps.add(result)

				if !move && result == statusQuarantine {
					cmd.Println(addr)
					return nil
				}

				if result == statusQuarantine {
					return moveToQuarantine(egCtx, storageEngine, q, addr.Address)
				}
				return nil
			})
		}
		if err := eg.Wait(); err != nil {
			return fmt.Errorf("process objects batch: %w", err)
		}
	}
}

// moveToQuarantine copies the object into the quarantine storage and then
// force-removes it from the storage engine.
func moveToQuarantine(ctx context.Context, storageEngine *engine.StorageEngine, q *quarantine, addr oid.Address) error {
	var getPrm engine.GetPrm
	getPrm.WithAddress(addr)
	res, err := storageEngine.Get(ctx, getPrm)
	if err != nil {
		return fmt.Errorf("get object %s from storage engine: %w", addr, err)
	}

	if err := q.Put(ctx, res.Object()); err != nil {
		return fmt.Errorf("put object %s to quarantine: %w", addr, err)
	}

	var delPrm engine.DeletePrm
	delPrm.WithForceRemoval()
	delPrm.WithAddress(addr)

	if err = storageEngine.Delete(ctx, delPrm); err != nil {
		return fmt.Errorf("delete object %s from storage engine: %w", addr, err)
	}

	return nil
}

// processStatus accumulates per-status counters; safe for concurrent use.
type processStatus struct {
	guard       sync.RWMutex
	statusCount map[status]uint64
	count       uint64
}

func (s *processStatus) add(st status) {
	s.guard.Lock()
	defer s.guard.Unlock()
	s.statusCount[st]++
	s.count++
}

func (s *processStatus) total() uint64 {
	s.guard.RLock()
	defer s.guard.RUnlock()
	return s.count
}