diff --git a/cmd/frostfs-adm/internal/modules/maintenance/root.go b/cmd/frostfs-adm/internal/modules/maintenance/root.go new file mode 100644 index 000000000..d67b70d2a --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/maintenance/root.go @@ -0,0 +1,15 @@ +package maintenance + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/maintenance/zombie" + "github.com/spf13/cobra" +) + +var RootCmd = &cobra.Command{ + Use: "maintenance", + Short: "Section for maintenance commands", +} + +func init() { + RootCmd.AddCommand(zombie.Cmd) +} diff --git a/cmd/frostfs-adm/internal/modules/maintenance/zombie/key.go b/cmd/frostfs-adm/internal/modules/maintenance/zombie/key.go new file mode 100644 index 000000000..1b66889aa --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/maintenance/zombie/key.go @@ -0,0 +1,70 @@ +package zombie + +import ( + "crypto/ecdsa" + "fmt" + "os" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + "github.com/nspcc-dev/neo-go/cli/flags" + "github.com/nspcc-dev/neo-go/cli/input" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +func getPrivateKey(cmd *cobra.Command, appCfg *config.Config) *ecdsa.PrivateKey { + keyDesc := viper.GetString(walletFlag) + if keyDesc == "" { + return &nodeconfig.Key(appCfg).PrivateKey + } + data, err := os.ReadFile(keyDesc) + commonCmd.ExitOnErr(cmd, "open wallet file: %w", err) + + priv, err := keys.NewPrivateKeyFromBytes(data) + if err != nil { + w, err := wallet.NewWalletFromFile(keyDesc) + commonCmd.ExitOnErr(cmd, "provided key is incorrect, only wallet or binary key supported: %w", err) + return fromWallet(cmd, w, viper.GetString(addressFlag)) + } + return &priv.PrivateKey +} + +func fromWallet(cmd *cobra.Command, w *wallet.Wallet, addrStr string) *ecdsa.PrivateKey { + var ( + addr util.Uint160 + err error + ) + + if addrStr == "" { + addr = w.GetChangeAddress() + } else { + addr, err = flags.ParseAddress(addrStr) + commonCmd.ExitOnErr(cmd, "--address option must be specified and valid: %w", err) + } + + acc := w.GetAccount(addr) + if acc == nil { + commonCmd.ExitOnErr(cmd, "--address option must be specified and valid: %w", fmt.Errorf("can't find wallet account for %s", addrStr)) + } + + pass, err := getPassword() + commonCmd.ExitOnErr(cmd, "invalid password for the encrypted key: %w", err) + + commonCmd.ExitOnErr(cmd, "can't decrypt account: %w", acc.Decrypt(pass, keys.NEP2ScryptParams())) + + return &acc.PrivateKey().PrivateKey +} + +func getPassword() (string, error) { + // this check allows empty passwords + if viper.IsSet("password") { + return viper.GetString("password"), nil + } + + return input.ReadPassword("Enter password > ") +} diff --git a/cmd/frostfs-adm/internal/modules/maintenance/zombie/list.go b/cmd/frostfs-adm/internal/modules/maintenance/zombie/list.go new file mode 100644 index 000000000..f73f33db9 --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/maintenance/zombie/list.go @@ -0,0 +1,31 @@ +package zombie + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/spf13/cobra" +) + +func list(cmd *cobra.Command, _ []string) { + configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag) + configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag) + appCfg := config.New(configFile, configDir, config.EnvPrefix) + storageEngine := newEngine(cmd, appCfg) + q := createQuarantine(cmd, storageEngine.DumpInfo()) + var containerID *cid.ID + if cidStr, _ := cmd.Flags().GetString(cidFlag); cidStr != "" { + containerID = &cid.ID{} + commonCmd.ExitOnErr(cmd, "decode container ID string: %w", containerID.DecodeString(cidStr)) + } + + commonCmd.ExitOnErr(cmd, "iterate over quarantine: %w", q.Iterate(cmd.Context(), func(a oid.Address) error { + if containerID != nil && a.Container() != *containerID { + return nil + } + cmd.Println(a.EncodeToString()) + return nil + })) +} diff --git a/cmd/frostfs-adm/internal/modules/maintenance/zombie/morph.go b/cmd/frostfs-adm/internal/modules/maintenance/zombie/morph.go new file mode 100644 index 000000000..cd3a64499 --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/maintenance/zombie/morph.go @@ -0,0 +1,46 @@ +package zombie + +import ( + "errors" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph" + nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" + cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" + netmapClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" + "github.com/spf13/cobra" +) + +func createMorphClient(cmd *cobra.Command, appCfg *config.Config) *client.Client { + addresses := morphconfig.RPCEndpoint(appCfg) + if len(addresses) == 0 { + commonCmd.ExitOnErr(cmd, "create morph client: %w", errors.New("no morph endpoints found")) + } + key := nodeconfig.Key(appCfg) + cli, err := client.New(cmd.Context(), + key, + client.WithDialTimeout(morphconfig.DialTimeout(appCfg)), + client.WithEndpoints(addresses...), + client.WithSwitchInterval(morphconfig.SwitchInterval(appCfg)), + ) + commonCmd.ExitOnErr(cmd, "create morph client: %w", err) + return cli +} + +func createContainerClient(cmd *cobra.Command, morph *client.Client) *cntClient.Client { + hs, err := morph.NNSContractAddress(client.NNSContainerContractName) + commonCmd.ExitOnErr(cmd, "resolve container contract hash: %w", err) + cc, err := cntClient.NewFromMorph(morph, hs, 0) + commonCmd.ExitOnErr(cmd, "create morph container client: %w", err) + return cc +} + +func createNetmapClient(cmd *cobra.Command, morph *client.Client) *netmapClient.Client { + hs, err := morph.NNSContractAddress(client.NNSNetmapContractName) + commonCmd.ExitOnErr(cmd, "resolve netmap contract hash: %w", err) + cli, err := netmapClient.NewFromMorph(morph, hs, 0) + commonCmd.ExitOnErr(cmd, "create morph netmap client: %w", err) + return cli +} diff --git a/cmd/frostfs-adm/internal/modules/maintenance/zombie/quarantine.go b/cmd/frostfs-adm/internal/modules/maintenance/zombie/quarantine.go new file mode 100644 index 000000000..27f83aec7 --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/maintenance/zombie/quarantine.go @@ -0,0 +1,154 @@ +package zombie + +import ( + "context" + "fmt" + "math" + "os" + "path/filepath" + "strings" + "sync" + + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/spf13/cobra" +) + +type quarantine struct { + // mtx protects current field. + mtx sync.Mutex + current int + trees []*fstree.FSTree +} + +func createQuarantine(cmd *cobra.Command, engineInfo engine.Info) *quarantine { + var paths []string + for _, sh := range engineInfo.Shards { + var storagePaths []string + for _, st := range sh.BlobStorInfo.SubStorages { + storagePaths = append(storagePaths, st.Path) + } + if len(storagePaths) == 0 { + continue + } + paths = append(paths, filepath.Join(commonPath(storagePaths), "quarantine")) + } + q, err := newQuarantine(paths) + commonCmd.ExitOnErr(cmd, "create quarantine: %w", err) + return q +} + +func commonPath(paths []string) string { + if len(paths) == 0 { + return "" + } + if len(paths) == 1 { + return paths[0] + } + minLen := math.MaxInt + for _, p := range paths { + if len(p) < minLen { + minLen = len(p) + } + } + + var sb strings.Builder + for i := range minLen { + for _, path := range paths[1:] { + if paths[0][i] != path[i] { + return sb.String() + } + } + sb.WriteByte(paths[0][i]) + } + return sb.String() +} + +func newQuarantine(paths []string) (*quarantine, error) { + var q quarantine + for i := range paths { + f := fstree.New( + fstree.WithDepth(1), + fstree.WithDirNameLen(1), + fstree.WithPath(paths[i]), + fstree.WithPerm(os.ModePerm), + ) + if err := f.Open(mode.ComponentReadWrite); err != nil { + return nil, fmt.Errorf("open fstree %s: %w", paths[i], err) + } + if err := f.Init(); err != nil { + return nil, fmt.Errorf("init fstree %s: %w", paths[i], err) + } + q.trees = append(q.trees, f) + } + return &q, nil +} + +func (q *quarantine) Get(ctx context.Context, a oid.Address) (*objectSDK.Object, error) { + for i := range q.trees { + res, err := q.trees[i].Get(ctx, common.GetPrm{Address: a}) + if err != nil { + continue + } + return res.Object, nil + } + return nil, &apistatus.ObjectNotFound{} +} + +func (q *quarantine) Delete(ctx context.Context, a oid.Address) error { + for i := range q.trees { + _, err := q.trees[i].Delete(ctx, common.DeletePrm{Address: a}) + if err != nil { + continue + } + return nil + } + return &apistatus.ObjectNotFound{} +} + +func (q *quarantine) Put(ctx context.Context, obj *objectSDK.Object) error { + data, err := obj.Marshal() + if err != nil { + return err + } + + var prm common.PutPrm + prm.Address = objectcore.AddressOf(obj) + prm.Object = obj + prm.RawData = data + + q.mtx.Lock() + current := q.current + q.current = (q.current + 1) % len(q.trees) + q.mtx.Unlock() + + _, err = q.trees[current].Put(ctx, prm) + return err +} + +func (q *quarantine) Iterate(ctx context.Context, f func(oid.Address) error) error { + var prm common.IteratePrm + prm.Handler = func(elem common.IterationElement) error { + return f(elem.Address) + } + for i := range q.trees { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + _, err := q.trees[i].Iterate(ctx, prm) + if err != nil { + return err + } + } + return nil +} diff --git a/cmd/frostfs-adm/internal/modules/maintenance/zombie/remove.go b/cmd/frostfs-adm/internal/modules/maintenance/zombie/remove.go new file mode 100644 index 000000000..0b8f2f172 --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/maintenance/zombie/remove.go @@ -0,0 +1,55 @@ +package zombie + +import ( + "errors" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/spf13/cobra" +) + +func remove(cmd *cobra.Command, _ []string) { + configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag) + configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag) + appCfg := config.New(configFile, configDir, config.EnvPrefix) + storageEngine := newEngine(cmd, appCfg) + q := createQuarantine(cmd, storageEngine.DumpInfo()) + + var containerID cid.ID + cidStr, _ := cmd.Flags().GetString(cidFlag) + commonCmd.ExitOnErr(cmd, "decode container ID string: %w", containerID.DecodeString(cidStr)) + + var objectID *oid.ID + oidStr, _ := cmd.Flags().GetString(oidFlag) + if oidStr != "" { + objectID = &oid.ID{} + commonCmd.ExitOnErr(cmd, "decode object ID string: %w", objectID.DecodeString(oidStr)) + } + + if objectID != nil { + var addr oid.Address + addr.SetContainer(containerID) + addr.SetObject(*objectID) + removeObject(cmd, q, addr) + } else { + commonCmd.ExitOnErr(cmd, "iterate over quarantine: %w", q.Iterate(cmd.Context(), func(addr oid.Address) error { + if addr.Container() != containerID { + return nil + } + removeObject(cmd, q, addr) + return nil + })) + } +} + +func removeObject(cmd *cobra.Command, q *quarantine, addr oid.Address) { + err := q.Delete(cmd.Context(), addr) + if errors.Is(err, new(apistatus.ObjectNotFound)) { + return + } + commonCmd.ExitOnErr(cmd, "remove object from quarantine: %w", err) +} diff --git a/cmd/frostfs-adm/internal/modules/maintenance/zombie/restore.go b/cmd/frostfs-adm/internal/modules/maintenance/zombie/restore.go new file mode 100644 index 000000000..f179c7c2d --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/maintenance/zombie/restore.go @@ -0,0 +1,69 @@ +package zombie + +import ( + "crypto/sha256" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/spf13/cobra" +) + +func restore(cmd *cobra.Command, _ []string) { + configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag) + configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag) + appCfg := config.New(configFile, configDir, config.EnvPrefix) + storageEngine := newEngine(cmd, appCfg) + q := createQuarantine(cmd, storageEngine.DumpInfo()) + morphClient := createMorphClient(cmd, appCfg) + cnrCli := createContainerClient(cmd, morphClient) + + var containerID cid.ID + cidStr, _ := cmd.Flags().GetString(cidFlag) + commonCmd.ExitOnErr(cmd, "decode container ID string: %w", containerID.DecodeString(cidStr)) + + var objectID *oid.ID + oidStr, _ := cmd.Flags().GetString(oidFlag) + if oidStr != "" { + objectID = &oid.ID{} + commonCmd.ExitOnErr(cmd, "decode object ID string: %w", objectID.DecodeString(oidStr)) + } + + if objectID != nil { + var addr oid.Address + addr.SetContainer(containerID) + addr.SetObject(*objectID) + restoreObject(cmd, storageEngine, q, addr, cnrCli) + } else { + commonCmd.ExitOnErr(cmd, "iterate over quarantine: %w", q.Iterate(cmd.Context(), func(addr oid.Address) error { + if addr.Container() != containerID { + return nil + } + restoreObject(cmd, storageEngine, q, addr, cnrCli) + return nil + })) + } +} + +func restoreObject(cmd *cobra.Command, storageEngine *engine.StorageEngine, q *quarantine, addr oid.Address, cnrCli *cntClient.Client) { + obj, err := q.Get(cmd.Context(), addr) + commonCmd.ExitOnErr(cmd, "get object from quarantine: %w", err) + rawCID := make([]byte, sha256.Size) + + cid := addr.Container() + cid.Encode(rawCID) + cnr, err := cnrCli.Get(cmd.Context(), rawCID) + commonCmd.ExitOnErr(cmd, "get container: %w", err) + + putPrm := engine.PutPrm{ + Object: obj, + IsIndexedContainer: containerCore.IsIndexedContainer(cnr.Value), + } + commonCmd.ExitOnErr(cmd, "put object to storage engine: %w", storageEngine.Put(cmd.Context(), putPrm)) + commonCmd.ExitOnErr(cmd, "remove object from quarantine: %w", q.Delete(cmd.Context(), addr)) +} diff --git a/cmd/frostfs-adm/internal/modules/maintenance/zombie/root.go b/cmd/frostfs-adm/internal/modules/maintenance/zombie/root.go new file mode 100644 index 000000000..9ef18f7f8 --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/maintenance/zombie/root.go @@ -0,0 +1,125 @@ +package zombie + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +const ( + flagBatchSize = "batch-size" + flagBatchSizeUsage = "Objects iteration batch size" + cidFlag = "cid" + cidFlagUsage = "Container ID" + oidFlag = "oid" + oidFlagUsage = "Object ID" + walletFlag = "wallet" + walletFlagShorthand = "w" + walletFlagUsage = "Path to the wallet or binary key" + addressFlag = "address" + addressFlagUsage = "Address of wallet account" + moveFlag = "move" + moveFlagUsage = "Move objects from storage engine to quarantine" +) + +var ( + Cmd = &cobra.Command{ + Use: "zombie", + Short: "Zombie objects related commands", + } + scanCmd = &cobra.Command{ + Use: "scan", + Short: "Scan storage engine for zombie objects and move them to quarantine", + Long: "", + PreRun: func(cmd *cobra.Command, _ []string) { + _ = viper.BindPFlag(commonflags.EndpointFlag, cmd.Flags().Lookup(commonflags.EndpointFlag)) + _ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag)) + _ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag)) + _ = viper.BindPFlag(walletFlag, cmd.Flags().Lookup(walletFlag)) + _ = viper.BindPFlag(addressFlag, cmd.Flags().Lookup(addressFlag)) + _ = viper.BindPFlag(flagBatchSize, cmd.Flags().Lookup(flagBatchSize)) + _ = viper.BindPFlag(moveFlag, cmd.Flags().Lookup(moveFlag)) + }, + Run: scan, + } + listCmd = &cobra.Command{ + Use: "list", + Short: "List zombie objects from quarantine", + Long: "", + PreRun: func(cmd *cobra.Command, _ []string) { + _ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag)) + _ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag)) + _ = viper.BindPFlag(cidFlag, cmd.Flags().Lookup(cidFlag)) + }, + Run: list, + } + restoreCmd = &cobra.Command{ + Use: "restore", + Short: "Restore zombie objects from quarantine", + Long: "", + PreRun: func(cmd *cobra.Command, _ []string) { + _ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag)) + _ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag)) + _ = viper.BindPFlag(cidFlag, cmd.Flags().Lookup(cidFlag)) + _ = viper.BindPFlag(oidFlag, cmd.Flags().Lookup(oidFlag)) + }, + Run: restore, + } + removeCmd = &cobra.Command{ + Use: "remove", + Short: "Remove zombie objects from quarantine", + Long: "", + PreRun: func(cmd *cobra.Command, _ []string) { + _ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag)) + _ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag)) + _ = viper.BindPFlag(cidFlag, cmd.Flags().Lookup(cidFlag)) + _ = viper.BindPFlag(oidFlag, cmd.Flags().Lookup(oidFlag)) + }, + Run: remove, + } +) + +func init() { + initScanCmd() + initListCmd() + initRestoreCmd() + initRemoveCmd() +} + +func initScanCmd() { + Cmd.AddCommand(scanCmd) + + scanCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc) + scanCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage) + scanCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage) + scanCmd.Flags().Uint32(flagBatchSize, 1000, flagBatchSizeUsage) + scanCmd.Flags().StringP(walletFlag, walletFlagShorthand, "", walletFlagUsage) + scanCmd.Flags().String(addressFlag, "", addressFlagUsage) + scanCmd.Flags().Bool(moveFlag, false, moveFlagUsage) +} + +func initListCmd() { + Cmd.AddCommand(listCmd) + + listCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage) + listCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage) + listCmd.Flags().String(cidFlag, "", cidFlagUsage) +} + +func initRestoreCmd() { + Cmd.AddCommand(restoreCmd) + + restoreCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage) + restoreCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage) + restoreCmd.Flags().String(cidFlag, "", cidFlagUsage) + restoreCmd.Flags().String(oidFlag, "", oidFlagUsage) +} + +func initRemoveCmd() { + Cmd.AddCommand(removeCmd) + + removeCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage) + removeCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage) + removeCmd.Flags().String(cidFlag, "", cidFlagUsage) + removeCmd.Flags().String(oidFlag, "", oidFlagUsage) +} diff --git a/cmd/frostfs-adm/internal/modules/maintenance/zombie/scan.go b/cmd/frostfs-adm/internal/modules/maintenance/zombie/scan.go new file mode 100644 index 000000000..268ec4911 --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/maintenance/zombie/scan.go @@ -0,0 +1,281 @@ +package zombie + +import ( + "context" + "crypto/ecdsa" + "crypto/sha256" + "errors" + "fmt" + "sync" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + clientCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache" + clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" +) + +func scan(cmd *cobra.Command, _ []string) { + configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag) + configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag) + appCfg := config.New(configFile, configDir, config.EnvPrefix) + batchSize, _ := cmd.Flags().GetUint32(flagBatchSize) + if batchSize == 0 { + commonCmd.ExitOnErr(cmd, "invalid batch size: %w", errors.New("batch size must be positive value")) + } + move, _ := cmd.Flags().GetBool(moveFlag) + + storageEngine := newEngine(cmd, appCfg) + morphClient := createMorphClient(cmd, appCfg) + cnrCli := createContainerClient(cmd, morphClient) + nmCli := createNetmapClient(cmd, morphClient) + q := createQuarantine(cmd, storageEngine.DumpInfo()) + pk := getPrivateKey(cmd, appCfg) + + epoch, err := nmCli.Epoch(cmd.Context()) + commonCmd.ExitOnErr(cmd, "read epoch from morph: %w", err) + + nm, err := nmCli.GetNetMapByEpoch(cmd.Context(), epoch) + commonCmd.ExitOnErr(cmd, "read netmap from morph: %w", err) + + cmd.Printf("Epoch: %d\n", nm.Epoch()) + cmd.Printf("Nodes in the netmap: %d\n", len(nm.Nodes())) + + ps := &processStatus{ + statusCount: make(map[status]uint64), + } + + stopCh := make(chan struct{}) + start := time.Now() + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + tick := time.NewTicker(time.Second) + defer tick.Stop() + for { + select { + case <-cmd.Context().Done(): + return + case <-stopCh: + return + case <-tick.C: + fmt.Printf("Objects processed: %d; Time elapsed: %s\n", ps.total(), time.Since(start)) + } + } + }() + go func() { + defer wg.Done() + err = scanStorageEngine(cmd, batchSize, storageEngine, ps, appCfg, cnrCli, nmCli, q, pk, move) + close(stopCh) + }() + wg.Wait() + commonCmd.ExitOnErr(cmd, "scan storage engine for zombie objects: %w", err) + + cmd.Println() + cmd.Println("Status description:") + cmd.Println("undefined -- nothing is clear") + cmd.Println("found -- object is found in cluster") + cmd.Println("quarantine -- object is not found in cluster") + cmd.Println() + for status, count := range ps.statusCount { + cmd.Printf("Status: %s, Count: %d\n", status, count) + } +} + +type status string + +const ( + statusUndefined status = "undefined" + statusFound status = "found" + statusQuarantine status = "quarantine" +) + +func checkAddr(ctx context.Context, cnrCli *cntClient.Client, nmCli *netmap.Client, cc *cache.ClientCache, obj object.Info) (status, error) { + rawCID := make([]byte, sha256.Size) + cid := obj.Address.Container() + cid.Encode(rawCID) + + cnr, err := cnrCli.Get(ctx, rawCID) + if err != nil { + var errContainerNotFound *apistatus.ContainerNotFound + if errors.As(err, &errContainerNotFound) { + // Policer will deal with this object. + return statusFound, nil + } + return statusUndefined, fmt.Errorf("read container %s from morph: %w", cid, err) + } + nm, err := nmCli.NetMap(ctx) + if err != nil { + return statusUndefined, fmt.Errorf("read netmap from morph: %w", err) + } + + nodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), rawCID) + if err != nil { + // Not enough nodes, check all netmap nodes. + nodes = append([][]netmap.NodeInfo{}, nm.Nodes()) + } + + objID := obj.Address.Object() + cnrID := obj.Address.Container() + local := true + raw := false + if obj.ECInfo != nil { + objID = obj.ECInfo.ParentID + local = false + raw = true + } + prm := clientSDK.PrmObjectHead{ + ObjectID: &objID, + ContainerID: &cnrID, + Local: local, + Raw: raw, + } + + var ni clientCore.NodeInfo + for i := range nodes { + for j := range nodes[i] { + if err := clientCore.NodeInfoFromRawNetmapElement(&ni, netmapCore.Node(nodes[i][j])); err != nil { + return statusUndefined, fmt.Errorf("parse node info: %w", err) + } + c, err := cc.Get(ni) + if err != nil { + continue + } + res, err := c.ObjectHead(ctx, prm) + if err != nil { + var errECInfo *objectSDK.ECInfoError + if raw && errors.As(err, &errECInfo) { + return statusFound, nil + } + continue + } + if err := apistatus.ErrFromStatus(res.Status()); err != nil { + continue + } + return statusFound, nil + } + } + + if cnr.Value.PlacementPolicy().NumberOfReplicas() == 1 && cnr.Value.PlacementPolicy().ReplicaDescriptor(0).NumberOfObjects() == 1 { + return statusFound, nil + } + return statusQuarantine, nil +} + +func scanStorageEngine(cmd *cobra.Command, batchSize uint32, storageEngine *engine.StorageEngine, ps *processStatus, + appCfg *config.Config, cnrCli *cntClient.Client, nmCli *netmap.Client, q *quarantine, pk *ecdsa.PrivateKey, move bool, +) error { + cc := cache.NewSDKClientCache(cache.ClientCacheOpts{ + DialTimeout: apiclientconfig.DialTimeout(appCfg), + StreamTimeout: apiclientconfig.StreamTimeout(appCfg), + ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg), + Key: pk, + AllowExternal: apiclientconfig.AllowExternal(appCfg), + }) + ctx := cmd.Context() + + var cursor *engine.Cursor + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + var prm engine.ListWithCursorPrm + prm.WithCursor(cursor) + prm.WithCount(batchSize) + + res, err := storageEngine.ListWithCursor(ctx, prm) + if err != nil { + if errors.Is(err, engine.ErrEndOfListing) { + return nil + } + return fmt.Errorf("list with cursor: %w", err) + } + + cursor = res.Cursor() + addrList := res.AddressList() + eg, egCtx := errgroup.WithContext(ctx) + eg.SetLimit(int(batchSize)) + + for i := range addrList { + addr := addrList[i] + eg.Go(func() error { + result, err := checkAddr(egCtx, cnrCli, nmCli, cc, addr) + if err != nil { + return fmt.Errorf("check object %s status: %w", addr.Address, err) + } + ps.add(result) + + if !move && result == statusQuarantine { + cmd.Println(addr) + return nil + } + + if result == statusQuarantine { + return moveToQuarantine(egCtx, storageEngine, q, addr.Address) + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return fmt.Errorf("process objects batch: %w", err) + } + } +} + +func moveToQuarantine(ctx context.Context, storageEngine *engine.StorageEngine, q *quarantine, addr oid.Address) error { + var getPrm engine.GetPrm + getPrm.WithAddress(addr) + res, err := storageEngine.Get(ctx, getPrm) + if err != nil { + return fmt.Errorf("get object %s from storage engine: %w", addr, err) + } + + if err := q.Put(ctx, res.Object()); err != nil { + return fmt.Errorf("put object %s to quarantine: %w", addr, err) + } + + var delPrm engine.DeletePrm + delPrm.WithForceRemoval() + delPrm.WithAddress(addr) + + if err = storageEngine.Delete(ctx, delPrm); err != nil { + return fmt.Errorf("delete object %s from storage engine: %w", addr, err) + } + return nil +} + +type processStatus struct { + guard sync.RWMutex + statusCount map[status]uint64 + count uint64 +} + +func (s *processStatus) add(st status) { + s.guard.Lock() + defer s.guard.Unlock() + s.statusCount[st]++ + s.count++ +} + +func (s *processStatus) total() uint64 { + s.guard.RLock() + defer s.guard.RUnlock() + return s.count +} diff --git a/cmd/frostfs-adm/internal/modules/maintenance/zombie/storage_engine.go b/cmd/frostfs-adm/internal/modules/maintenance/zombie/storage_engine.go new file mode 100644 index 000000000..5851e049c --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/maintenance/zombie/storage_engine.go @@ -0,0 +1,203 @@ +package zombie + +import ( + "context" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine" + shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" + blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza" + fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "github.com/panjf2000/ants/v2" + "github.com/spf13/cobra" + "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +func newEngine(cmd *cobra.Command, c *config.Config) *engine.StorageEngine { + ngOpts := storageEngineOptions(c) + shardOpts := shardOptions(cmd, c) + e := engine.New(ngOpts...) + for _, opts := range shardOpts { + _, err := e.AddShard(cmd.Context(), opts...) + commonCmd.ExitOnErr(cmd, "iterate shards from config: %w", err) + } + commonCmd.ExitOnErr(cmd, "open storage engine: %w", e.Open(cmd.Context())) + commonCmd.ExitOnErr(cmd, "init storage engine: %w", e.Init(cmd.Context())) + return e +} + +func storageEngineOptions(c *config.Config) []engine.Option { + return []engine.Option{ + engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c)), + engine.WithLogger(logger.NewLoggerWrapper(zap.NewNop())), + engine.WithLowMemoryConsumption(engineconfig.EngineLowMemoryConsumption(c)), + } +} + +func shardOptions(cmd *cobra.Command, c *config.Config) [][]shard.Option { + var result [][]shard.Option + err := engineconfig.IterateShards(c, false, func(sh *shardconfig.Config) error { + result = append(result, getShardOpts(cmd, c, sh)) + return nil + }) + commonCmd.ExitOnErr(cmd, "iterate shards from config: %w", err) + return result +} + +func getShardOpts(cmd *cobra.Command, c *config.Config, sh *shardconfig.Config) []shard.Option { + wc, wcEnabled := getWriteCacheOpts(sh) + return []shard.Option{ + shard.WithLogger(logger.NewLoggerWrapper(zap.NewNop())), + shard.WithRefillMetabase(sh.RefillMetabase()), + shard.WithRefillMetabaseWorkersCount(sh.RefillMetabaseWorkersCount()), + shard.WithMode(sh.Mode()), + shard.WithBlobStorOptions(getBlobstorOpts(cmd.Context(), sh)...), + shard.WithMetaBaseOptions(getMetabaseOpts(sh)...), + shard.WithPiloramaOptions(getPiloramaOpts(c, sh)...), + shard.WithWriteCache(wcEnabled), + shard.WithWriteCacheOptions(wc), + shard.WithRemoverBatchSize(sh.GC().RemoverBatchSize()), + shard.WithGCRemoverSleepInterval(sh.GC().RemoverSleepInterval()), + shard.WithExpiredCollectorBatchSize(sh.GC().ExpiredCollectorBatchSize()), + shard.WithExpiredCollectorWorkerCount(sh.GC().ExpiredCollectorWorkerCount()), + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + commonCmd.ExitOnErr(cmd, "init GC pool: %w", err) + return pool + }), + shard.WithLimiter(qos.NewNoopLimiter()), + } +} + +func getWriteCacheOpts(sh *shardconfig.Config) ([]writecache.Option, bool) { + if wc := sh.WriteCache(); wc != nil && wc.Enabled() { + var result []writecache.Option + result = append(result, + writecache.WithPath(wc.Path()), + writecache.WithFlushSizeLimit(wc.MaxFlushingObjectsSize()), + writecache.WithMaxObjectSize(wc.MaxObjectSize()), + writecache.WithFlushWorkersCount(wc.WorkerCount()), + writecache.WithMaxCacheSize(wc.SizeLimit()), + writecache.WithMaxCacheCount(wc.CountLimit()), + writecache.WithNoSync(wc.NoSync()), + writecache.WithLogger(logger.NewLoggerWrapper(zap.NewNop())), + writecache.WithQoSLimiter(qos.NewNoopLimiter()), + ) + return result, true + } + return nil, false +} + +func getPiloramaOpts(c *config.Config, sh *shardconfig.Config) []pilorama.Option { + var piloramaOpts []pilorama.Option + if config.BoolSafe(c.Sub("tree"), "enabled") { + pr := sh.Pilorama() + piloramaOpts = append(piloramaOpts, + pilorama.WithPath(pr.Path()), + pilorama.WithPerm(pr.Perm()), + pilorama.WithNoSync(pr.NoSync()), + pilorama.WithMaxBatchSize(pr.MaxBatchSize()), + pilorama.WithMaxBatchDelay(pr.MaxBatchDelay()), + ) + } + return piloramaOpts +} + +func getMetabaseOpts(sh *shardconfig.Config) []meta.Option { + return []meta.Option{ + meta.WithPath(sh.Metabase().Path()), + meta.WithPermissions(sh.Metabase().BoltDB().Perm()), + meta.WithMaxBatchSize(sh.Metabase().BoltDB().MaxBatchSize()), + meta.WithMaxBatchDelay(sh.Metabase().BoltDB().MaxBatchDelay()), + meta.WithBoltDBOptions(&bbolt.Options{ + Timeout: 100 * time.Millisecond, + }), + meta.WithLogger(logger.NewLoggerWrapper(zap.NewNop())), + meta.WithEpochState(&epochState{}), + } +} + +func getBlobstorOpts(ctx context.Context, sh *shardconfig.Config) []blobstor.Option { + result := []blobstor.Option{ + blobstor.WithCompressObjects(sh.Compress()), + blobstor.WithUncompressableContentTypes(sh.UncompressableContentTypes()), + blobstor.WithCompressibilityEstimate(sh.EstimateCompressibility()), + blobstor.WithCompressibilityEstimateThreshold(sh.EstimateCompressibilityThreshold()), + blobstor.WithStorages(getSubStorages(ctx, sh)), + blobstor.WithLogger(logger.NewLoggerWrapper(zap.NewNop())), + } + + return result +} + +func getSubStorages(ctx context.Context, sh *shardconfig.Config) []blobstor.SubStorage { + var ss []blobstor.SubStorage + for _, storage := range sh.BlobStor().Storages() { + switch storage.Type() { + case blobovniczatree.Type: + sub := blobovniczaconfig.From((*config.Config)(storage)) + blobTreeOpts := []blobovniczatree.Option{ + blobovniczatree.WithRootPath(storage.Path()), + blobovniczatree.WithPermissions(storage.Perm()), + blobovniczatree.WithBlobovniczaSize(sub.Size()), + blobovniczatree.WithBlobovniczaShallowDepth(sub.ShallowDepth()), + blobovniczatree.WithBlobovniczaShallowWidth(sub.ShallowWidth()), + blobovniczatree.WithOpenedCacheSize(sub.OpenedCacheSize()), + blobovniczatree.WithOpenedCacheTTL(sub.OpenedCacheTTL()), + blobovniczatree.WithOpenedCacheExpInterval(sub.OpenedCacheExpInterval()), + blobovniczatree.WithInitWorkerCount(sub.InitWorkerCount()), + blobovniczatree.WithWaitBeforeDropDB(sub.RebuildDropTimeout()), + blobovniczatree.WithLogger(logger.NewLoggerWrapper(zap.NewNop())), + blobovniczatree.WithObjectSizeLimit(sh.SmallSizeLimit()), + } + + ss = append(ss, blobstor.SubStorage{ + Storage: blobovniczatree.NewBlobovniczaTree(ctx, blobTreeOpts...), + Policy: func(_ *objectSDK.Object, data []byte) bool { + return uint64(len(data)) < sh.SmallSizeLimit() + }, + }) + case fstree.Type: + sub := fstreeconfig.From((*config.Config)(storage)) + fstreeOpts := []fstree.Option{ + fstree.WithPath(storage.Path()), + fstree.WithPerm(storage.Perm()), + fstree.WithDepth(sub.Depth()), + fstree.WithNoSync(sub.NoSync()), + fstree.WithLogger(logger.NewLoggerWrapper(zap.NewNop())), + } + + ss = append(ss, blobstor.SubStorage{ + Storage: fstree.New(fstreeOpts...), + Policy: func(_ *objectSDK.Object, _ []byte) bool { + return true + }, + }) + default: + // should never happen, that has already + // been handled: when the config was read + } + } + return ss +} + +type epochState struct{} + +func (epochState) CurrentEpoch() uint64 { + return 0 +} diff --git a/cmd/frostfs-adm/internal/modules/root.go b/cmd/frostfs-adm/internal/modules/root.go index e42204b7a..cc8225c7a 100644 --- a/cmd/frostfs-adm/internal/modules/root.go +++ b/cmd/frostfs-adm/internal/modules/root.go @@ -5,6 +5,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/maintenance" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph" "git.frostfs.info/TrueCloudLab/frostfs-node/misc" @@ -41,6 +42,7 @@ func init() { rootCmd.AddCommand(config.RootCmd) rootCmd.AddCommand(morph.RootCmd) rootCmd.AddCommand(metabase.RootCmd) + rootCmd.AddCommand(maintenance.RootCmd) rootCmd.AddCommand(autocomplete.Command("frostfs-adm")) rootCmd.AddCommand(gendoc.Command(rootCmd, gendoc.Options{}))