[#1412] adm: Resolve container type by metabase upgrade
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
3da168f8cf
commit
8093e145b3
4 changed files with 179 additions and 33 deletions
|
@ -1,6 +1,7 @@
|
||||||
package metabase
|
package metabase
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -10,19 +11,24 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
||||||
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||||
|
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
||||||
|
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
|
morphcontainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
pathFlag = "path"
|
|
||||||
noCompactFlag = "no-compact"
|
noCompactFlag = "no-compact"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errNoPathsFound = errors.New("no metabase paths found")
|
var (
|
||||||
|
errNoPathsFound = errors.New("no metabase paths found")
|
||||||
var path string
|
errNoMorphEndpointsFound = errors.New("no morph endpoints found")
|
||||||
|
)
|
||||||
|
|
||||||
var UpgradeCmd = &cobra.Command{
|
var UpgradeCmd = &cobra.Command{
|
||||||
Use: "upgrade",
|
Use: "upgrade",
|
||||||
|
@ -39,17 +45,10 @@ func upgrade(cmd *cobra.Command, _ []string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
noCompact, _ := cmd.Flags().GetBool(noCompactFlag)
|
|
||||||
var paths []string
|
|
||||||
if path != "" {
|
|
||||||
paths = append(paths, path)
|
|
||||||
}
|
|
||||||
appCfg := config.New(configFile, configDir, config.EnvPrefix)
|
appCfg := config.New(configFile, configDir, config.EnvPrefix)
|
||||||
if err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error {
|
paths, err := getMetabasePaths(appCfg)
|
||||||
paths = append(paths, sc.Metabase().Path())
|
if err != nil {
|
||||||
return nil
|
return err
|
||||||
}); err != nil {
|
|
||||||
return fmt.Errorf("failed to get metabase paths: %w", err)
|
|
||||||
}
|
}
|
||||||
if len(paths) == 0 {
|
if len(paths) == 0 {
|
||||||
return errNoPathsFound
|
return errNoPathsFound
|
||||||
|
@ -58,6 +57,16 @@ func upgrade(cmd *cobra.Command, _ []string) error {
|
||||||
for i, path := range paths {
|
for i, path := range paths {
|
||||||
cmd.Println(i+1, ":", path)
|
cmd.Println(i+1, ":", path)
|
||||||
}
|
}
|
||||||
|
mc, err := createMorphClient(cmd.Context(), appCfg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer mc.Close()
|
||||||
|
civ, err := createContainerInfoProvider(mc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
noCompact, _ := cmd.Flags().GetBool(noCompactFlag)
|
||||||
result := make(map[string]bool)
|
result := make(map[string]bool)
|
||||||
var resultGuard sync.Mutex
|
var resultGuard sync.Mutex
|
||||||
eg, ctx := errgroup.WithContext(cmd.Context())
|
eg, ctx := errgroup.WithContext(cmd.Context())
|
||||||
|
@ -65,7 +74,7 @@ func upgrade(cmd *cobra.Command, _ []string) error {
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
var success bool
|
var success bool
|
||||||
cmd.Println("upgrading metabase", path, "...")
|
cmd.Println("upgrading metabase", path, "...")
|
||||||
if err := meta.Upgrade(ctx, path, !noCompact, func(a ...any) {
|
if err := meta.Upgrade(ctx, path, !noCompact, civ, func(a ...any) {
|
||||||
cmd.Println(append([]any{time.Now().Format(time.RFC3339), ":", path, ":"}, a...)...)
|
cmd.Println(append([]any{time.Now().Format(time.RFC3339), ":", path, ":"}, a...)...)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
cmd.Println("error: failed to upgrade metabase", path, ":", err)
|
cmd.Println("error: failed to upgrade metabase", path, ":", err)
|
||||||
|
@ -92,8 +101,50 @@ func upgrade(cmd *cobra.Command, _ []string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getMetabasePaths(appCfg *config.Config) ([]string, error) {
|
||||||
|
var paths []string
|
||||||
|
if err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error {
|
||||||
|
paths = append(paths, sc.Metabase().Path())
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return nil, fmt.Errorf("get metabase paths: %w", err)
|
||||||
|
}
|
||||||
|
return paths, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func createMorphClient(ctx context.Context, appCfg *config.Config) (*client.Client, error) {
|
||||||
|
addresses := morphconfig.RPCEndpoint(appCfg)
|
||||||
|
if len(addresses) == 0 {
|
||||||
|
return nil, errNoMorphEndpointsFound
|
||||||
|
}
|
||||||
|
key := nodeconfig.Key(appCfg)
|
||||||
|
cli, err := client.New(ctx,
|
||||||
|
key,
|
||||||
|
client.WithDialTimeout(morphconfig.DialTimeout(appCfg)),
|
||||||
|
client.WithEndpoints(addresses...),
|
||||||
|
client.WithSwitchInterval(morphconfig.SwitchInterval(appCfg)),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create morph client:%w", err)
|
||||||
|
}
|
||||||
|
return cli, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func createContainerInfoProvider(cli *client.Client) (container.InfoProvider, error) {
|
||||||
|
sh, err := cli.NNSContractAddress(client.NNSContainerContractName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("resolve container contract hash: %w", err)
|
||||||
|
}
|
||||||
|
cc, err := morphcontainer.NewFromMorph(cli, sh, 0, morphcontainer.TryNotary())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create morph container client: %w", err)
|
||||||
|
}
|
||||||
|
return container.NewInfoProvider(func() (container.Source, error) {
|
||||||
|
return morphcontainer.AsContainerSource(cc), nil
|
||||||
|
}), nil
|
||||||
|
}
|
||||||
|
|
||||||
func initUpgradeCommand() {
|
func initUpgradeCommand() {
|
||||||
flags := UpgradeCmd.Flags()
|
flags := UpgradeCmd.Flags()
|
||||||
flags.StringVar(&path, pathFlag, "", "Path to metabase file")
|
|
||||||
flags.Bool(noCompactFlag, false, "Do not compact upgraded metabase file")
|
flags.Bool(noCompactFlag, false, "Do not compact upgraded metabase file")
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
@ -12,6 +13,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
@ -25,15 +27,15 @@ const (
|
||||||
upgradeTimeout = 1 * time.Second
|
upgradeTimeout = 1 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, log func(a ...any)) error{
|
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, cs container.InfoProvider, log func(a ...any)) error{
|
||||||
2: upgradeFromV2ToV3,
|
2: upgradeFromV2ToV3,
|
||||||
3: func(_ context.Context, _ *bbolt.DB, log func(a ...any)) error {
|
3: func(_ context.Context, _ *bbolt.DB, _ container.InfoProvider, log func(a ...any)) error {
|
||||||
log("metabase already upgraded")
|
log("metabase already upgraded")
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any)) error {
|
func Upgrade(ctx context.Context, path string, compact bool, cs container.InfoProvider, log func(a ...any)) error {
|
||||||
if _, err := os.Stat(path); err != nil {
|
if _, err := os.Stat(path); err != nil {
|
||||||
return fmt.Errorf("check metabase existence: %w", err)
|
return fmt.Errorf("check metabase existence: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -61,7 +63,7 @@ func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any))
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return fmt.Errorf("set upgrade key %w", err)
|
return fmt.Errorf("set upgrade key %w", err)
|
||||||
}
|
}
|
||||||
if err := updater(ctx, db, log); err != nil {
|
if err := updater(ctx, db, cs, log); err != nil {
|
||||||
return fmt.Errorf("update metabase schema: %w", err)
|
return fmt.Errorf("update metabase schema: %w", err)
|
||||||
}
|
}
|
||||||
if err := db.Update(func(tx *bbolt.Tx) error {
|
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
@ -113,11 +115,11 @@ func compactDB(db *bbolt.DB) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, cs container.InfoProvider, log func(a ...any)) error {
|
||||||
if err := createExpirationEpochBuckets(ctx, db, log); err != nil {
|
if err := createExpirationEpochBuckets(ctx, db, log); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := dropUserAttributes(ctx, db, log); err != nil {
|
if err := dropUserAttributes(ctx, db, cs, log); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := dropOwnerIDIndex(ctx, db, log); err != nil {
|
if err := dropOwnerIDIndex(ctx, db, log); err != nil {
|
||||||
|
@ -323,10 +325,81 @@ func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, i
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func dropUserAttributes(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
func dropUserAttributes(ctx context.Context, db *bbolt.DB, cs container.InfoProvider, log func(a ...any)) error {
|
||||||
return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix}, func(a ...any) {
|
log("deleting user attribute buckets...")
|
||||||
log(append([]any{"user attributes:"}, a...)...)
|
const batch = 1000
|
||||||
})
|
prefix := []byte{userAttributePrefix}
|
||||||
|
last := prefix
|
||||||
|
var count uint64
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
var keys [][]byte
|
||||||
|
if err := db.View(func(tx *bbolt.Tx) error {
|
||||||
|
c := tx.Cursor()
|
||||||
|
for k, _ := c.Seek(last); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() {
|
||||||
|
if bytes.Equal(last, k) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
keys = append(keys, bytes.Clone(k))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
log("deleting user attribute buckets completed with an error:", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(keys) == 0 {
|
||||||
|
log("deleting user attribute buckets completed successfully, deleted", count, "buckets")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
last = keys[len(keys)-1]
|
||||||
|
keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
for _, k := range keysToDrop {
|
||||||
|
if err := tx.DeleteBucket(k); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
log("deleting buckets completed with an error:", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
count += uint64(len(keysToDrop))
|
||||||
|
log("deleted", count, "buckets")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([][]byte, error) {
|
||||||
|
var keysToDrop [][]byte
|
||||||
|
for _, key := range keys {
|
||||||
|
attr, ok := attributeFromAttributeBucket(key)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("failed to parse attribute key from user attribute bucket key %s", hex.EncodeToString(key))
|
||||||
|
}
|
||||||
|
if !IsAtrributeIndexed(attr) {
|
||||||
|
keysToDrop = append(keysToDrop, key)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
contID, ok := cidFromAttributeBucket(key)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("failed to parse container ID from user attribute bucket key %s", hex.EncodeToString(key))
|
||||||
|
}
|
||||||
|
info, err := cs.Info(contID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if info.Removed || !info.Indexed {
|
||||||
|
keysToDrop = append(keysToDrop, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return keysToDrop, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
@ -35,13 +36,19 @@ func TestUpgradeV2ToV3(t *testing.T) {
|
||||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||||
require.ErrorIs(t, db.Init(), ErrOutdatedVersion)
|
require.ErrorIs(t, db.Init(), ErrOutdatedVersion)
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
require.NoError(t, Upgrade(context.Background(), path, true, t.Log))
|
require.NoError(t, Upgrade(context.Background(), path, true, &testContainerInfoProvider{}, t.Log))
|
||||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, db.Init())
|
require.NoError(t, db.Init())
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testContainerInfoProvider struct{}
|
||||||
|
|
||||||
|
func (p *testContainerInfoProvider) Info(id cid.ID) (container.Info, error) {
|
||||||
|
return container.Info{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func createTempCopy(t *testing.T, path string) string {
|
func createTempCopy(t *testing.T, path string) string {
|
||||||
src, err := os.Open(path)
|
src, err := os.Open(path)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -95,7 +102,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
|
||||||
i := i
|
i := i
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||||
_, err := db.Put(ctx, PutPrm{
|
_, err := db.Put(ctx, PutPrm{
|
||||||
obj: obj,
|
obj: obj,
|
||||||
|
@ -118,8 +125,8 @@ func TestGenerateMetabaseFile(t *testing.T) {
|
||||||
child.SetParent(parent)
|
child.SetParent(parent)
|
||||||
idParent, _ := parent.ID()
|
idParent, _ := parent.ID()
|
||||||
child.SetParentID(idParent)
|
child.SetParentID(idParent)
|
||||||
testutil.AddAttribute(child, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
testutil.AddAttribute(child, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
testutil.AddAttribute(parent, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
testutil.AddAttribute(parent, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||||
testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||||
_, err := db.Put(ctx, PutPrm{
|
_, err := db.Put(ctx, PutPrm{
|
||||||
|
@ -138,7 +145,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
|
||||||
i := i
|
i := i
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
_, err := db.Put(ctx, PutPrm{
|
_, err := db.Put(ctx, PutPrm{
|
||||||
obj: obj,
|
obj: obj,
|
||||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||||
|
@ -160,7 +167,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
|
||||||
i := i
|
i := i
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
_, err := db.Put(ctx, PutPrm{
|
_, err := db.Put(ctx, PutPrm{
|
||||||
obj: obj,
|
obj: obj,
|
||||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||||
|
@ -190,7 +197,7 @@ func TestGenerateMetabaseFile(t *testing.T) {
|
||||||
i := i
|
i := i
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
testutil.AddAttribute(obj, objectSDK.AttributeFilePath, strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
_, err := db.Put(ctx, PutPrm{
|
_, err := db.Put(ctx, PutPrm{
|
||||||
obj: obj,
|
obj: obj,
|
||||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||||
|
|
|
@ -176,6 +176,21 @@ func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte {
|
||||||
return append(key[:bucketKeySize], attributeKey...)
|
return append(key[:bucketKeySize], attributeKey...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func cidFromAttributeBucket(bucketName []byte) (cid.ID, bool) {
|
||||||
|
if len(bucketName) < bucketKeySize || bucketName[0] != userAttributePrefix {
|
||||||
|
return cid.ID{}, false
|
||||||
|
}
|
||||||
|
var result cid.ID
|
||||||
|
return result, result.Decode(bucketName[1:bucketKeySize]) == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func attributeFromAttributeBucket(bucketName []byte) (string, bool) {
|
||||||
|
if len(bucketName) < bucketKeySize || bucketName[0] != userAttributePrefix {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
return string(bucketName[bucketKeySize:]), true
|
||||||
|
}
|
||||||
|
|
||||||
// rootBucketName returns <CID>_root.
|
// rootBucketName returns <CID>_root.
|
||||||
func rootBucketName(cnr cid.ID, key []byte) []byte {
|
func rootBucketName(cnr cid.ID, key []byte) []byte {
|
||||||
return bucketName(cnr, rootPrefix, key)
|
return bucketName(cnr, rootPrefix, key)
|
||||||
|
|
Loading…
Reference in a new issue