[#9999] adm: Resolve container type during metabase upgrade
Some checks failed
DCO action / DCO (pull_request) Successful in 1m29s
Tests and linters / Run gofumpt (pull_request) Successful in 1m32s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m10s
Vulncheck / Vulncheck (pull_request) Successful in 2m6s
Tests and linters / Staticcheck (pull_request) Failing after 2m18s
Tests and linters / Lint (pull_request) Failing after 2m24s
Build / Build Components (pull_request) Successful in 2m29s
Tests and linters / gopls check (pull_request) Successful in 2m42s
Tests and linters / Tests (pull_request) Successful in 4m19s
Tests and linters / Tests with -race (pull_request) Successful in 6m11s

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
Dmitrii Stepanov 2024-10-03 11:06:31 +03:00
parent d734ddafad
commit 2e91ec7e9d
4 changed files with 173 additions and 33 deletions
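With this change meta.Upgrade takes a container type resolver in addition to the metabase path and the compaction flag. Below is a minimal sketch (not part of this commit) of calling the new signature with a stub resolver, mirroring the test added further down; the metabase path and the compact flag value are placeholders.

package main

import (
	"context"
	"log"

	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)

// stubProvider satisfies meta.ContainerInfoProvider without a morph chain
// connection: every container is treated as a regular (non-S3) one.
type stubProvider struct{}

func (stubProvider) IsS3Container(cid.ID) (bool, error) { return false, nil }

func main() {
	// "/srv/frostfs/meta.db" and compact=true are placeholder values.
	err := meta.Upgrade(context.Background(), "/srv/frostfs/meta.db", true, stubProvider{},
		func(a ...any) { log.Println(a...) })
	if err != nil {
		log.Fatal(err)
	}
}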

View file

@@ -1,6 +1,7 @@
package metabase
import (
"context"
"errors"
"fmt"
"sync"
@@ -10,19 +11,24 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
morphcontainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)
const (
pathFlag = "path"
noCompactFlag = "no-compact"
)
var errNoPathsFound = errors.New("no metabase paths found")
var path string
var (
errNoPathsFound = errors.New("no metabase paths found")
errNoMorphEndpointsFound = errors.New("no morph endpoints found")
)
var UpgradeCmd = &cobra.Command{
Use: "upgrade",
@@ -39,17 +45,10 @@ func upgrade(cmd *cobra.Command, _ []string) error {
if err != nil {
return err
}
noCompact, _ := cmd.Flags().GetBool(noCompactFlag)
var paths []string
if path != "" {
paths = append(paths, path)
}
appCfg := config.New(configFile, configDir, config.EnvPrefix)
if err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error {
paths = append(paths, sc.Metabase().Path())
return nil
}); err != nil {
return fmt.Errorf("failed to get metabase paths: %w", err)
paths, err := getMetabasePaths(appCfg)
if err != nil {
return err
}
if len(paths) == 0 {
return errNoPathsFound
@@ -58,6 +57,16 @@ func upgrade(cmd *cobra.Command, _ []string) error {
for i, path := range paths {
cmd.Println(i+1, ":", path)
}
mc, err := createMorphClient(cmd.Context(), appCfg)
if err != nil {
return err
}
defer mc.Close()
tr, err := createContainerTypeResolver(mc)
if err != nil {
return err
}
noCompact, _ := cmd.Flags().GetBool(noCompactFlag)
result := make(map[string]bool)
var resultGuard sync.Mutex
eg, ctx := errgroup.WithContext(cmd.Context())
@@ -65,7 +74,7 @@ func upgrade(cmd *cobra.Command, _ []string) error {
eg.Go(func() error {
var success bool
cmd.Println("upgrading metabase", path, "...")
if err := meta.Upgrade(ctx, path, !noCompact, func(a ...any) {
if err := meta.Upgrade(ctx, path, !noCompact, tr, func(a ...any) {
cmd.Println(append([]any{time.Now().Format(time.RFC3339), ":", path, ":"}, a...)...)
}); err != nil {
cmd.Println("error: failed to upgrade metabase", path, ":", err)
@@ -92,8 +101,54 @@ func upgrade(cmd *cobra.Command, _ []string) error {
return nil
}
func getMetabasePaths(appCfg *config.Config) ([]string, error) {
var paths []string
if err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error {
paths = append(paths, sc.Metabase().Path())
return nil
}); err != nil {
return nil, fmt.Errorf("failed to get metabase paths: %w", err)
}
return paths, nil
}
func createMorphClient(ctx context.Context, appCfg *config.Config) (*client.Client, error) {
addresses := morphconfig.RPCEndpoint(appCfg)
if len(addresses) == 0 {
return nil, errNoMorphEndpointsFound
}
key := nodeconfig.Key(appCfg)
cli, err := client.New(ctx,
key,
client.WithDialTimeout(morphconfig.DialTimeout(appCfg)),
client.WithEndpoints(addresses...),
client.WithSwitchInterval(morphconfig.SwitchInterval(appCfg)),
)
if err != nil {
return nil, err
}
if err := cli.SetGroupSignerScope(); err != nil {
cli.Close()
return nil, err
}
return cli, nil
}
func createContainerTypeResolver(cli *client.Client) (*container.TypeResolver, error) {
sh, err := cli.NNSContractAddress(client.NNSContainerContractName)
if err != nil {
return nil, err
}
cc, err := morphcontainer.NewFromMorph(cli, sh, 0, morphcontainer.TryNotary())
if err != nil {
return nil, err
}
return container.NewContainerTypeResolver(func() (container.Source, error) {
return morphcontainer.AsContainerSource(cc), nil
}), nil
}
func initUpgradeCommand() {
flags := UpgradeCmd.Flags()
flags.StringVar(&path, pathFlag, "", "Path to metabase file")
flags.Bool(noCompactFlag, false, "Do not compact upgraded metabase file")
}

View file

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"os"
@@ -12,6 +13,7 @@ import (
"time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
@@ -25,15 +27,19 @@ const (
upgradeTimeout = 1 * time.Second
)
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, log func(a ...any)) error{
type ContainerInfoProvider interface {
IsS3Container(id cid.ID) (bool, error)
}
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, cs ContainerInfoProvider, log func(a ...any)) error{
2: upgradeFromV2ToV3,
3: func(_ context.Context, _ *bbolt.DB, log func(a ...any)) error {
3: func(_ context.Context, _ *bbolt.DB, cs ContainerInfoProvider, log func(a ...any)) error {
log("metabase already upgraded")
return nil
},
}
func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any)) error {
func Upgrade(ctx context.Context, path string, compact bool, cs ContainerInfoProvider, log func(a ...any)) error {
if _, err := os.Stat(path); err != nil {
return fmt.Errorf("check metabase existence: %w", err)
}
@@ -61,7 +67,7 @@ func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any))
}); err != nil {
return fmt.Errorf("set upgrade key %w", err)
}
if err := updater(ctx, db, log); err != nil {
if err := updater(ctx, db, cs, log); err != nil {
return fmt.Errorf("update metabase schema: %w", err)
}
if err := db.Update(func(tx *bbolt.Tx) error {
@@ -113,11 +119,11 @@ func compactDB(db *bbolt.DB) error {
return nil
}
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, cs ContainerInfoProvider, log func(a ...any)) error {
if err := createExpirationEpochBuckets(ctx, db, log); err != nil {
return err
}
if err := dropUserAttributes(ctx, db, log); err != nil {
if err := dropUserAttributes(ctx, db, cs, log); err != nil {
return err
}
if err := dropOwnerIDIndex(ctx, db, log); err != nil {
@@ -323,10 +329,70 @@ func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, i
return nil
}
func dropUserAttributes(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix}, func(a ...any) {
log(append([]any{"user attributes:"}, a...)...)
})
func dropUserAttributes(ctx context.Context, db *bbolt.DB, cs ContainerInfoProvider, log func(a ...any)) error {
log("deleting buckets...")
const batch = 1000
prefix := []byte{userAttributePrefix}
last := prefix
var count uint64
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var keys [][]byte
if err := db.View(func(tx *bbolt.Tx) error {
c := tx.Cursor()
for k, _ := c.Seek(last); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() {
last = bytes.Clone(k)
attr, ok := attributeFromAttributeBucket(k)
if !ok {
return fmt.Errorf("failed to parse attribute key from user attribute bucket key %s", hex.EncodeToString(k))
}
if !IsAtrributeIndexable(attr) {
keys = append(keys, bytes.Clone(k))
continue
}
contID, ok := cidFromAttributeBucket(k)
if !ok {
return fmt.Errorf("failed to parse container ID from user attribute bucket key %s", hex.EncodeToString(k))
}
isS3, err := cs.IsS3Container(contID)
if err != nil {
if client.IsErrContainerNotFound(err) {
keys = append(keys, bytes.Clone(k))
continue
}
return err
}
if isS3 {
keys = append(keys, bytes.Clone(k))
}
}
return nil
}); err != nil {
log("deleting buckets completed with an error:", err)
return err
}
if len(keys) == 0 {
log("deleting buckets completed successfully, deleted", count, "buckets")
return nil
}
if err := db.Update(func(tx *bbolt.Tx) error {
for _, k := range keys {
if err := tx.DeleteBucket(k); err != nil {
return err
}
}
return nil
}); err != nil {
log("deleting buckets completed with an error:", err)
return err
}
count += uint64(len(keys))
log("deleted", count, "buckets")
}
}
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
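For orientation, a condensed sketch (not part of this commit, reusing the names from the hunks above) of the keep/drop decision dropUserAttributes applies to each user-attribute bucket: a bucket is dropped when its attribute is not indexable, when its container no longer exists, or when the container is an S3 container; otherwise it is kept.

// Condensed form of the View-callback body in dropUserAttributes.
func shouldDropUserAttributeBucket(name []byte, cs ContainerInfoProvider) (bool, error) {
	attr, ok := attributeFromAttributeBucket(name)
	if !ok {
		return false, fmt.Errorf("failed to parse attribute key from user attribute bucket key %s", hex.EncodeToString(name))
	}
	if !IsAtrributeIndexable(attr) {
		return true, nil // attribute is not indexed, its bucket is obsolete
	}
	contID, ok := cidFromAttributeBucket(name)
	if !ok {
		return false, fmt.Errorf("failed to parse container ID from user attribute bucket key %s", hex.EncodeToString(name))
	}
	isS3, err := cs.IsS3Container(contID)
	if err != nil {
		if client.IsErrContainerNotFound(err) {
			return true, nil // container is gone, drop its index
		}
		return false, err
	}
	return isS3, nil // S3 containers do not need the user-attribute index
}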

View file

@@ -35,13 +35,17 @@ func TestUpgradeV2ToV3(t *testing.T) {
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.ErrorIs(t, db.Init(), ErrOutdatedVersion)
require.NoError(t, db.Close())
require.NoError(t, Upgrade(context.Background(), path, true, t.Log))
require.NoError(t, Upgrade(context.Background(), path, true, &testContainerInfoProvider{}, t.Log))
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.Init())
require.NoError(t, db.Close())
fmt.Println()
}
type testContainerInfoProvider struct{}
func (p *testContainerInfoProvider) IsS3Container(id cid.ID) (bool, error) { return false, nil }
func createTempCopy(t *testing.T, path string) string {
src, err := os.Open(path)
require.NoError(t, err)
@@ -95,7 +99,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
i := i
eg.Go(func() error {
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
_, err := db.Put(ctx, PutPrm{
obj: obj,
@@ -118,8 +122,8 @@
child.SetParent(parent)
idParent, _ := parent.ID()
child.SetParentID(idParent)
testutil.AddAttribute(child, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(parent, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(child, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(parent, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
_, err := db.Put(ctx, PutPrm{
@@ -138,7 +142,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
i := i
eg.Go(func() error {
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
_, err := db.Put(ctx, PutPrm{
obj: obj,
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
@@ -160,7 +164,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
i := i
eg.Go(func() error {
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
_, err := db.Put(ctx, PutPrm{
obj: obj,
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
@@ -190,7 +194,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
i := i
eg.Go(func() error {
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
_, err := db.Put(ctx, PutPrm{
obj: obj,
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),

View file

@@ -176,6 +176,21 @@ func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte {
return append(key[:bucketKeySize], attributeKey...)
}
func cidFromAttributeBucket(bucketName []byte) (cid.ID, bool) {
if len(bucketName) < bucketKeySize || bucketName[0] != userAttributePrefix {
return cid.ID{}, false
}
var result cid.ID
return result, result.Decode(bucketName[1:bucketKeySize]) == nil
}
func attributeFromAttributeBucket(bucketName []byte) (string, bool) {
if len(bucketName) < bucketKeySize || bucketName[0] != userAttributePrefix {
return "", false
}
return string(bucketName[bucketKeySize:]), true
}
// rootBucketName returns <CID>_root.
func rootBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, rootPrefix, key)
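
For reference, a standalone sketch (not part of this commit) of the bucket-name layout the two new helpers decode: one prefix byte, a raw container ID, then the attribute name. The prefix value below is a placeholder, and bucketKeySize is assumed to be 1 + 32 bytes, matching the 32-byte slice passed to cid.ID.Decode above.

package main

import (
	"crypto/sha256"
	"fmt"

	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)

const (
	userAttributePrefix = 0x05            // placeholder value for illustration only
	bucketKeySize       = 1 + sha256.Size // prefix byte + 32-byte container ID
)

func main() {
	// Build a user-attribute bucket name: prefix | container ID | attribute key.
	rawCID := sha256.Sum256([]byte("example container"))
	name := append([]byte{userAttributePrefix}, rawCID[:]...)
	name = append(name, "FilePath"...)

	// Parse it back the way cidFromAttributeBucket and attributeFromAttributeBucket do.
	var id cid.ID
	if err := id.Decode(name[1:bucketKeySize]); err != nil {
		panic(err)
	}
	fmt.Println("container:", id.EncodeToString())
	fmt.Println("attribute:", string(name[bucketKeySize:]))
}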