Add maintenance zombie
commands -> v0.42.18 #1711
12 changed files with 1044 additions and 1 deletions
15
cmd/frostfs-adm/internal/modules/maintenance/root.go
Normal file
15
cmd/frostfs-adm/internal/modules/maintenance/root.go
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
package maintenance
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/maintenance/zombie"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
var RootCmd = &cobra.Command{
|
||||||
|
Use: "maintenance",
|
||||||
|
Short: "Section for maintenance commands",
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RootCmd.AddCommand(zombie.Cmd)
|
||||||
|
}
|
70
cmd/frostfs-adm/internal/modules/maintenance/zombie/key.go
Normal file
70
cmd/frostfs-adm/internal/modules/maintenance/zombie/key.go
Normal file
|
@ -0,0 +1,70 @@
|
||||||
|
package zombie
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
"github.com/nspcc-dev/neo-go/cli/flags"
|
||||||
|
"github.com/nspcc-dev/neo-go/cli/input"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
)
|
||||||
|
|
||||||
|
func getPrivateKey(cmd *cobra.Command, appCfg *config.Config) *ecdsa.PrivateKey {
|
||||||
|
keyDesc := viper.GetString(walletFlag)
|
||||||
|
if keyDesc == "" {
|
||||||
|
return &nodeconfig.Key(appCfg).PrivateKey
|
||||||
|
}
|
||||||
|
data, err := os.ReadFile(keyDesc)
|
||||||
|
commonCmd.ExitOnErr(cmd, "open wallet file: %w", err)
|
||||||
|
|
||||||
|
priv, err := keys.NewPrivateKeyFromBytes(data)
|
||||||
|
if err != nil {
|
||||||
|
w, err := wallet.NewWalletFromFile(keyDesc)
|
||||||
|
commonCmd.ExitOnErr(cmd, "provided key is incorrect, only wallet or binary key supported: %w", err)
|
||||||
|
return fromWallet(cmd, w, viper.GetString(addressFlag))
|
||||||
|
}
|
||||||
|
return &priv.PrivateKey
|
||||||
|
}
|
||||||
|
|
||||||
|
func fromWallet(cmd *cobra.Command, w *wallet.Wallet, addrStr string) *ecdsa.PrivateKey {
|
||||||
|
var (
|
||||||
|
addr util.Uint160
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if addrStr == "" {
|
||||||
|
addr = w.GetChangeAddress()
|
||||||
|
} else {
|
||||||
|
addr, err = flags.ParseAddress(addrStr)
|
||||||
|
commonCmd.ExitOnErr(cmd, "--address option must be specified and valid: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
acc := w.GetAccount(addr)
|
||||||
|
if acc == nil {
|
||||||
|
commonCmd.ExitOnErr(cmd, "--address option must be specified and valid: %w", fmt.Errorf("can't find wallet account for %s", addrStr))
|
||||||
|
}
|
||||||
|
|
||||||
|
pass, err := getPassword()
|
||||||
|
commonCmd.ExitOnErr(cmd, "invalid password for the encrypted key: %w", err)
|
||||||
|
|
||||||
|
commonCmd.ExitOnErr(cmd, "can't decrypt account: %w", acc.Decrypt(pass, keys.NEP2ScryptParams()))
|
||||||
|
|
||||||
|
return &acc.PrivateKey().PrivateKey
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPassword() (string, error) {
|
||||||
|
// this check allows empty passwords
|
||||||
|
if viper.IsSet("password") {
|
||||||
|
return viper.GetString("password"), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return input.ReadPassword("Enter password > ")
|
||||||
|
}
|
31
cmd/frostfs-adm/internal/modules/maintenance/zombie/list.go
Normal file
31
cmd/frostfs-adm/internal/modules/maintenance/zombie/list.go
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
package zombie
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
func list(cmd *cobra.Command, _ []string) {
|
||||||
|
configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag)
|
||||||
|
configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag)
|
||||||
|
appCfg := config.New(configFile, configDir, config.EnvPrefix)
|
||||||
|
storageEngine := newEngine(cmd, appCfg)
|
||||||
|
q := createQuarantine(cmd, storageEngine.DumpInfo())
|
||||||
|
var containerID *cid.ID
|
||||||
|
if cidStr, _ := cmd.Flags().GetString(cidFlag); cidStr != "" {
|
||||||
|
containerID = &cid.ID{}
|
||||||
|
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", containerID.DecodeString(cidStr))
|
||||||
|
}
|
||||||
|
|
||||||
|
commonCmd.ExitOnErr(cmd, "iterate over quarantine: %w", q.Iterate(cmd.Context(), func(a oid.Address) error {
|
||||||
|
if containerID != nil && a.Container() != *containerID {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
cmd.Println(a.EncodeToString())
|
||||||
|
return nil
|
||||||
|
}))
|
||||||
|
}
|
46
cmd/frostfs-adm/internal/modules/maintenance/zombie/morph.go
Normal file
46
cmd/frostfs-adm/internal/modules/maintenance/zombie/morph.go
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
package zombie
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
||||||
|
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
|
netmapClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
func createMorphClient(cmd *cobra.Command, appCfg *config.Config) *client.Client {
|
||||||
|
addresses := morphconfig.RPCEndpoint(appCfg)
|
||||||
|
if len(addresses) == 0 {
|
||||||
|
commonCmd.ExitOnErr(cmd, "create morph client: %w", errors.New("no morph endpoints found"))
|
||||||
|
}
|
||||||
|
key := nodeconfig.Key(appCfg)
|
||||||
|
cli, err := client.New(cmd.Context(),
|
||||||
|
key,
|
||||||
|
client.WithDialTimeout(morphconfig.DialTimeout(appCfg)),
|
||||||
|
client.WithEndpoints(addresses...),
|
||||||
|
client.WithSwitchInterval(morphconfig.SwitchInterval(appCfg)),
|
||||||
|
)
|
||||||
|
commonCmd.ExitOnErr(cmd, "create morph client: %w", err)
|
||||||
|
return cli
|
||||||
|
}
|
||||||
|
|
||||||
|
func createContainerClient(cmd *cobra.Command, morph *client.Client) *cntClient.Client {
|
||||||
|
hs, err := morph.NNSContractAddress(client.NNSContainerContractName)
|
||||||
|
commonCmd.ExitOnErr(cmd, "resolve container contract hash: %w", err)
|
||||||
|
cc, err := cntClient.NewFromMorph(morph, hs, 0, cntClient.TryNotary())
|
||||||
|
commonCmd.ExitOnErr(cmd, "create morph container client: %w", err)
|
||||||
|
return cc
|
||||||
|
}
|
||||||
|
|
||||||
|
func createNetmapClient(cmd *cobra.Command, morph *client.Client) *netmapClient.Client {
|
||||||
|
hs, err := morph.NNSContractAddress(client.NNSNetmapContractName)
|
||||||
|
commonCmd.ExitOnErr(cmd, "resolve netmap contract hash: %w", err)
|
||||||
|
cli, err := netmapClient.NewFromMorph(morph, hs, 0, netmapClient.TryNotary())
|
||||||
|
commonCmd.ExitOnErr(cmd, "create morph netmap client: %w", err)
|
||||||
|
return cli
|
||||||
|
}
|
|
@ -0,0 +1,154 @@
|
||||||
|
package zombie
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
type quarantine struct {
|
||||||
|
// mtx protects current field.
|
||||||
|
mtx sync.Mutex
|
||||||
|
current int
|
||||||
|
trees []*fstree.FSTree
|
||||||
|
}
|
||||||
|
|
||||||
|
func createQuarantine(cmd *cobra.Command, engineInfo engine.Info) *quarantine {
|
||||||
|
var paths []string
|
||||||
|
for _, sh := range engineInfo.Shards {
|
||||||
|
var storagePaths []string
|
||||||
|
for _, st := range sh.BlobStorInfo.SubStorages {
|
||||||
|
storagePaths = append(storagePaths, st.Path)
|
||||||
|
}
|
||||||
|
if len(storagePaths) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
paths = append(paths, filepath.Join(commonPath(storagePaths), "quarantine"))
|
||||||
|
}
|
||||||
|
q, err := newQuarantine(paths)
|
||||||
|
commonCmd.ExitOnErr(cmd, "create quarantine: %w", err)
|
||||||
|
return q
|
||||||
|
}
|
||||||
|
|
||||||
|
func commonPath(paths []string) string {
|
||||||
|
if len(paths) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
if len(paths) == 1 {
|
||||||
|
return paths[0]
|
||||||
|
}
|
||||||
|
minLen := math.MaxInt
|
||||||
|
for _, p := range paths {
|
||||||
|
if len(p) < minLen {
|
||||||
|
minLen = len(p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var sb strings.Builder
|
||||||
|
for i := 0; i < minLen; i++ {
|
||||||
|
for _, path := range paths[1:] {
|
||||||
|
if paths[0][i] != path[i] {
|
||||||
|
return sb.String()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sb.WriteByte(paths[0][i])
|
||||||
|
}
|
||||||
|
return sb.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newQuarantine(paths []string) (*quarantine, error) {
|
||||||
|
var q quarantine
|
||||||
|
for i := range paths {
|
||||||
|
f := fstree.New(
|
||||||
|
fstree.WithDepth(1),
|
||||||
|
fstree.WithDirNameLen(1),
|
||||||
|
fstree.WithPath(paths[i]),
|
||||||
|
fstree.WithPerm(os.ModePerm),
|
||||||
|
)
|
||||||
|
if err := f.Open(mode.ComponentReadWrite); err != nil {
|
||||||
|
return nil, fmt.Errorf("open fstree %s: %w", paths[i], err)
|
||||||
|
}
|
||||||
|
if err := f.Init(); err != nil {
|
||||||
|
return nil, fmt.Errorf("init fstree %s: %w", paths[i], err)
|
||||||
|
}
|
||||||
|
q.trees = append(q.trees, f)
|
||||||
|
}
|
||||||
|
return &q, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *quarantine) Get(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
for i := range q.trees {
|
||||||
|
res, err := q.trees[i].Get(ctx, common.GetPrm{Address: a})
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return res.Object, nil
|
||||||
|
}
|
||||||
|
return nil, &apistatus.ObjectNotFound{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *quarantine) Delete(ctx context.Context, a oid.Address) error {
|
||||||
|
for i := range q.trees {
|
||||||
|
_, err := q.trees[i].Delete(ctx, common.DeletePrm{Address: a})
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &apistatus.ObjectNotFound{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *quarantine) Put(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
data, err := obj.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var prm common.PutPrm
|
||||||
|
prm.Address = objectcore.AddressOf(obj)
|
||||||
|
prm.Object = obj
|
||||||
|
prm.RawData = data
|
||||||
|
|
||||||
|
q.mtx.Lock()
|
||||||
|
current := q.current
|
||||||
|
q.current = (q.current + 1) % len(q.trees)
|
||||||
|
q.mtx.Unlock()
|
||||||
|
|
||||||
|
_, err = q.trees[current].Put(ctx, prm)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *quarantine) Iterate(ctx context.Context, f func(oid.Address) error) error {
|
||||||
|
var prm common.IteratePrm
|
||||||
|
prm.Handler = func(elem common.IterationElement) error {
|
||||||
|
return f(elem.Address)
|
||||||
|
}
|
||||||
|
for i := range q.trees {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := q.trees[i].Iterate(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
package zombie
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
func remove(cmd *cobra.Command, _ []string) {
|
||||||
|
configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag)
|
||||||
|
configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag)
|
||||||
|
appCfg := config.New(configFile, configDir, config.EnvPrefix)
|
||||||
|
storageEngine := newEngine(cmd, appCfg)
|
||||||
|
q := createQuarantine(cmd, storageEngine.DumpInfo())
|
||||||
|
|
||||||
|
var containerID cid.ID
|
||||||
|
cidStr, _ := cmd.Flags().GetString(cidFlag)
|
||||||
|
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", containerID.DecodeString(cidStr))
|
||||||
|
|
||||||
|
var objectID *oid.ID
|
||||||
|
oidStr, _ := cmd.Flags().GetString(oidFlag)
|
||||||
|
if oidStr != "" {
|
||||||
|
objectID = &oid.ID{}
|
||||||
|
commonCmd.ExitOnErr(cmd, "decode object ID string: %w", objectID.DecodeString(oidStr))
|
||||||
|
}
|
||||||
|
|
||||||
|
if objectID != nil {
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(containerID)
|
||||||
|
addr.SetObject(*objectID)
|
||||||
|
removeObject(cmd, q, addr)
|
||||||
|
} else {
|
||||||
|
commonCmd.ExitOnErr(cmd, "iterate over quarantine: %w", q.Iterate(cmd.Context(), func(addr oid.Address) error {
|
||||||
|
if addr.Container() != containerID {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
removeObject(cmd, q, addr)
|
||||||
|
return nil
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func removeObject(cmd *cobra.Command, q *quarantine, addr oid.Address) {
|
||||||
|
err := q.Delete(cmd.Context(), addr)
|
||||||
|
if errors.Is(err, new(apistatus.ObjectNotFound)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
commonCmd.ExitOnErr(cmd, "remove object from quarantine: %w", err)
|
||||||
|
}
|
|
@ -0,0 +1,54 @@
|
||||||
|
package zombie
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
func restore(cmd *cobra.Command, _ []string) {
|
||||||
|
configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag)
|
||||||
|
configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag)
|
||||||
|
appCfg := config.New(configFile, configDir, config.EnvPrefix)
|
||||||
|
storageEngine := newEngine(cmd, appCfg)
|
||||||
|
q := createQuarantine(cmd, storageEngine.DumpInfo())
|
||||||
|
|
||||||
|
var containerID cid.ID
|
||||||
|
cidStr, _ := cmd.Flags().GetString(cidFlag)
|
||||||
|
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", containerID.DecodeString(cidStr))
|
||||||
|
|
||||||
|
var objectID *oid.ID
|
||||||
|
oidStr, _ := cmd.Flags().GetString(oidFlag)
|
||||||
|
if oidStr != "" {
|
||||||
|
objectID = &oid.ID{}
|
||||||
|
commonCmd.ExitOnErr(cmd, "decode object ID string: %w", objectID.DecodeString(oidStr))
|
||||||
|
}
|
||||||
|
|
||||||
|
if objectID != nil {
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(containerID)
|
||||||
|
addr.SetObject(*objectID)
|
||||||
|
restoreObject(cmd, storageEngine, q, addr)
|
||||||
|
} else {
|
||||||
|
commonCmd.ExitOnErr(cmd, "iterate over quarantine: %w", q.Iterate(cmd.Context(), func(addr oid.Address) error {
|
||||||
|
if addr.Container() != containerID {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
restoreObject(cmd, storageEngine, q, addr)
|
||||||
|
return nil
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func restoreObject(cmd *cobra.Command, storageEngine *engine.StorageEngine, q *quarantine, addr oid.Address) {
|
||||||
|
obj, err := q.Get(cmd.Context(), addr)
|
||||||
|
commonCmd.ExitOnErr(cmd, "get object from quarantine: %w", err)
|
||||||
|
var putPrm engine.PutPrm
|
||||||
|
putPrm.WithObject(obj)
|
||||||
|
commonCmd.ExitOnErr(cmd, "put object to storage engine: %w", storageEngine.Put(cmd.Context(), putPrm))
|
||||||
|
commonCmd.ExitOnErr(cmd, "remove object from quarantine: %w", q.Delete(cmd.Context(), addr))
|
||||||
|
}
|
123
cmd/frostfs-adm/internal/modules/maintenance/zombie/root.go
Normal file
123
cmd/frostfs-adm/internal/modules/maintenance/zombie/root.go
Normal file
|
@ -0,0 +1,123 @@
|
||||||
|
package zombie
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
flagBatchSize = "batch-size"
|
||||||
|
flagBatchSizeUsage = "Objects iteration batch size"
|
||||||
|
cidFlag = "cid"
|
||||||
|
cidFlagUsage = "Container ID"
|
||||||
|
oidFlag = "oid"
|
||||||
|
oidFlagUsage = "Object ID"
|
||||||
|
walletFlag = "wallet"
|
||||||
|
walletFlagShorthand = "w"
|
||||||
|
walletFlagUsage = "Path to the wallet or binary key"
|
||||||
|
addressFlag = "address"
|
||||||
|
addressFlagUsage = "Address of wallet account"
|
||||||
|
moveFlag = "move"
|
||||||
|
moveFlagUsage = "Move objects from storage engine to quarantine"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
Cmd = &cobra.Command{
|
||||||
|
Use: "zombie",
|
||||||
|
Short: "Zombie objects related commands",
|
||||||
|
}
|
||||||
|
scanCmd = &cobra.Command{
|
||||||
|
Use: "scan",
|
||||||
|
Short: "Scan storage engine for zombie objects and move them to quarantine",
|
||||||
|
Long: "",
|
||||||
|
PreRun: func(cmd *cobra.Command, _ []string) {
|
||||||
|
_ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag))
|
||||||
|
_ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag))
|
||||||
|
_ = viper.BindPFlag(walletFlag, cmd.Flags().Lookup(walletFlag))
|
||||||
|
_ = viper.BindPFlag(addressFlag, cmd.Flags().Lookup(addressFlag))
|
||||||
|
_ = viper.BindPFlag(flagBatchSize, cmd.Flags().Lookup(flagBatchSize))
|
||||||
|
_ = viper.BindPFlag(moveFlag, cmd.Flags().Lookup(moveFlag))
|
||||||
|
},
|
||||||
|
Run: scan,
|
||||||
|
}
|
||||||
|
listCmd = &cobra.Command{
|
||||||
|
Use: "list",
|
||||||
|
Short: "List zombie objects from quarantine",
|
||||||
|
Long: "",
|
||||||
|
PreRun: func(cmd *cobra.Command, _ []string) {
|
||||||
|
_ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag))
|
||||||
|
_ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag))
|
||||||
|
_ = viper.BindPFlag(cidFlag, cmd.Flags().Lookup(cidFlag))
|
||||||
|
},
|
||||||
|
Run: list,
|
||||||
|
}
|
||||||
|
restoreCmd = &cobra.Command{
|
||||||
|
Use: "restore",
|
||||||
|
Short: "Restore zombie objects from quarantine",
|
||||||
|
Long: "",
|
||||||
|
PreRun: func(cmd *cobra.Command, _ []string) {
|
||||||
|
_ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag))
|
||||||
|
_ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag))
|
||||||
|
_ = viper.BindPFlag(cidFlag, cmd.Flags().Lookup(cidFlag))
|
||||||
|
_ = viper.BindPFlag(oidFlag, cmd.Flags().Lookup(oidFlag))
|
||||||
|
},
|
||||||
|
Run: restore,
|
||||||
|
}
|
||||||
|
removeCmd = &cobra.Command{
|
||||||
|
Use: "remove",
|
||||||
|
Short: "Remove zombie objects from quarantine",
|
||||||
|
Long: "",
|
||||||
|
PreRun: func(cmd *cobra.Command, _ []string) {
|
||||||
|
_ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag))
|
||||||
|
_ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag))
|
||||||
|
_ = viper.BindPFlag(cidFlag, cmd.Flags().Lookup(cidFlag))
|
||||||
|
_ = viper.BindPFlag(oidFlag, cmd.Flags().Lookup(oidFlag))
|
||||||
|
},
|
||||||
|
Run: remove,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
initScanCmd()
|
||||||
|
initListCmd()
|
||||||
|
initRestoreCmd()
|
||||||
|
initRemoveCmd()
|
||||||
|
}
|
||||||
|
|
||||||
|
func initScanCmd() {
|
||||||
|
Cmd.AddCommand(scanCmd)
|
||||||
|
|
||||||
|
scanCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage)
|
||||||
|
scanCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage)
|
||||||
|
scanCmd.Flags().Uint32(flagBatchSize, 1000, flagBatchSizeUsage)
|
||||||
|
scanCmd.Flags().StringP(walletFlag, walletFlagShorthand, "", walletFlagUsage)
|
||||||
|
scanCmd.Flags().String(addressFlag, "", addressFlagUsage)
|
||||||
|
scanCmd.Flags().Bool(moveFlag, false, moveFlagUsage)
|
||||||
|
}
|
||||||
|
|
||||||
|
func initListCmd() {
|
||||||
|
Cmd.AddCommand(listCmd)
|
||||||
|
|
||||||
|
listCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage)
|
||||||
|
listCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage)
|
||||||
|
listCmd.Flags().String(cidFlag, "", cidFlagUsage)
|
||||||
|
}
|
||||||
|
|
||||||
|
func initRestoreCmd() {
|
||||||
|
Cmd.AddCommand(restoreCmd)
|
||||||
|
|
||||||
|
restoreCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage)
|
||||||
|
restoreCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage)
|
||||||
|
restoreCmd.Flags().String(cidFlag, "", cidFlagUsage)
|
||||||
|
restoreCmd.Flags().String(oidFlag, "", oidFlagUsage)
|
||||||
|
}
|
||||||
|
|
||||||
|
func initRemoveCmd() {
|
||||||
|
Cmd.AddCommand(removeCmd)
|
||||||
|
|
||||||
|
removeCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage)
|
||||||
|
removeCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage)
|
||||||
|
removeCmd.Flags().String(cidFlag, "", cidFlagUsage)
|
||||||
|
removeCmd.Flags().String(oidFlag, "", oidFlagUsage)
|
||||||
|
}
|
282
cmd/frostfs-adm/internal/modules/maintenance/zombie/scan.go
Normal file
282
cmd/frostfs-adm/internal/modules/maintenance/zombie/scan.go
Normal file
|
@ -0,0 +1,282 @@
|
||||||
|
package zombie
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"crypto/sha256"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
clientCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
|
||||||
|
clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
func scan(cmd *cobra.Command, _ []string) {
|
||||||
|
configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag)
|
||||||
|
configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag)
|
||||||
|
appCfg := config.New(configFile, configDir, config.EnvPrefix)
|
||||||
|
batchSize, _ := cmd.Flags().GetUint32(flagBatchSize)
|
||||||
|
if batchSize == 0 {
|
||||||
|
commonCmd.ExitOnErr(cmd, "invalid batch size: %w", errors.New("batch size must be positive value"))
|
||||||
|
}
|
||||||
|
move, _ := cmd.Flags().GetBool(moveFlag)
|
||||||
|
|
||||||
|
storageEngine := newEngine(cmd, appCfg)
|
||||||
|
morphClient := createMorphClient(cmd, appCfg)
|
||||||
|
cnrCli := createContainerClient(cmd, morphClient)
|
||||||
|
nmCli := createNetmapClient(cmd, morphClient)
|
||||||
|
q := createQuarantine(cmd, storageEngine.DumpInfo())
|
||||||
|
pk := getPrivateKey(cmd, appCfg)
|
||||||
|
|
||||||
|
epoch, err := nmCli.Epoch()
|
||||||
|
commonCmd.ExitOnErr(cmd, "read epoch from morph: %w", err)
|
||||||
|
|
||||||
|
nm, err := nmCli.GetNetMapByEpoch(epoch)
|
||||||
|
commonCmd.ExitOnErr(cmd, "read netmap from morph: %w", err)
|
||||||
|
|
||||||
|
cmd.Printf("Epoch: %d\n", nm.Epoch())
|
||||||
|
cmd.Printf("Nodes in the netmap: %d\n", len(nm.Nodes()))
|
||||||
|
|
||||||
|
ps := &processStatus{
|
||||||
|
statusCount: make(map[status]uint64),
|
||||||
|
}
|
||||||
|
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
start := time.Now()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(2)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
tick := time.NewTicker(time.Second)
|
||||||
|
defer tick.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-cmd.Context().Done():
|
||||||
|
return
|
||||||
|
case <-stopCh:
|
||||||
|
return
|
||||||
|
case <-tick.C:
|
||||||
|
fmt.Printf("Objects processed: %d; Time elapsed: %s\n", ps.total(), time.Since(start))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
err = scanStorageEngine(cmd, batchSize, storageEngine, ps, appCfg, cnrCli, nmCli, q, pk, move)
|
||||||
|
close(stopCh)
|
||||||
|
}()
|
||||||
|
wg.Wait()
|
||||||
|
commonCmd.ExitOnErr(cmd, "scan storage engine for zombie objects: %w", err)
|
||||||
|
|
||||||
|
cmd.Println()
|
||||||
|
cmd.Println("Status description:")
|
||||||
|
cmd.Println("undefined -- nothing is clear")
|
||||||
|
cmd.Println("found -- object is found in cluster")
|
||||||
|
cmd.Println("quarantine -- object is not found in cluster")
|
||||||
|
cmd.Println()
|
||||||
|
for status, count := range ps.statusCount {
|
||||||
|
cmd.Printf("Status: %s, Count: %d\n", status, count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type status string
|
||||||
|
|
||||||
|
const (
|
||||||
|
statusUndefined status = "undefined"
|
||||||
|
statusFound status = "found"
|
||||||
|
statusQuarantine status = "quarantine"
|
||||||
|
)
|
||||||
|
|
||||||
|
func checkAddr(ctx context.Context, cnrCli *cntClient.Client, nmCli *netmap.Client, cc *cache.ClientCache, obj object.Info) (status, error) {
|
||||||
|
rawCID := make([]byte, sha256.Size)
|
||||||
|
cid := obj.Address.Container()
|
||||||
|
cid.Encode(rawCID)
|
||||||
|
|
||||||
|
cnr, err := cnrCli.Get(rawCID)
|
||||||
|
if err != nil {
|
||||||
|
var errContainerNotFound *apistatus.ContainerNotFound
|
||||||
|
if errors.As(err, &errContainerNotFound) {
|
||||||
|
// Policer will deal with this object.
|
||||||
|
return statusFound, nil
|
||||||
|
}
|
||||||
|
return statusUndefined, fmt.Errorf("read container %s from morph: %w", cid, err)
|
||||||
|
}
|
||||||
|
nm, err := nmCli.NetMap()
|
||||||
|
if err != nil {
|
||||||
|
return statusUndefined, fmt.Errorf("read netmap from morph: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), rawCID)
|
||||||
|
if err != nil {
|
||||||
|
// Not enough nodes, check all netmap nodes.
|
||||||
|
nodes = append([][]netmap.NodeInfo{}, nm.Nodes())
|
||||||
|
}
|
||||||
|
|
||||||
|
objID := obj.Address.Object()
|
||||||
|
cnrID := obj.Address.Container()
|
||||||
|
|||||||
|
local := true
|
||||||
|
raw := false
|
||||||
|
if obj.ECInfo != nil {
|
||||||
fyrchik
commented
You check whether parent object can be headed. You check whether parent object can be headed.
This won't work if we have exactly the last chunk needed to construct HEAD, as the local node is down.
I believe more correct approach is to perform `raw` head and consider `ECInfoError` a witness of object existence.
dstepanov-yadro
commented
Fixed Fixed
|
|||||||
|
objID = obj.ECInfo.ParentID
|
||||||
|
local = false
|
||||||
|
raw = true
|
||||||
|
}
|
||||||
|
prm := clientSDK.PrmObjectHead{
|
||||||
|
ObjectID: &objID,
|
||||||
|
ContainerID: &cnrID,
|
||||||
|
Local: local,
|
||||||
|
Raw: raw,
|
||||||
|
}
|
||||||
|
|
||||||
|
var ni clientCore.NodeInfo
|
||||||
|
for i := range nodes {
|
||||||
|
for j := range nodes[i] {
|
||||||
|
if err := clientCore.NodeInfoFromRawNetmapElement(&ni, netmapCore.Node(nodes[i][j])); err != nil {
|
||||||
|
return statusUndefined, fmt.Errorf("parse node info: %w", err)
|
||||||
|
}
|
||||||
|
c, err := cc.Get(ni)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
res, err := c.ObjectHead(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
var errECInfo *objectSDK.ECInfoError
|
||||||
|
if raw && errors.As(err, &errECInfo) {
|
||||||
|
return statusFound, nil
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := apistatus.ErrFromStatus(res.Status()); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return statusFound, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if cnr.Value.PlacementPolicy().NumberOfReplicas() == 1 && cnr.Value.PlacementPolicy().ReplicaDescriptor(0).NumberOfObjects() == 1 {
|
||||||
|
return statusFound, nil
|
||||||
|
}
|
||||||
|
return statusQuarantine, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func scanStorageEngine(cmd *cobra.Command, batchSize uint32, storageEngine *engine.StorageEngine, ps *processStatus,
|
||||||
|
appCfg *config.Config, cnrCli *cntClient.Client, nmCli *netmap.Client, q *quarantine, pk *ecdsa.PrivateKey, move bool,
|
||||||
|
) error {
|
||||||
|
cc := cache.NewSDKClientCache(cache.ClientCacheOpts{
|
||||||
|
DialTimeout: apiclientconfig.DialTimeout(appCfg),
|
||||||
|
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
|
||||||
|
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
|
||||||
|
Key: pk,
|
||||||
|
AllowExternal: apiclientconfig.AllowExternal(appCfg),
|
||||||
|
})
|
||||||
|
ctx := cmd.Context()
|
||||||
|
|
||||||
|
var cursor *engine.Cursor
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
var prm engine.ListWithCursorPrm
|
||||||
|
prm.WithCursor(cursor)
|
||||||
|
prm.WithCount(batchSize)
|
||||||
|
|
||||||
|
res, err := storageEngine.ListWithCursor(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, engine.ErrEndOfListing) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("list with cursor: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cursor = res.Cursor()
|
||||||
|
addrList := res.AddressList()
|
||||||
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
|
eg.SetLimit(int(batchSize))
|
||||||
|
|
||||||
|
for i := range addrList {
|
||||||
|
addr := addrList[i]
|
||||||
|
eg.Go(func() error {
|
||||||
|
result, err := checkAddr(egCtx, cnrCli, nmCli, cc, addr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("check object %s status: %w", addr.Address, err)
|
||||||
|
}
|
||||||
|
ps.add(result)
|
||||||
|
|
||||||
|
if !move && result == statusQuarantine {
|
||||||
|
cmd.Println(addr)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if result == statusQuarantine {
|
||||||
|
return moveToQuarantine(egCtx, storageEngine, q, addr.Address)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return fmt.Errorf("process objects batch: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func moveToQuarantine(ctx context.Context, storageEngine *engine.StorageEngine, q *quarantine, addr oid.Address) error {
|
||||||
|
var getPrm engine.GetPrm
|
||||||
|
getPrm.WithAddress(addr)
|
||||||
|
res, err := storageEngine.Get(ctx, getPrm)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get object %s from storage engine: %w", addr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := q.Put(ctx, res.Object()); err != nil {
|
||||||
|
return fmt.Errorf("put object %s to quarantine: %w", addr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var delPrm engine.DeletePrm
|
||||||
|
delPrm.WithForceRemoval()
|
||||||
|
delPrm.WithAddress(addr)
|
||||||
|
|
||||||
|
_, err = storageEngine.Delete(ctx, delPrm)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("delete object %s from storage engine: %w", addr, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type processStatus struct {
|
||||||
|
guard sync.RWMutex
|
||||||
|
statusCount map[status]uint64
|
||||||
|
count uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *processStatus) add(st status) {
|
||||||
|
s.guard.Lock()
|
||||||
|
defer s.guard.Unlock()
|
||||||
|
s.statusCount[st]++
|
||||||
|
s.count++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *processStatus) total() uint64 {
|
||||||
|
s.guard.RLock()
|
||||||
|
defer s.guard.RUnlock()
|
||||||
|
return s.count
|
||||||
|
}
|
|
@ -0,0 +1,204 @@
|
||||||
|
package zombie
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
||||||
|
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||||
|
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
|
||||||
|
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newEngine(cmd *cobra.Command, c *config.Config) *engine.StorageEngine {
|
||||||
|
ngOpts := storageEngineOptions(c)
|
||||||
|
shardOpts := shardOptions(cmd, c)
|
||||||
|
e := engine.New(ngOpts...)
|
||||||
|
for _, opts := range shardOpts {
|
||||||
|
_, err := e.AddShard(cmd.Context(), opts...)
|
||||||
|
commonCmd.ExitOnErr(cmd, "iterate shards from config: %w", err)
|
||||||
|
}
|
||||||
|
commonCmd.ExitOnErr(cmd, "open storage engine: %w", e.Open(cmd.Context()))
|
||||||
|
commonCmd.ExitOnErr(cmd, "init storage engine: %w", e.Init(cmd.Context()))
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
|
func storageEngineOptions(c *config.Config) []engine.Option {
|
||||||
|
return []engine.Option{
|
||||||
|
engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c)),
|
||||||
|
engine.WithShardPoolSize(engineconfig.ShardPoolSize(c)),
|
||||||
|
engine.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
|
||||||
|
engine.WithLowMemoryConsumption(engineconfig.EngineLowMemoryConsumption(c)),
|
||||||
|
engine.WithRebuildWorkersCount(engineconfig.EngineRebuildWorkersCount(c)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func shardOptions(cmd *cobra.Command, c *config.Config) [][]shard.Option {
|
||||||
|
var result [][]shard.Option
|
||||||
|
err := engineconfig.IterateShards(c, false, func(sh *shardconfig.Config) error {
|
||||||
|
result = append(result, getShardOpts(cmd, c, sh))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
commonCmd.ExitOnErr(cmd, "iterate shards from config: %w", err)
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func getShardOpts(cmd *cobra.Command, c *config.Config, sh *shardconfig.Config) []shard.Option {
|
||||||
|
wc, wcEnabled := getWriteCacheOpts(sh)
|
||||||
|
return []shard.Option{
|
||||||
|
shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
|
||||||
|
shard.WithRefillMetabase(sh.RefillMetabase()),
|
||||||
|
shard.WithRefillMetabaseWorkersCount(sh.RefillMetabaseWorkersCount()),
|
||||||
|
shard.WithMode(sh.Mode()),
|
||||||
|
shard.WithBlobStorOptions(getBlobstorOpts(cmd.Context(), sh)...),
|
||||||
|
shard.WithMetaBaseOptions(getMetabaseOpts(sh)...),
|
||||||
|
shard.WithPiloramaOptions(getPiloramaOpts(c, sh)...),
|
||||||
|
shard.WithWriteCache(wcEnabled),
|
||||||
|
shard.WithWriteCacheOptions(wc),
|
||||||
|
shard.WithRemoverBatchSize(sh.GC().RemoverBatchSize()),
|
||||||
|
shard.WithGCRemoverSleepInterval(sh.GC().RemoverSleepInterval()),
|
||||||
|
shard.WithExpiredCollectorBatchSize(sh.GC().ExpiredCollectorBatchSize()),
|
||||||
|
shard.WithExpiredCollectorWorkerCount(sh.GC().ExpiredCollectorWorkerCount()),
|
||||||
|
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
||||||
|
pool, err := ants.NewPool(sz)
|
||||||
|
commonCmd.ExitOnErr(cmd, "init GC pool: %w", err)
|
||||||
|
return pool
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getWriteCacheOpts(sh *shardconfig.Config) ([]writecache.Option, bool) {
|
||||||
|
if wc := sh.WriteCache(); wc != nil && wc.Enabled() {
|
||||||
|
var result []writecache.Option
|
||||||
|
result = append(result,
|
||||||
|
writecache.WithPath(wc.Path()),
|
||||||
|
writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()),
|
||||||
|
writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()),
|
||||||
|
writecache.WithPageSize(wc.BoltDB().PageSize()),
|
||||||
|
writecache.WithMaxObjectSize(wc.MaxObjectSize()),
|
||||||
|
writecache.WithSmallObjectSize(wc.SmallObjectSize()),
|
||||||
|
writecache.WithFlushWorkersCount(wc.WorkerCount()),
|
||||||
|
writecache.WithMaxCacheSize(wc.SizeLimit()),
|
||||||
|
writecache.WithNoSync(wc.NoSync()),
|
||||||
|
writecache.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
|
||||||
|
)
|
||||||
|
return result, true
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPiloramaOpts(c *config.Config, sh *shardconfig.Config) []pilorama.Option {
|
||||||
|
var piloramaOpts []pilorama.Option
|
||||||
|
if config.BoolSafe(c.Sub("tree"), "enabled") {
|
||||||
|
pr := sh.Pilorama()
|
||||||
|
piloramaOpts = append(piloramaOpts,
|
||||||
|
pilorama.WithPath(pr.Path()),
|
||||||
|
pilorama.WithPerm(pr.Perm()),
|
||||||
|
pilorama.WithNoSync(pr.NoSync()),
|
||||||
|
pilorama.WithMaxBatchSize(pr.MaxBatchSize()),
|
||||||
|
pilorama.WithMaxBatchDelay(pr.MaxBatchDelay()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return piloramaOpts
|
||||||
|
}
|
||||||
|
|
||||||
|
func getMetabaseOpts(sh *shardconfig.Config) []meta.Option {
|
||||||
|
return []meta.Option{
|
||||||
|
meta.WithPath(sh.Metabase().Path()),
|
||||||
|
meta.WithPermissions(sh.Metabase().BoltDB().Perm()),
|
||||||
|
meta.WithMaxBatchSize(sh.Metabase().BoltDB().MaxBatchSize()),
|
||||||
|
meta.WithMaxBatchDelay(sh.Metabase().BoltDB().MaxBatchDelay()),
|
||||||
|
meta.WithBoltDBOptions(&bbolt.Options{
|
||||||
|
Timeout: 100 * time.Millisecond,
|
||||||
|
}),
|
||||||
|
meta.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
|
||||||
|
meta.WithEpochState(&epochState{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getBlobstorOpts(ctx context.Context, sh *shardconfig.Config) []blobstor.Option {
|
||||||
|
result := []blobstor.Option{
|
||||||
|
blobstor.WithCompressObjects(sh.Compress()),
|
||||||
|
blobstor.WithUncompressableContentTypes(sh.UncompressableContentTypes()),
|
||||||
|
blobstor.WithCompressibilityEstimate(sh.EstimateCompressibility()),
|
||||||
|
blobstor.WithCompressibilityEstimateThreshold(sh.EstimateCompressibilityThreshold()),
|
||||||
|
blobstor.WithStorages(getSubStorages(ctx, sh)),
|
||||||
|
blobstor.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSubStorages(ctx context.Context, sh *shardconfig.Config) []blobstor.SubStorage {
|
||||||
|
var ss []blobstor.SubStorage
|
||||||
|
for _, storage := range sh.BlobStor().Storages() {
|
||||||
|
switch storage.Type() {
|
||||||
|
case blobovniczatree.Type:
|
||||||
|
sub := blobovniczaconfig.From((*config.Config)(storage))
|
||||||
|
blobTreeOpts := []blobovniczatree.Option{
|
||||||
|
blobovniczatree.WithRootPath(storage.Path()),
|
||||||
|
blobovniczatree.WithPermissions(storage.Perm()),
|
||||||
|
blobovniczatree.WithBlobovniczaSize(sub.Size()),
|
||||||
|
blobovniczatree.WithBlobovniczaShallowDepth(sub.ShallowDepth()),
|
||||||
|
blobovniczatree.WithBlobovniczaShallowWidth(sub.ShallowWidth()),
|
||||||
|
blobovniczatree.WithOpenedCacheSize(sub.OpenedCacheSize()),
|
||||||
|
blobovniczatree.WithOpenedCacheTTL(sub.OpenedCacheTTL()),
|
||||||
|
blobovniczatree.WithOpenedCacheExpInterval(sub.OpenedCacheExpInterval()),
|
||||||
|
blobovniczatree.WithInitWorkerCount(sub.InitWorkerCount()),
|
||||||
|
blobovniczatree.WithWaitBeforeDropDB(sub.RebuildDropTimeout()),
|
||||||
|
blobovniczatree.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
|
||||||
|
blobovniczatree.WithObjectSizeLimit(sh.SmallSizeLimit()),
|
||||||
|
}
|
||||||
|
|
||||||
|
ss = append(ss, blobstor.SubStorage{
|
||||||
|
Storage: blobovniczatree.NewBlobovniczaTree(ctx, blobTreeOpts...),
|
||||||
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
|
return uint64(len(data)) < sh.SmallSizeLimit()
|
||||||
|
},
|
||||||
|
})
|
||||||
|
case fstree.Type:
|
||||||
|
sub := fstreeconfig.From((*config.Config)(storage))
|
||||||
|
fstreeOpts := []fstree.Option{
|
||||||
|
fstree.WithPath(storage.Path()),
|
||||||
|
fstree.WithPerm(storage.Perm()),
|
||||||
|
fstree.WithDepth(sub.Depth()),
|
||||||
|
fstree.WithNoSync(sub.NoSync()),
|
||||||
|
fstree.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
|
||||||
|
}
|
||||||
|
|
||||||
|
ss = append(ss, blobstor.SubStorage{
|
||||||
|
Storage: fstree.New(fstreeOpts...),
|
||||||
|
Policy: func(_ *objectSDK.Object, _ []byte) bool {
|
||||||
|
return true
|
||||||
|
},
|
||||||
|
})
|
||||||
|
default:
|
||||||
|
// should never happen, that has already
|
||||||
|
// been handled: when the config was read
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ss
|
||||||
|
}
|
||||||
|
|
||||||
|
type epochState struct{}
|
||||||
|
|
||||||
|
func (epochState) CurrentEpoch() uint64 {
|
||||||
|
return 0
|
||||||
|
}
|
|
@ -5,6 +5,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/maintenance"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/storagecfg"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/storagecfg"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
|
||||||
|
@ -41,6 +42,7 @@ func init() {
|
||||||
rootCmd.AddCommand(config.RootCmd)
|
rootCmd.AddCommand(config.RootCmd)
|
||||||
rootCmd.AddCommand(morph.RootCmd)
|
rootCmd.AddCommand(morph.RootCmd)
|
||||||
rootCmd.AddCommand(storagecfg.RootCmd)
|
rootCmd.AddCommand(storagecfg.RootCmd)
|
||||||
|
rootCmd.AddCommand(maintenance.RootCmd)
|
||||||
|
|
||||||
rootCmd.AddCommand(autocomplete.Command("frostfs-adm"))
|
rootCmd.AddCommand(autocomplete.Command("frostfs-adm"))
|
||||||
rootCmd.AddCommand(gendoc.Command(rootCmd, gendoc.Options{}))
|
rootCmd.AddCommand(gendoc.Command(rootCmd, gendoc.Options{}))
|
||||||
|
|
|
@ -13,6 +13,13 @@ type ECInfo struct {
|
||||||
Total uint32
|
Total uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v *ECInfo) String() string {
|
||||||
|
if v == nil {
|
||||||
|
return "<nil>"
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("parent ID: %s, index: %d, total %d", v.ParentID, v.Index, v.Total)
|
||||||
|
}
|
||||||
|
|
||||||
// Info groups object address with its FrostFS
|
// Info groups object address with its FrostFS
|
||||||
// object info.
|
// object info.
|
||||||
type Info struct {
|
type Info struct {
|
||||||
|
@ -23,5 +30,5 @@ type Info struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v Info) String() string {
|
func (v Info) String() string {
|
||||||
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
|
return fmt.Sprintf("address: %s, type: %s, is linking: %t, EC header: %s", v.Address, v.Type, v.IsLinkingObject, v.ECInfo)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue
Not a big deal, but