diff --git a/cmd/frostfs-adm/internal/modules/metabase/upgrade.go b/cmd/frostfs-adm/internal/modules/metabase/upgrade.go index 96cb62f1..00b30c9b 100644 --- a/cmd/frostfs-adm/internal/modules/metabase/upgrade.go +++ b/cmd/frostfs-adm/internal/modules/metabase/upgrade.go @@ -1,6 +1,7 @@ package metabase import ( + "context" "errors" "fmt" "sync" @@ -10,19 +11,24 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine" shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" + morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph" + nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" + morphcontainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" ) const ( - pathFlag = "path" noCompactFlag = "no-compact" ) -var errNoPathsFound = errors.New("no metabase paths found") - -var path string +var ( + errNoPathsFound = errors.New("no metabase paths found") + errNoMorphEndpointsFound = errors.New("no morph endpoints found") +) var UpgradeCmd = &cobra.Command{ Use: "upgrade", @@ -39,17 +45,10 @@ func upgrade(cmd *cobra.Command, _ []string) error { if err != nil { return err } - noCompact, _ := cmd.Flags().GetBool(noCompactFlag) - var paths []string - if path != "" { - paths = append(paths, path) - } appCfg := config.New(configFile, configDir, config.EnvPrefix) - if err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error { - paths = append(paths, sc.Metabase().Path()) - return nil - }); err != nil { - return fmt.Errorf("failed to get metabase paths: %w", err) + paths, err := getMetabasePaths(appCfg) + if err != nil { + return err } if len(paths) == 0 { return errNoPathsFound @@ -58,6 +57,16 @@ func upgrade(cmd *cobra.Command, _ []string) error { for i, path := range paths { cmd.Println(i+1, ":", path) } + mc, err := createMorphClient(cmd.Context(), appCfg) + if err != nil { + return err + } + defer mc.Close() + civ, err := createContainerInfoProvider(mc) + if err != nil { + return err + } + noCompact, _ := cmd.Flags().GetBool(noCompactFlag) result := make(map[string]bool) var resultGuard sync.Mutex eg, ctx := errgroup.WithContext(cmd.Context()) @@ -65,7 +74,7 @@ func upgrade(cmd *cobra.Command, _ []string) error { eg.Go(func() error { var success bool cmd.Println("upgrading metabase", path, "...") - if err := meta.Upgrade(ctx, path, !noCompact, func(a ...any) { + if err := meta.Upgrade(ctx, path, !noCompact, civ, func(a ...any) { cmd.Println(append([]any{time.Now().Format(time.RFC3339), ":", path, ":"}, a...)...) }); err != nil { cmd.Println("error: failed to upgrade metabase", path, ":", err) @@ -92,8 +101,50 @@ func upgrade(cmd *cobra.Command, _ []string) error { return nil } +func getMetabasePaths(appCfg *config.Config) ([]string, error) { + var paths []string + if err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error { + paths = append(paths, sc.Metabase().Path()) + return nil + }); err != nil { + return nil, fmt.Errorf("get metabase paths: %w", err) + } + return paths, nil +} + +func createMorphClient(ctx context.Context, appCfg *config.Config) (*client.Client, error) { + addresses := morphconfig.RPCEndpoint(appCfg) + if len(addresses) == 0 { + return nil, errNoMorphEndpointsFound + } + key := nodeconfig.Key(appCfg) + cli, err := client.New(ctx, + key, + client.WithDialTimeout(morphconfig.DialTimeout(appCfg)), + client.WithEndpoints(addresses...), + client.WithSwitchInterval(morphconfig.SwitchInterval(appCfg)), + ) + if err != nil { + return nil, fmt.Errorf("create morph client:%w", err) + } + return cli, nil +} + +func createContainerInfoProvider(cli *client.Client) (container.InfoProvider, error) { + sh, err := cli.NNSContractAddress(client.NNSContainerContractName) + if err != nil { + return nil, fmt.Errorf("resolve container contract hash: %w", err) + } + cc, err := morphcontainer.NewFromMorph(cli, sh, 0, morphcontainer.TryNotary()) + if err != nil { + return nil, fmt.Errorf("create morph container client: %w", err) + } + return container.NewInfoProvider(func() (container.Source, error) { + return morphcontainer.AsContainerSource(cc), nil + }), nil +} + func initUpgradeCommand() { flags := UpgradeCmd.Flags() - flags.StringVar(&path, pathFlag, "", "Path to metabase file") flags.Bool(noCompactFlag, false, "Do not compact upgraded metabase file") } diff --git a/pkg/local_object_storage/metabase/upgrade.go b/pkg/local_object_storage/metabase/upgrade.go index b5de430d..f2a0107a 100644 --- a/pkg/local_object_storage/metabase/upgrade.go +++ b/pkg/local_object_storage/metabase/upgrade.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/binary" + "encoding/hex" "errors" "fmt" "os" @@ -12,6 +13,7 @@ import ( "time" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" @@ -25,15 +27,15 @@ const ( upgradeTimeout = 1 * time.Second ) -var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, log func(a ...any)) error{ +var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, cs container.InfoProvider, log func(a ...any)) error{ 2: upgradeFromV2ToV3, - 3: func(_ context.Context, _ *bbolt.DB, log func(a ...any)) error { + 3: func(_ context.Context, _ *bbolt.DB, _ container.InfoProvider, log func(a ...any)) error { log("metabase already upgraded") return nil }, } -func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any)) error { +func Upgrade(ctx context.Context, path string, compact bool, cs container.InfoProvider, log func(a ...any)) error { if _, err := os.Stat(path); err != nil { return fmt.Errorf("check metabase existence: %w", err) } @@ -61,7 +63,7 @@ func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any)) }); err != nil { return fmt.Errorf("set upgrade key %w", err) } - if err := updater(ctx, db, log); err != nil { + if err := updater(ctx, db, cs, log); err != nil { return fmt.Errorf("update metabase schema: %w", err) } if err := db.Update(func(tx *bbolt.Tx) error { @@ -113,11 +115,11 @@ func compactDB(db *bbolt.DB) error { return nil } -func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, log func(a ...any)) error { +func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, cs container.InfoProvider, log func(a ...any)) error { if err := createExpirationEpochBuckets(ctx, db, log); err != nil { return err } - if err := dropUserAttributes(ctx, db, log); err != nil { + if err := dropUserAttributes(ctx, db, cs, log); err != nil { return err } if err := dropOwnerIDIndex(ctx, db, log); err != nil { @@ -323,10 +325,81 @@ func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, i return nil } -func dropUserAttributes(ctx context.Context, db *bbolt.DB, log func(a ...any)) error { - return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix}, func(a ...any) { - log(append([]any{"user attributes:"}, a...)...) - }) +func dropUserAttributes(ctx context.Context, db *bbolt.DB, cs container.InfoProvider, log func(a ...any)) error { + log("deleting user attribute buckets...") + const batch = 1000 + prefix := []byte{userAttributePrefix} + last := prefix + var count uint64 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + var keys [][]byte + if err := db.View(func(tx *bbolt.Tx) error { + c := tx.Cursor() + for k, _ := c.Seek(last); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() { + if bytes.Equal(last, k) { + continue + } + keys = append(keys, bytes.Clone(k)) + } + return nil + }); err != nil { + log("deleting user attribute buckets completed with an error:", err) + return err + } + if len(keys) == 0 { + log("deleting user attribute buckets completed successfully, deleted", count, "buckets") + return nil + } + last = keys[len(keys)-1] + keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs) + if err != nil { + return err + } + if err := db.Update(func(tx *bbolt.Tx) error { + for _, k := range keysToDrop { + if err := tx.DeleteBucket(k); err != nil { + return err + } + } + return nil + }); err != nil { + log("deleting buckets completed with an error:", err) + return err + } + count += uint64(len(keysToDrop)) + log("deleted", count, "buckets") + } +} + +func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([][]byte, error) { + var keysToDrop [][]byte + for _, key := range keys { + attr, ok := attributeFromAttributeBucket(key) + if !ok { + return nil, fmt.Errorf("failed to parse attribute key from user attribute bucket key %s", hex.EncodeToString(key)) + } + if !IsAtrributeIndexed(attr) { + keysToDrop = append(keysToDrop, key) + continue + } + contID, ok := cidFromAttributeBucket(key) + if !ok { + return nil, fmt.Errorf("failed to parse container ID from user attribute bucket key %s", hex.EncodeToString(key)) + } + info, err := cs.Info(contID) + if err != nil { + return nil, err + } + if info.Removed || !info.Indexed { + keysToDrop = append(keysToDrop, key) + } + } + return keysToDrop, nil } func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error { diff --git a/pkg/local_object_storage/metabase/upgrade_test.go b/pkg/local_object_storage/metabase/upgrade_test.go index 3797de0a..9c525291 100644 --- a/pkg/local_object_storage/metabase/upgrade_test.go +++ b/pkg/local_object_storage/metabase/upgrade_test.go @@ -12,6 +12,7 @@ import ( "time" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" @@ -35,13 +36,19 @@ func TestUpgradeV2ToV3(t *testing.T) { require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) require.ErrorIs(t, db.Init(), ErrOutdatedVersion) require.NoError(t, db.Close()) - require.NoError(t, Upgrade(context.Background(), path, true, t.Log)) + require.NoError(t, Upgrade(context.Background(), path, true, &testContainerInfoProvider{}, t.Log)) require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) require.NoError(t, db.Init()) require.NoError(t, db.Close()) fmt.Println() } +type testContainerInfoProvider struct{} + +func (p *testContainerInfoProvider) Info(id cid.ID) (container.Info, error) { + return container.Info{}, nil +} + func createTempCopy(t *testing.T, path string) string { src, err := os.Open(path) require.NoError(t, err) @@ -95,7 +102,7 @@ func TestGenerateMetabaseFile(t *testing.T) { i := i eg.Go(func() error { obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) - testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) + testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10)) testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10)) _, err := db.Put(ctx, PutPrm{ obj: obj, @@ -118,8 +125,8 @@ func TestGenerateMetabaseFile(t *testing.T) { child.SetParent(parent) idParent, _ := parent.ID() child.SetParentID(idParent) - testutil.AddAttribute(child, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) - testutil.AddAttribute(parent, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) + testutil.AddAttribute(child, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10)) + testutil.AddAttribute(parent, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10)) testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10)) testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10)) _, err := db.Put(ctx, PutPrm{ @@ -138,7 +145,7 @@ func TestGenerateMetabaseFile(t *testing.T) { i := i eg.Go(func() error { obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) - testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) + testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10)) _, err := db.Put(ctx, PutPrm{ obj: obj, id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)), @@ -160,7 +167,7 @@ func TestGenerateMetabaseFile(t *testing.T) { i := i eg.Go(func() error { obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) - testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) + testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10)) _, err := db.Put(ctx, PutPrm{ obj: obj, id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)), @@ -190,7 +197,7 @@ func TestGenerateMetabaseFile(t *testing.T) { i := i eg.Go(func() error { obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) - testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) + testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10)) _, err := db.Put(ctx, PutPrm{ obj: obj, id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)), diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index 4679de33..0a2f91a4 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -176,6 +176,21 @@ func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte { return append(key[:bucketKeySize], attributeKey...) } +func cidFromAttributeBucket(bucketName []byte) (cid.ID, bool) { + if len(bucketName) < bucketKeySize || bucketName[0] != userAttributePrefix { + return cid.ID{}, false + } + var result cid.ID + return result, result.Decode(bucketName[1:bucketKeySize]) == nil +} + +func attributeFromAttributeBucket(bucketName []byte) (string, bool) { + if len(bucketName) < bucketKeySize || bucketName[0] != userAttributePrefix { + return "", false + } + return string(bucketName[bucketKeySize:]), true +} + // rootBucketName returns _root. func rootBucketName(cnr cid.ID, key []byte) []byte { return bucketName(cnr, rootPrefix, key)