[#1412] adm: Resolve container type by metabase upgrade
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
48faa893dc
commit
9531745cb4
4 changed files with 179 additions and 33 deletions
|
@ -1,6 +1,7 @@
|
|||
package metabase
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
@ -10,19 +11,24 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
||||
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
||||
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
morphcontainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
pathFlag = "path"
|
||||
noCompactFlag = "no-compact"
|
||||
)
|
||||
|
||||
var errNoPathsFound = errors.New("no metabase paths found")
|
||||
|
||||
var path string
|
||||
var (
|
||||
errNoPathsFound = errors.New("no metabase paths found")
|
||||
errNoMorphEndpointsFound = errors.New("no morph endpoints found")
|
||||
)
|
||||
|
||||
var UpgradeCmd = &cobra.Command{
|
||||
Use: "upgrade",
|
||||
|
@ -39,17 +45,10 @@ func upgrade(cmd *cobra.Command, _ []string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
noCompact, _ := cmd.Flags().GetBool(noCompactFlag)
|
||||
var paths []string
|
||||
if path != "" {
|
||||
paths = append(paths, path)
|
||||
}
|
||||
appCfg := config.New(configFile, configDir, config.EnvPrefix)
|
||||
if err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error {
|
||||
paths = append(paths, sc.Metabase().Path())
|
||||
return nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to get metabase paths: %w", err)
|
||||
paths, err := getMetabasePaths(appCfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(paths) == 0 {
|
||||
return errNoPathsFound
|
||||
|
@ -58,6 +57,16 @@ func upgrade(cmd *cobra.Command, _ []string) error {
|
|||
for i, path := range paths {
|
||||
cmd.Println(i+1, ":", path)
|
||||
}
|
||||
mc, err := createMorphClient(cmd.Context(), appCfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer mc.Close()
|
||||
civ, err := createContainerInfoProvider(mc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
noCompact, _ := cmd.Flags().GetBool(noCompactFlag)
|
||||
result := make(map[string]bool)
|
||||
var resultGuard sync.Mutex
|
||||
eg, ctx := errgroup.WithContext(cmd.Context())
|
||||
|
@ -65,7 +74,7 @@ func upgrade(cmd *cobra.Command, _ []string) error {
|
|||
eg.Go(func() error {
|
||||
var success bool
|
||||
cmd.Println("upgrading metabase", path, "...")
|
||||
if err := meta.Upgrade(ctx, path, !noCompact, func(a ...any) {
|
||||
if err := meta.Upgrade(ctx, path, !noCompact, civ, func(a ...any) {
|
||||
cmd.Println(append([]any{time.Now().Format(time.RFC3339), ":", path, ":"}, a...)...)
|
||||
}); err != nil {
|
||||
cmd.Println("error: failed to upgrade metabase", path, ":", err)
|
||||
|
@ -92,8 +101,50 @@ func upgrade(cmd *cobra.Command, _ []string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func getMetabasePaths(appCfg *config.Config) ([]string, error) {
|
||||
var paths []string
|
||||
if err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error {
|
||||
paths = append(paths, sc.Metabase().Path())
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("get metabase paths: %w", err)
|
||||
}
|
||||
return paths, nil
|
||||
}
|
||||
|
||||
func createMorphClient(ctx context.Context, appCfg *config.Config) (*client.Client, error) {
|
||||
addresses := morphconfig.RPCEndpoint(appCfg)
|
||||
if len(addresses) == 0 {
|
||||
return nil, errNoMorphEndpointsFound
|
||||
}
|
||||
key := nodeconfig.Key(appCfg)
|
||||
cli, err := client.New(ctx,
|
||||
key,
|
||||
client.WithDialTimeout(morphconfig.DialTimeout(appCfg)),
|
||||
client.WithEndpoints(addresses...),
|
||||
client.WithSwitchInterval(morphconfig.SwitchInterval(appCfg)),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create morph client:%w", err)
|
||||
}
|
||||
return cli, nil
|
||||
}
|
||||
|
||||
func createContainerInfoProvider(cli *client.Client) (container.InfoProvider, error) {
|
||||
sh, err := cli.NNSContractAddress(client.NNSContainerContractName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("resolve container contract hash: %w", err)
|
||||
}
|
||||
cc, err := morphcontainer.NewFromMorph(cli, sh, 0, morphcontainer.TryNotary())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create morph container client: %w", err)
|
||||
}
|
||||
return container.NewInfoProvider(func() (container.Source, error) {
|
||||
return morphcontainer.AsContainerSource(cc), nil
|
||||
}), nil
|
||||
}
|
||||
|
||||
func initUpgradeCommand() {
|
||||
flags := UpgradeCmd.Flags()
|
||||
flags.StringVar(&path, pathFlag, "", "Path to metabase file")
|
||||
flags.Bool(noCompactFlag, false, "Do not compact upgraded metabase file")
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
|
@ -12,6 +13,7 @@ import (
|
|||
"time"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
|
@ -25,15 +27,15 @@ const (
|
|||
upgradeTimeout = 1 * time.Second
|
||||
)
|
||||
|
||||
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, log func(a ...any)) error{
|
||||
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, cs container.InfoProvider, log func(a ...any)) error{
|
||||
2: upgradeFromV2ToV3,
|
||||
3: func(_ context.Context, _ *bbolt.DB, log func(a ...any)) error {
|
||||
3: func(_ context.Context, _ *bbolt.DB, _ container.InfoProvider, log func(a ...any)) error {
|
||||
log("metabase already upgraded")
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any)) error {
|
||||
func Upgrade(ctx context.Context, path string, compact bool, cs container.InfoProvider, log func(a ...any)) error {
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
return fmt.Errorf("check metabase existence: %w", err)
|
||||
}
|
||||
|
@ -61,7 +63,7 @@ func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any))
|
|||
}); err != nil {
|
||||
return fmt.Errorf("set upgrade key %w", err)
|
||||
}
|
||||
if err := updater(ctx, db, log); err != nil {
|
||||
if err := updater(ctx, db, cs, log); err != nil {
|
||||
return fmt.Errorf("update metabase schema: %w", err)
|
||||
}
|
||||
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||
|
@ -113,11 +115,11 @@ func compactDB(db *bbolt.DB) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, cs container.InfoProvider, log func(a ...any)) error {
|
||||
if err := createExpirationEpochBuckets(ctx, db, log); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dropUserAttributes(ctx, db, log); err != nil {
|
||||
if err := dropUserAttributes(ctx, db, cs, log); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dropOwnerIDIndex(ctx, db, log); err != nil {
|
||||
|
@ -323,10 +325,81 @@ func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, i
|
|||
return nil
|
||||
}
|
||||
|
||||
func dropUserAttributes(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||
return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix}, func(a ...any) {
|
||||
log(append([]any{"user attributes:"}, a...)...)
|
||||
})
|
||||
func dropUserAttributes(ctx context.Context, db *bbolt.DB, cs container.InfoProvider, log func(a ...any)) error {
|
||||
log("deleting user attribute buckets...")
|
||||
const batch = 1000
|
||||
prefix := []byte{userAttributePrefix}
|
||||
last := prefix
|
||||
var count uint64
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
var keys [][]byte
|
||||
if err := db.View(func(tx *bbolt.Tx) error {
|
||||
c := tx.Cursor()
|
||||
for k, _ := c.Seek(last); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() {
|
||||
if bytes.Equal(last, k) {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, bytes.Clone(k))
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
log("deleting user attribute buckets completed with an error:", err)
|
||||
return err
|
||||
}
|
||||
if len(keys) == 0 {
|
||||
log("deleting user attribute buckets completed successfully, deleted", count, "buckets")
|
||||
return nil
|
||||
}
|
||||
last = keys[len(keys)-1]
|
||||
keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||
for _, k := range keysToDrop {
|
||||
if err := tx.DeleteBucket(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
log("deleting buckets completed with an error:", err)
|
||||
return err
|
||||
}
|
||||
count += uint64(len(keysToDrop))
|
||||
log("deleted", count, "buckets")
|
||||
}
|
||||
}
|
||||
|
||||
func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([][]byte, error) {
|
||||
var keysToDrop [][]byte
|
||||
for _, key := range keys {
|
||||
attr, ok := attributeFromAttributeBucket(key)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed to parse attribute key from user attribute bucket key %s", hex.EncodeToString(key))
|
||||
}
|
||||
if !IsAtrributeIndexed(attr) {
|
||||
keysToDrop = append(keysToDrop, key)
|
||||
continue
|
||||
}
|
||||
contID, ok := cidFromAttributeBucket(key)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed to parse container ID from user attribute bucket key %s", hex.EncodeToString(key))
|
||||
}
|
||||
info, err := cs.Info(contID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if info.Removed || !info.Indexed {
|
||||
keysToDrop = append(keysToDrop, key)
|
||||
}
|
||||
}
|
||||
return keysToDrop, nil
|
||||
}
|
||||
|
||||
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"time"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -35,13 +36,19 @@ func TestUpgradeV2ToV3(t *testing.T) {
|
|||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.ErrorIs(t, db.Init(), ErrOutdatedVersion)
|
||||
require.NoError(t, db.Close())
|
||||
require.NoError(t, Upgrade(context.Background(), path, true, t.Log))
|
||||
require.NoError(t, Upgrade(context.Background(), path, true, &testContainerInfoProvider{}, t.Log))
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, db.Init())
|
||||
require.NoError(t, db.Close())
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
type testContainerInfoProvider struct{}
|
||||
|
||||
func (p *testContainerInfoProvider) Info(id cid.ID) (container.Info, error) {
|
||||
return container.Info{}, nil
|
||||
}
|
||||
|
||||
func createTempCopy(t *testing.T, path string) string {
|
||||
src, err := os.Open(path)
|
||||
require.NoError(t, err)
|
||||
|
@ -95,7 +102,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
|
|||
i := i
|
||||
eg.Go(func() error {
|
||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||
_, err := db.Put(ctx, PutPrm{
|
||||
obj: obj,
|
||||
|
@ -118,8 +125,8 @@ func TestGenerateMetabaseFile(t *testing.T) {
|
|||
child.SetParent(parent)
|
||||
idParent, _ := parent.ID()
|
||||
child.SetParentID(idParent)
|
||||
testutil.AddAttribute(child, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(parent, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(child, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(parent, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||
testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||
_, err := db.Put(ctx, PutPrm{
|
||||
|
@ -138,7 +145,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
|
|||
i := i
|
||||
eg.Go(func() error {
|
||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
_, err := db.Put(ctx, PutPrm{
|
||||
obj: obj,
|
||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||
|
@ -160,7 +167,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
|
|||
i := i
|
||||
eg.Go(func() error {
|
||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
_, err := db.Put(ctx, PutPrm{
|
||||
obj: obj,
|
||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||
|
@ -190,7 +197,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
|
|||
i := i
|
||||
eg.Go(func() error {
|
||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
_, err := db.Put(ctx, PutPrm{
|
||||
obj: obj,
|
||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||
|
|
|
@ -176,6 +176,21 @@ func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte {
|
|||
return append(key[:bucketKeySize], attributeKey...)
|
||||
}
|
||||
|
||||
func cidFromAttributeBucket(bucketName []byte) (cid.ID, bool) {
|
||||
if len(bucketName) < bucketKeySize || bucketName[0] != userAttributePrefix {
|
||||
return cid.ID{}, false
|
||||
}
|
||||
var result cid.ID
|
||||
return result, result.Decode(bucketName[1:bucketKeySize]) == nil
|
||||
}
|
||||
|
||||
func attributeFromAttributeBucket(bucketName []byte) (string, bool) {
|
||||
if len(bucketName) < bucketKeySize || bucketName[0] != userAttributePrefix {
|
||||
return "", false
|
||||
}
|
||||
return string(bucketName[bucketKeySize:]), true
|
||||
}
|
||||
|
||||
// rootBucketName returns <CID>_root.
|
||||
func rootBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, rootPrefix, key)
|
||||
|
|
Loading…
Reference in a new issue