[#1334] metabase: Add upgrade from v2 to v3
All checks were successful
DCO action / DCO (pull_request) Successful in 4m35s
Tests and linters / Run gofumpt (pull_request) Successful in 6m7s
Vulncheck / Vulncheck (pull_request) Successful in 6m26s
Tests and linters / Tests (1.23) (pull_request) Successful in 6m59s
Tests and linters / Tests (1.22) (pull_request) Successful in 7m9s
Tests and linters / Staticcheck (pull_request) Successful in 7m9s
Pre-commit hooks / Pre-commit (pull_request) Successful in 7m32s
Tests and linters / Lint (pull_request) Successful in 7m40s
Tests and linters / Tests with -race (pull_request) Successful in 7m43s
Tests and linters / gopls check (pull_request) Successful in 7m42s
Build / Build Components (1.23) (pull_request) Successful in 7m37s
Build / Build Components (1.22) (pull_request) Successful in 7m44s
All checks were successful
DCO action / DCO (pull_request) Successful in 4m35s
Tests and linters / Run gofumpt (pull_request) Successful in 6m7s
Vulncheck / Vulncheck (pull_request) Successful in 6m26s
Tests and linters / Tests (1.23) (pull_request) Successful in 6m59s
Tests and linters / Tests (1.22) (pull_request) Successful in 7m9s
Tests and linters / Staticcheck (pull_request) Successful in 7m9s
Pre-commit hooks / Pre-commit (pull_request) Successful in 7m32s
Tests and linters / Lint (pull_request) Successful in 7m40s
Tests and linters / Tests with -race (pull_request) Successful in 7m43s
Tests and linters / gopls check (pull_request) Successful in 7m42s
Build / Build Components (1.23) (pull_request) Successful in 7m37s
Build / Build Components (1.22) (pull_request) Successful in 7m44s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
03976c6ed5
commit
1bd1a6c021
7 changed files with 654 additions and 3 deletions
15
cmd/frostfs-adm/internal/modules/metabase/root.go
Normal file
15
cmd/frostfs-adm/internal/modules/metabase/root.go
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
package metabase
|
||||||
|
|
||||||
|
import "github.com/spf13/cobra"
|
||||||
|
|
||||||
|
// RootCmd is the root command of the metabase section of frostfs-adm.
var RootCmd = &cobra.Command{
	Use:   "metabase",
	Short: "Section for metabase commands",
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RootCmd.AddCommand(UpgradeCmd)
|
||||||
|
|
||||||
|
initUpgradeCommand()
|
||||||
|
}
|
40
cmd/frostfs-adm/internal/modules/metabase/upgrade.go
Normal file
40
cmd/frostfs-adm/internal/modules/metabase/upgrade.go
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
package metabase
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Names of the command-line flags accepted by the upgrade command.
const (
	// pathFlag is the path to the metabase file to upgrade.
	pathFlag = "path"
	// noCompactFlag disables compaction after a successful upgrade.
	noCompactFlag = "no-compact"
)
|
||||||
|
|
||||||
|
var path string
|
||||||
|
|
||||||
|
// UpgradeCmd upgrades a metabase file to the latest schema version.
var UpgradeCmd = &cobra.Command{
	Use:   "upgrade",
	Short: "Upgrade metabase to latest version",
	RunE:  upgrade,
}
|
||||||
|
|
||||||
|
func upgrade(cmd *cobra.Command, _ []string) error {
|
||||||
|
path, _ := cmd.Flags().GetString(pathFlag)
|
||||||
|
noCompact, _ := cmd.Flags().GetBool(noCompactFlag)
|
||||||
|
if err := meta.Upgrade(cmd.Context(), path, !noCompact, func(a ...any) {
|
||||||
|
cmd.Println(append([]any{time.Now().Format(time.RFC3339), ":"}, a...)...)
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("failed to upgrade metabase: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func initUpgradeCommand() {
|
||||||
|
flags := UpgradeCmd.Flags()
|
||||||
|
flags.StringVar(&path, pathFlag, "", "Path to metabase file")
|
||||||
|
_ = UpgradeCmd.MarkFlagRequired(pathFlag)
|
||||||
|
flags.Bool(noCompactFlag, false, "Do not compact upgraded metabase file")
|
||||||
|
}
|
|
@ -5,6 +5,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/storagecfg"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/storagecfg"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
|
||||||
|
@ -41,6 +42,7 @@ func init() {
|
||||||
rootCmd.AddCommand(config.RootCmd)
|
rootCmd.AddCommand(config.RootCmd)
|
||||||
rootCmd.AddCommand(morph.RootCmd)
|
rootCmd.AddCommand(morph.RootCmd)
|
||||||
rootCmd.AddCommand(storagecfg.RootCmd)
|
rootCmd.AddCommand(storagecfg.RootCmd)
|
||||||
|
rootCmd.AddCommand(metabase.RootCmd)
|
||||||
|
|
||||||
rootCmd.AddCommand(autocomplete.Command("frostfs-adm"))
|
rootCmd.AddCommand(autocomplete.Command("frostfs-adm"))
|
||||||
rootCmd.AddCommand(gendoc.Command(rootCmd, gendoc.Options{}))
|
rootCmd.AddCommand(gendoc.Command(rootCmd, gendoc.Options{}))
|
||||||
|
|
361
pkg/local_object_storage/metabase/upgrade.go
Normal file
361
pkg/local_object_storage/metabase/upgrade.go
Normal file
|
@ -0,0 +1,361 @@
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
	// upgradeLogFrequency is the number of processed items between two
	// progress log lines.
	upgradeLogFrequency = 50_000
	// upgradeWorkersCount is the number of goroutines writing the new
	// expiration-epoch indexes.
	upgradeWorkersCount = 1_000
	// compactMaxTxSize caps the transaction size used by bbolt.Compact
	// (256 MiB).
	compactMaxTxSize = 256 << 20
)
|
||||||
|
|
||||||
|
// updates maps a metabase schema version to the routine that upgrades the
// database from that version to the next one.
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, log func(a ...any)) error{
	2: upgradeFromV2ToV3,
}
|
||||||
|
|
||||||
|
func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any)) error {
|
||||||
|
if _, err := os.Stat(path); err != nil {
|
||||||
|
return fmt.Errorf("check metabase existence: %w", err)
|
||||||
|
}
|
||||||
|
db, err := bbolt.Open(path, os.ModePerm, bbolt.DefaultOptions)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open metabase: %w", err)
|
||||||
|
}
|
||||||
|
var version uint64
|
||||||
|
if err := db.View(func(tx *bbolt.Tx) error {
|
||||||
|
var e error
|
||||||
|
version, e = currentVersion(tx)
|
||||||
|
return e
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
updater, found := updates[version]
|
||||||
|
if !found {
|
||||||
|
return fmt.Errorf("unsupported version %d: no update available", version)
|
||||||
|
}
|
||||||
|
if err := updater(ctx, db, log); err != nil {
|
||||||
|
return fmt.Errorf("update metabase schema: %w", err)
|
||||||
|
}
|
||||||
|
if compact {
|
||||||
|
log("compacting metabase...")
|
||||||
|
err := compactDB(db)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("compact metabase: %w", err)
|
||||||
|
}
|
||||||
|
log("metabase compacted")
|
||||||
|
}
|
||||||
|
return db.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// compactDB rewrites the database file via bbolt.Compact to reclaim free
// space: it copies db into a temporary sibling file, closes both handles and
// atomically replaces the source file with the compacted copy. On any failure
// the temporary file is removed. Note: on success the passed db is closed by
// this function.
func compactDB(db *bbolt.DB) error {
	sourcePath := db.Path()
	// NOTE(review): RFC 3339 timestamps contain ':' characters, which are not
	// valid in file names on some platforms (e.g. Windows) — confirm target
	// platforms.
	tmpFileName := sourcePath + "." + time.Now().Format(time.RFC3339)
	f, err := os.Stat(sourcePath)
	if err != nil {
		return err
	}
	// Reuse the original file permissions for the compacted copy.
	dst, err := bbolt.Open(tmpFileName, f.Mode(), &bbolt.Options{
		Timeout: 100 * time.Millisecond,
	})
	if err != nil {
		return fmt.Errorf("can't open new metabase to compact: %w", err)
	}
	if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil {
		return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))
	}
	if err := dst.Close(); err != nil {
		return fmt.Errorf("close compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	// The source must be closed before it can be replaced by rename.
	if err := db.Close(); err != nil {
		return fmt.Errorf("close source metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	if err := os.Rename(tmpFileName, sourcePath); err != nil {
		return fmt.Errorf("replace source metabase with compacted: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	return nil
}
|
||||||
|
|
||||||
|
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||||
|
if err := createExpirationEpochBuckets(ctx, db, log); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := dropUserAttributes(ctx, db, log); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := dropOwnerIDIndex(ctx, db, log); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := dropPayloadChecksumIndex(ctx, db, log); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
return updateVersion(tx, version)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// objectIDToExpEpoch is a single (container, object) pair together with the
// expiration epoch parsed from its user attribute; it is the unit of work
// passed from the selector goroutine to the index-writing workers.
type objectIDToExpEpoch struct {
	containerID     cid.ID
	objectID        oid.ID
	expirationEpoch uint64
}
|
||||||
|
|
||||||
|
func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||||
|
log("filling expiration epoch buckets...")
|
||||||
|
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
_, err := tx.CreateBucketIfNotExists(expEpochToObjectBucketName)
|
||||||
|
return err
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
objects := make(chan objectIDToExpEpoch)
|
||||||
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
eg.Go(func() error {
|
||||||
|
return selectObjectsWithExpirationEpoch(ctx, db, objects)
|
||||||
|
})
|
||||||
|
var count atomic.Uint64
|
||||||
|
for i := 0; i < upgradeWorkersCount; i++ {
|
||||||
|
eg.Go(func() error {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case obj, ok := <-objects:
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := db.Batch(func(tx *bbolt.Tx) error {
|
||||||
|
if err := putUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: expEpochToObjectBucketName,
|
||||||
|
key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
|
||||||
|
val: zeroValue,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
val := make([]byte, epochSize)
|
||||||
|
binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
|
||||||
|
return putUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)),
|
||||||
|
key: objectKey(obj.objectID, make([]byte, objectKeySize)),
|
||||||
|
val: val,
|
||||||
|
})
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if c := count.Add(1); c%upgradeLogFrequency == 0 {
|
||||||
|
log("expiration epoch filled for", c, "objects...")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
err := eg.Wait()
|
||||||
|
if err != nil {
|
||||||
|
log("expiration epoch buckets completed completed with error:", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log("filling expiration epoch buckets completed successfully, total", count.Load(), "objects")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// selectObjectsWithExpirationEpoch iterates the legacy user-attribute buckets
// and sends every object that carries an expiration-epoch attribute to the
// objects channel, closing the channel when iteration finishes. Iteration is
// performed in batches of batchSize to keep read transactions short; the
// iterator records the last visited position so each batch resumes where the
// previous one stopped.
func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error {
	defer close(objects)

	const batchSize = 1000
	// Start seeking from the user-attribute bucket prefix.
	it := &objectsWithExpirationEpochBatchIterator{
		lastAttributeKey: usrAttrPrefix,
	}
	for {
		if err := getNextObjectsWithExpirationEpochBatch(ctx, db, it, batchSize); err != nil {
			return err
		}
		for _, item := range it.items {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case objects <- item:
			}
		}

		// A short batch means iteration reached the end of the data.
		if len(it.items) < batchSize {
			return nil
		}
		it.items = nil
	}
}
|
||||||
|
|
||||||
|
var (
	// usrAttrPrefix is the single-byte key prefix of legacy user-attribute buckets.
	usrAttrPrefix = []byte{userAttributePrefix}
	// errBatchSizeLimit is a sentinel used to abort a read transaction once a
	// batch is full; it is filtered out by the caller and never escapes.
	errBatchSizeLimit = errors.New("batch size limit")
)
|
||||||
|
|
||||||
|
// objectsWithExpirationEpochBatchIterator carries the resume position between
// successive read transactions: the last visited attribute bucket, attribute
// value bucket and record key, plus the items collected for the current batch.
type objectsWithExpirationEpochBatchIterator struct {
	lastAttributeKey     []byte
	lastAttributeValue   []byte
	lastAttrKeyValueItem []byte
	items                []objectIDToExpEpoch
}
|
||||||
|
|
||||||
|
// Legacy user-attribute index layout walked below:
// - {prefix}{containerID}{attributeKey} <- bucket
// -- {attributeValue} <- bucket, expirationEpoch
// --- {objectID}: zeroValue <- record

// getNextObjectsWithExpirationEpochBatch fills it.items with up to batchSize
// objects that carry an expiration-epoch attribute, resuming from the
// position stored in it. The errBatchSizeLimit sentinel stops the read
// transaction early when a batch is full and is not propagated to the caller.
func getNextObjectsWithExpirationEpochBatch(ctx context.Context, db *bbolt.DB, it *objectsWithExpirationEpochBatchIterator, batchSize int) error {
	// Seek positions apply only to the first attribute bucket visited; they
	// are reset to nil afterwards so later buckets are scanned from the start.
	seekAttrValue := it.lastAttributeValue
	seekAttrKVItem := it.lastAttrKeyValueItem
	err := db.View(func(tx *bbolt.Tx) error {
		attrKeyC := tx.Cursor()
		for attrKey, _ := attrKeyC.Seek(it.lastAttributeKey); attrKey != nil && bytes.HasPrefix(attrKey, usrAttrPrefix); attrKey, _ = attrKeyC.Next() {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			// A valid bucket name contains at least prefix + container ID.
			if len(attrKey) <= 1+cidSize {
				continue
			}
			attributeKey := string(attrKey[1+cidSize:])
			// Only expiration-epoch attributes (either naming) are relevant.
			if attributeKey != objectV2.SysAttributeExpEpochNeoFS && attributeKey != objectV2.SysAttributeExpEpoch {
				continue
			}
			var containerID cid.ID
			if err := containerID.Decode(attrKey[1 : 1+cidSize]); err != nil {
				return fmt.Errorf("decode container id from user attribute bucket: %w", err)
			}
			if err := iterateExpirationAttributeKeyBucket(ctx, tx.Bucket(attrKey), it, batchSize, containerID, attrKey, seekAttrValue, seekAttrKVItem); err != nil {
				return err
			}
			seekAttrValue = nil
			seekAttrKVItem = nil
		}
		return nil
	})
	// A full batch surfaces as errBatchSizeLimit; that is the expected way to
	// end a transaction early, not a failure.
	if err != nil && !errors.Is(err, errBatchSizeLimit) {
		return err
	}
	return nil
}
|
||||||
|
|
||||||
|
// iterateExpirationAttributeKeyBucket walks one attribute-key bucket (its
// {attributeValue} sub-buckets hold {objectID} records), appending each object
// to it.items until batchSize is reached, in which case errBatchSizeLimit is
// returned. seekAttrValue/seekAttrKVItem position the cursors for resumption;
// the record equal to the stored resume position is skipped so it is not
// emitted twice across batches.
func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, it *objectsWithExpirationEpochBatchIterator, batchSize int, containerID cid.ID, attrKey, seekAttrValue, seekAttrKVItem []byte) error {
	attrValueC := b.Cursor()
	for attrValue, v := attrValueC.Seek(seekAttrValue); attrValue != nil; attrValue, v = attrValueC.Next() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		if v != nil {
			continue // need to iterate over buckets, not records
		}
		// The attribute value itself is the expiration epoch as decimal text.
		expirationEpoch, err := strconv.ParseUint(string(attrValue), 10, 64)
		if err != nil {
			return fmt.Errorf("could not parse expiration epoch: %w", err)
		}
		expirationEpochBucket := b.Bucket(attrValue)
		attrKeyValueC := expirationEpochBucket.Cursor()
		for attrKeyValueItem, v := attrKeyValueC.Seek(seekAttrKVItem); attrKeyValueItem != nil; attrKeyValueItem, v = attrKeyValueC.Next() {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			if v == nil {
				continue // need to iterate over records, not buckets
			}
			// Skip the exact record the previous batch ended on.
			if bytes.Equal(it.lastAttributeKey, attrKey) && bytes.Equal(it.lastAttributeValue, attrValue) && bytes.Equal(it.lastAttrKeyValueItem, attrKeyValueItem) {
				continue
			}
			var objectID oid.ID
			if err := objectID.Decode(attrKeyValueItem); err != nil {
				return fmt.Errorf("decode object id from container '%s' expiration epoch %d: %w", containerID, expirationEpoch, err)
			}
			// Clone: slices returned by bbolt cursors are only valid inside
			// the transaction.
			it.lastAttributeKey = bytes.Clone(attrKey)
			it.lastAttributeValue = bytes.Clone(attrValue)
			it.lastAttrKeyValueItem = bytes.Clone(attrKeyValueItem)
			it.items = append(it.items, objectIDToExpEpoch{
				containerID:     containerID,
				objectID:        objectID,
				expirationEpoch: expirationEpoch,
			})
			if len(it.items) == batchSize {
				return errBatchSizeLimit
			}
		}
		// Record-level resume position applies only to the first value bucket.
		seekAttrKVItem = nil
	}
	return nil
}
|
||||||
|
|
||||||
|
func dropUserAttributes(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||||
|
return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix}, func(a ...any) {
|
||||||
|
log(append([]any{"user attributes:"}, a...)...)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||||
|
return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, func(a ...any) {
|
||||||
|
log(append([]any{"owner ID index:"}, a...)...)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func dropPayloadChecksumIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||||
|
return dropBucketsByPrefix(ctx, db, []byte{payloadHashPrefix}, func(a ...any) {
|
||||||
|
log(append([]any{"payload checksum:"}, a...)...)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// dropBucketsByPrefix deletes every top-level bucket whose name starts with
// prefix. Work is done in batches of 1000 bucket names: each batch is
// collected in a read transaction and deleted in a separate write
// transaction, so no single transaction grows too large. Progress is
// reported through log.
func dropBucketsByPrefix(ctx context.Context, db *bbolt.DB, prefix []byte, log func(a ...any)) error {
	log("deleting buckets...")
	const batch = 1000
	var count uint64
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		var keys [][]byte
		if err := db.View(func(tx *bbolt.Tx) error {
			c := tx.Cursor()
			// Clone keys: cursor memory is invalid outside the transaction.
			for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() {
				keys = append(keys, bytes.Clone(k))
			}
			return nil
		}); err != nil {
			log("deleting buckets completed with an error:", err)
			return err
		}
		if len(keys) == 0 {
			// Nothing left with this prefix: done.
			log("deleting buckets completed successfully, deleted", count, "buckets")
			return nil
		}
		if err := db.Update(func(tx *bbolt.Tx) error {
			for _, k := range keys {
				if err := tx.DeleteBucket(k); err != nil {
					return err
				}
			}
			return nil
		}); err != nil {
			log("deleting buckets completed with an error:", err)
			return err
		}
		// batch divides upgradeLogFrequency, so this fires every 50k buckets.
		if count += uint64(len(keys)); count%upgradeLogFrequency == 0 {
			log("deleted", count, "buckets")
		}
	}
}
|
215
pkg/local_object_storage/metabase/upgrade_test.go
Normal file
215
pkg/local_object_storage/metabase/upgrade_test.go
Normal file
|
@ -0,0 +1,215 @@
|
||||||
|
//go:build integration
|
||||||
|
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
const upgradeFilePath = "/path/to/metabase.v2"
|
||||||
|
|
||||||
|
// TestUpgradeV2ToV3 checks that a v2 metabase fails Init with
// ErrOutdatedVersion, upgrades successfully via Upgrade (with compaction) and
// then initializes cleanly when reopened.
func TestUpgradeV2ToV3(t *testing.T) {
	path := createTempCopy(t, upgradeFilePath)
	defer func() {
		require.NoError(t, os.Remove(path))
	}()
	db := New(WithPath(path), WithEpochState(epochState{e: 1000}), WithLogger(test.NewLogger(t)))
	require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
	require.ErrorIs(t, db.Init(), ErrOutdatedVersion)
	require.NoError(t, db.Close())
	require.NoError(t, Upgrade(context.Background(), path, true, t.Log))
	require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
	require.NoError(t, db.Init())
	require.NoError(t, db.Close())
	// NOTE(review): leftover debug print — remove it (and drop the fmt import
	// if it then becomes unused in this file).
	fmt.Println()
}
|
||||||
|
|
||||||
|
func createTempCopy(t *testing.T, path string) string {
|
||||||
|
src, err := os.Open(path)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tmpPath := upgradeFilePath + time.Now().Format(time.RFC3339)
|
||||||
|
dest, err := os.Create(tmpPath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = io.Copy(dest, src)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, src.Close())
|
||||||
|
require.NoError(t, dest.Close())
|
||||||
|
|
||||||
|
return tmpPath
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGenerateMetabaseFile(t *testing.T) {
|
||||||
|
t.Skip("for generating db")
|
||||||
|
const (
|
||||||
|
containersCount = 10_000
|
||||||
|
simpleObjectsCount = 500_000
|
||||||
|
complexObjectsCount = 500_000 // x2
|
||||||
|
deletedByGCMarksCount = 100_000
|
||||||
|
deletedByTombstoneCount = 100_000 // x2
|
||||||
|
lockedCount = 100_000 // x2
|
||||||
|
|
||||||
|
allocSize = 128 << 20
|
||||||
|
generateWorkersCount = 1_000
|
||||||
|
minEpoch = 1_000
|
||||||
|
maxFilename = 1_000
|
||||||
|
maxStorageID = 10_000
|
||||||
|
)
|
||||||
|
|
||||||
|
db := New(WithPath(upgradeFilePath), WithEpochState(epochState{e: minEpoch}), WithLogger(test.NewLogger(t)))
|
||||||
|
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||||
|
db.boltDB.AllocSize = allocSize
|
||||||
|
db.boltDB.NoSync = true
|
||||||
|
require.NoError(t, db.Init())
|
||||||
|
containers := make([]cid.ID, containersCount)
|
||||||
|
for i := range containers {
|
||||||
|
containers[i] = cidtest.ID()
|
||||||
|
}
|
||||||
|
oc, err := db.ObjectCounters()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, oc.IsZero())
|
||||||
|
eg, ctx := errgroup.WithContext(context.Background())
|
||||||
|
eg.SetLimit(generateWorkersCount)
|
||||||
|
// simple objects
|
||||||
|
for i := 0; i < simpleObjectsCount; i++ {
|
||||||
|
i := i
|
||||||
|
eg.Go(func() error {
|
||||||
|
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
|
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
|
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||||
|
_, err := db.Put(ctx, PutPrm{
|
||||||
|
obj: obj,
|
||||||
|
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
db.log.Info("simple objects generated")
|
||||||
|
eg, ctx = errgroup.WithContext(context.Background())
|
||||||
|
eg.SetLimit(generateWorkersCount)
|
||||||
|
// complex objects
|
||||||
|
for i := 0; i < complexObjectsCount; i++ {
|
||||||
|
i := i
|
||||||
|
eg.Go(func() error {
|
||||||
|
parent := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
|
child := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
|
child.SetParent(parent)
|
||||||
|
idParent, _ := parent.ID()
|
||||||
|
child.SetParentID(idParent)
|
||||||
|
testutil.AddAttribute(child, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
|
testutil.AddAttribute(parent, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
|
testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||||
|
testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||||
|
_, err := db.Put(ctx, PutPrm{
|
||||||
|
obj: child,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
db.log.Info("complex objects generated")
|
||||||
|
eg, ctx = errgroup.WithContext(context.Background())
|
||||||
|
eg.SetLimit(generateWorkersCount)
|
||||||
|
// simple objects deleted by gc marks
|
||||||
|
for i := 0; i < deletedByGCMarksCount; i++ {
|
||||||
|
i := i
|
||||||
|
eg.Go(func() error {
|
||||||
|
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
|
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
|
_, err := db.Put(ctx, PutPrm{
|
||||||
|
obj: obj,
|
||||||
|
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = db.Inhume(ctx, InhumePrm{
|
||||||
|
target: []oid.Address{object.AddressOf(obj)},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
db.log.Info("simple objects deleted by gc marks generated")
|
||||||
|
eg, ctx = errgroup.WithContext(context.Background())
|
||||||
|
eg.SetLimit(10000)
|
||||||
|
// simple objects deleted by tombstones
|
||||||
|
for i := 0; i < deletedByTombstoneCount; i++ {
|
||||||
|
i := i
|
||||||
|
eg.Go(func() error {
|
||||||
|
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
|
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
|
_, err := db.Put(ctx, PutPrm{
|
||||||
|
obj: obj,
|
||||||
|
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||||
|
})
|
||||||
|
tomb := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
|
tomb.SetType(objectSDK.TypeTombstone)
|
||||||
|
_, err = db.Put(ctx, PutPrm{
|
||||||
|
obj: tomb,
|
||||||
|
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
tombAddr := object.AddressOf(tomb)
|
||||||
|
_, err = db.Inhume(ctx, InhumePrm{
|
||||||
|
target: []oid.Address{object.AddressOf(obj)},
|
||||||
|
tomb: &tombAddr,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
db.log.Info("simple objects deleted by tombstones generated")
|
||||||
|
eg, ctx = errgroup.WithContext(context.Background())
|
||||||
|
eg.SetLimit(generateWorkersCount)
|
||||||
|
// simple objects locked by locks
|
||||||
|
for i := 0; i < lockedCount; i++ {
|
||||||
|
i := i
|
||||||
|
eg.Go(func() error {
|
||||||
|
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
|
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||||
|
_, err := db.Put(ctx, PutPrm{
|
||||||
|
obj: obj,
|
||||||
|
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||||
|
})
|
||||||
|
lock := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||||
|
lock.SetType(objectSDK.TypeLock)
|
||||||
|
testutil.AddAttribute(lock, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||||
|
_, err = db.Put(ctx, PutPrm{
|
||||||
|
obj: lock,
|
||||||
|
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = db.Lock(ctx, containers[i%len(containers)], object.AddressOf(lock).Object(), []oid.ID{object.AddressOf(obj).Object()})
|
||||||
|
require.NoError(t, err)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
db.log.Info("simple objects locked by locks generated")
|
||||||
|
require.NoError(t, db.boltDB.Sync())
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
}
|
|
@ -94,11 +94,13 @@ const (
|
||||||
// ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs.
|
// ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs.
|
||||||
// Key: owner ID
|
// Key: owner ID
|
||||||
// Value: bucket containing object IDs as keys
|
// Value: bucket containing object IDs as keys
|
||||||
_
|
// removed in version 3
|
||||||
|
ownerPrefix
|
||||||
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
|
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
|
||||||
// Key: attribute value
|
// Key: attribute value
|
||||||
// Value: bucket containing object IDs as keys
|
// Value: bucket containing object IDs as keys
|
||||||
_
|
// removed in version 3
|
||||||
|
userAttributePrefix
|
||||||
|
|
||||||
// ====================
|
// ====================
|
||||||
// List index buckets.
|
// List index buckets.
|
||||||
|
@ -107,7 +109,8 @@ const (
|
||||||
// payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs.
|
// payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs.
|
||||||
// Key: payload hash
|
// Key: payload hash
|
||||||
// Value: list of object IDs
|
// Value: list of object IDs
|
||||||
_
|
// removed in version 3
|
||||||
|
payloadHashPrefix
|
||||||
// parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs.
|
// parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs.
|
||||||
// Key: parent ID
|
// Key: parent ID
|
||||||
// Value: list of object IDs
|
// Value: list of object IDs
|
||||||
|
|
|
@ -2,6 +2,7 @@ package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
|
@ -18,6 +19,8 @@ var versionKey = []byte("version")
|
||||||
// the current code version.
|
// the current code version.
|
||||||
var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required")
|
var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required")
|
||||||
|
|
||||||
|
var errVersionUndefinedNoInfoBucket = errors.New("version undefined: no info bucket")
|
||||||
|
|
||||||
func checkVersion(tx *bbolt.Tx, initialized bool) error {
|
func checkVersion(tx *bbolt.Tx, initialized bool) error {
|
||||||
var knownVersion bool
|
var knownVersion bool
|
||||||
|
|
||||||
|
@ -59,3 +62,15 @@ func updateVersion(tx *bbolt.Tx, version uint64) error {
|
||||||
}
|
}
|
||||||
return b.Put(versionKey, data)
|
return b.Put(versionKey, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func currentVersion(tx *bbolt.Tx) (uint64, error) {
|
||||||
|
b := tx.Bucket(shardInfoBucket)
|
||||||
|
if b == nil {
|
||||||
|
return 0, errVersionUndefinedNoInfoBucket
|
||||||
|
}
|
||||||
|
data := b.Get(versionKey)
|
||||||
|
if len(data) != 8 {
|
||||||
|
return 0, fmt.Errorf("version undefined: invalid version data length %d", len(data))
|
||||||
|
}
|
||||||
|
return binary.LittleEndian.Uint64(data), nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue