metabase: Add upgrade from v2 to v3 #1334
15
cmd/frostfs-adm/internal/modules/metabase/root.go
Normal file
|
@ -0,0 +1,15 @@
|
|||
package metabase
|
||||
|
||||
import "github.com/spf13/cobra"
|
||||
|
||||
// RootCmd is a root command of config section.
|
||||
var RootCmd = &cobra.Command{
|
||||
Use: "metabase",
|
||||
Short: "Section for metabase commands",
|
||||
}
|
||||
|
||||
func init() {
|
||||
RootCmd.AddCommand(UpgradeCmd)
|
||||
|
||||
initUpgradeCommand()
|
||||
}
|
86
cmd/frostfs-adm/internal/modules/metabase/upgrade.go
Normal file
|
@ -0,0 +1,86 @@
|
|||
package metabase
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
||||
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
const (
|
||||
pathFlag = "path"
|
||||
noCompactFlag = "no-compact"
|
||||
)
|
||||
|
||||
var errNoPathsFound = errors.New("no metabase paths found")
|
||||
|
||||
var path string
|
||||
|
||||
var UpgradeCmd = &cobra.Command{
|
||||
Use: "upgrade",
|
||||
Short: "Upgrade metabase to latest version",
|
||||
RunE: upgrade,
|
||||
}
|
||||
|
||||
|
||||
func upgrade(cmd *cobra.Command, _ []string) error {
|
||||
configFile, err := cmd.Flags().GetString(commonflags.ConfigFlag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
configDir, err := cmd.Flags().GetString(commonflags.ConfigDirFlag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
noCompact, _ := cmd.Flags().GetBool(noCompactFlag)
|
||||
var paths []string
|
||||
if path != "" {
|
||||
paths = append(paths, path)
|
||||
}
|
||||
appCfg := config.New(configFile, configDir, config.EnvPrefix)
|
||||
if err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error {
|
||||
paths = append(paths, sc.Metabase().Path())
|
||||
return nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to get metabase paths: %w", err)
|
||||
}
|
||||
if len(paths) == 0 {
|
||||
return errNoPathsFound
|
||||
}
|
||||
cmd.Println("found", len(paths), "metabases:")
|
||||
for i, path := range paths {
|
||||
cmd.Println(i+1, ":", path)
|
||||
}
|
||||
result := make(map[string]bool)
|
||||
for _, path := range paths {
|
||||
cmd.Println("upgrading metabase", path, "...")
|
||||
if err := meta.Upgrade(cmd.Context(), path, !noCompact, func(a ...any) {
|
||||
cmd.Println(append([]any{time.Now().Format(time.RFC3339), ":", path, ":"}, a...)...)
|
||||
}); err != nil {
|
||||
result[path] = false
|
||||
cmd.Println("error: failed to upgrade metabase", path, ":", err)
|
||||
} else {
|
||||
result[path] = true
|
||||
cmd.Println("metabase", path, "upgraded successfully")
|
||||
}
|
||||
}
|
||||
for mb, ok := range result {
|
||||
if ok {
|
||||
cmd.Println(mb, ": success")
|
||||
} else {
|
||||
cmd.Println(mb, ": failed")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func initUpgradeCommand() {
|
||||
flags := UpgradeCmd.Flags()
|
||||
flags.StringVar(&path, pathFlag, "", "Path to metabase file")
|
||||
flags.Bool(noCompactFlag, false, "Do not compact upgraded metabase file")
|
||||
}
|
|
@ -5,6 +5,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/storagecfg"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
|
||||
|
@ -41,6 +42,7 @@ func init() {
|
|||
rootCmd.AddCommand(config.RootCmd)
|
||||
rootCmd.AddCommand(morph.RootCmd)
|
||||
rootCmd.AddCommand(storagecfg.RootCmd)
|
||||
rootCmd.AddCommand(metabase.RootCmd)
|
||||
|
||||
rootCmd.AddCommand(autocomplete.Command("frostfs-adm"))
|
||||
rootCmd.AddCommand(gendoc.Command(rootCmd, gendoc.Options{}))
|
||||
|
|
376
pkg/local_object_storage/metabase/upgrade.go
Normal file
|
@ -0,0 +1,376 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
upgradeLogFrequency = 50_000
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`logFrequency`?
dstepanov-yadro
commented
ok, done ok, done
|
||||
upgradeWorkersCount = 1_000
|
||||
compactMaxTxSize = 256 << 20
|
||||
upgradeTimeout = 1 * time.Second
|
||||
fyrchik
commented
We use 100ms in all other code, but no big deal. We use 100ms in all other code, but no big deal.
|
||||
)
|
||||
|
||||
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, log func(a ...any)) error{
|
||||
2: upgradeFromV2ToV3,
|
||||
}
|
||||
|
||||
func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any)) error {
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It will be It will be `failet to .. : failed to .. : failed to ..`
How about `open metabase: %w`?
dstepanov-yadro
commented
ok, fixed ok, fixed
|
||||
return fmt.Errorf("check metabase existence: %w", err)
|
||||
}
|
||||
opts := bbolt.DefaultOptions
|
||||
opts.Timeout = upgradeTimeout
|
||||
db, err := bbolt.Open(path, os.ModePerm, opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open metabase: %w", err)
|
||||
}
|
||||
var version uint64
|
||||
if err := db.View(func(tx *bbolt.Tx) error {
|
||||
var e error
|
||||
version, e = currentVersion(tx)
|
||||
return e
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
updater, found := updates[version]
|
||||
acid-ant marked this conversation as resolved
Outdated
acid-ant
commented
Why we need to fail here, maybe just print debug message that update is not necessary? Why we need to fail here, maybe just print debug message that update is not necessary?
dstepanov-yadro
commented
Because this means that database schema is not supported by code. For example, metabase v1 will not work with v0.42 frostfs-node version. Because this means that database schema is not supported by code. For example, metabase v1 will not work with v0.42 frostfs-node version.
|
||||
if !found {
|
||||
return fmt.Errorf("unsupported version %d: no update available", version)
|
||||
}
|
||||
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
return b.Put(upgradeKey, zeroValue)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("set upgrade key %w", err)
|
||||
}
|
||||
if err := updater(ctx, db, log); err != nil {
|
||||
return fmt.Errorf("update metabase schema: %w", err)
|
||||
}
|
||||
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
return b.Delete(upgradeKey)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("delete upgrade key %w", err)
|
||||
}
|
||||
if compact {
|
||||
log("compacting metabase...")
|
||||
err := compactDB(db)
|
||||
acid-ant marked this conversation as resolved
Outdated
acid-ant
commented
Looks like we need to remove all files which matches pattern Looks like we need to remove all files which matches pattern `db.info.Path + "." + "*"`, not only `tmpFileName`.
Or how about to do cleanup before starting update?
dstepanov-yadro
commented
I suppose to not to delete files by pattern to not to delete something needed (manual backup for example) I suppose to not to delete files by pattern to not to delete something needed (manual backup for example)
acid-ant
commented
That will lead to increasing garbage inside That will lead to increasing garbage inside `db.info.Path` when node killed.
|
||||
if err != nil {
|
||||
return fmt.Errorf("compact metabase: %w", err)
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Magic constant. Magic constant.
dstepanov-yadro
commented
fixed fixed
|
||||
log("metabase compacted")
|
||||
}
|
||||
return db.Close()
|
||||
}
|
||||
|
||||
func compactDB(db *bbolt.DB) error {
|
||||
sourcePath := db.Path()
|
||||
tmpFileName := sourcePath + "." + time.Now().Format(time.RFC3339)
|
||||
f, err := os.Stat(sourcePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dst, err := bbolt.Open(tmpFileName, f.Mode(), &bbolt.Options{
|
||||
Timeout: 100 * time.Millisecond,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open new metabase to compact: %w", err)
|
||||
}
|
||||
if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil {
|
||||
return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))
|
||||
}
|
||||
if err := dst.Close(); err != nil {
|
||||
return fmt.Errorf("close compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
|
||||
}
|
||||
if err := db.Close(); err != nil {
|
||||
return fmt.Errorf("close source metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
|
||||
}
|
||||
if err := os.Rename(tmpFileName, sourcePath); err != nil {
|
||||
return fmt.Errorf("replace source metabase with compacted: %w", errors.Join(err, os.Remove(tmpFileName)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||
if err := createExpirationEpochBuckets(ctx, db, log); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dropUserAttributes(ctx, db, log); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dropOwnerIDIndex(ctx, db, log); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dropPayloadChecksumIndex(ctx, db, log); err != nil {
|
||||
return err
|
||||
}
|
||||
return db.Update(func(tx *bbolt.Tx) error {
|
||||
return updateVersion(tx, version)
|
||||
})
|
||||
}
|
||||
|
||||
type objectIDToExpEpoch struct {
|
||||
containerID cid.ID
|
||||
objectID oid.ID
|
||||
expirationEpoch uint64
|
||||
}
|
||||
|
||||
func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||
log("filling expiration epoch buckets...")
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why parallelize but use Why parallelize but use `Update` instead of `Batch`?
dstepanov-yadro
commented
Outdated Outdated
|
||||
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(expEpochToObjectBucketName)
|
||||
return err
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
objects := make(chan objectIDToExpEpoch)
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Do we always Do we always `put` by some new key?
Otherwise we might not behave idempotently, this is a must for upgrade.
dstepanov-yadro
commented
For same object For same object `expirationEpochKey` will produce the same key. Why this leads to idempotence lost?
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
eg.Go(func() error {
|
||||
return selectObjectsWithExpirationEpoch(ctx, db, objects)
|
||||
})
|
||||
var count atomic.Uint64
|
||||
for i := 0; i < upgradeWorkersCount; i++ {
|
||||
eg.Go(func() error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case obj, ok := <-objects:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if err := db.Batch(func(tx *bbolt.Tx) error {
|
||||
if err := putUniqueIndexItem(tx, namedBucketItem{
|
||||
name: expEpochToObjectBucketName,
|
||||
key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
|
||||
val: zeroValue,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
val := make([]byte, epochSize)
|
||||
binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
|
||||
return putUniqueIndexItem(tx, namedBucketItem{
|
||||
name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)),
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Can log have signature similar to Can log have signature similar to `fmt.Println` to avoid `fmt.Sprintf` on callsites?
dstepanov-yadro
commented
done done
|
||||
key: objectKey(obj.objectID, make([]byte, objectKeySize)),
|
||||
val: val,
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if c := count.Add(1); c%upgradeLogFrequency == 0 {
|
||||
log("expiration epoch filled for", c, "objects...")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
err := eg.Wait()
|
||||
if err != nil {
|
||||
log("expiration epoch buckets completed completed with error:", err)
|
||||
return err
|
||||
}
|
||||
log("filling expiration epoch buckets completed successfully, total", count.Load(), "objects")
|
||||
return nil
|
||||
}
|
||||
|
||||
func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error {
|
||||
defer close(objects)
|
||||
|
||||
const batchSize = 1000
|
||||
it := &objectsWithExpirationEpochBatchIterator{
|
||||
lastAttributeKey: usrAttrPrefix,
|
||||
}
|
||||
for {
|
||||
if err := getNextObjectsWithExpirationEpochBatch(ctx, db, it, batchSize); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, item := range it.items {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case objects <- item:
|
||||
}
|
||||
}
|
||||
|
||||
if len(it.items) < batchSize {
|
||||
return nil
|
||||
}
|
||||
it.items = nil
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
usrAttrPrefix = []byte{userAttributePrefix}
|
||||
errBatchSizeLimit = errors.New("batch size limit")
|
||||
)
|
||||
|
||||
type objectsWithExpirationEpochBatchIterator struct {
|
||||
lastAttributeKey []byte
|
||||
lastAttributeValue []byte
|
||||
lastAttrKeyValueItem []byte
|
||||
items []objectIDToExpEpoch
|
||||
}
|
||||
|
||||
// - {prefix}{containerID}{attributeKey} <- bucket
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
If we need only 2 attributes, we can have 2 separate If we need only 2 attributes, we can have 2 separate `Seek` loops.
It seems we currently are iterating over all attributes.
dstepanov-yadro
commented
How? User attributes buckets have names with pattern How? User attributes buckets have names with pattern `{prefix}{cid}{attribute}`.
fyrchik
commented
Oh, I see the problem -- we need to iterate specific attribute, but in all containers. Still possible with seeks (remember current seed and seek for each container), but your implementation is more readable. Oh, I see the problem -- we need to iterate specific attribute, but in all containers.
Still possible with seeks (remember current seed and seek for each container), but your implementation is more readable.
|
||||
// -- {attributeValue} <- bucket, expirationEpoch
|
||||
// --- {objectID}: zeroValue <- record
|
||||
|
||||
func getNextObjectsWithExpirationEpochBatch(ctx context.Context, db *bbolt.DB, it *objectsWithExpirationEpochBatchIterator, batchSize int) error {
|
||||
seekAttrValue := it.lastAttributeValue
|
||||
seekAttrKVItem := it.lastAttrKeyValueItem
|
||||
err := db.View(func(tx *bbolt.Tx) error {
|
||||
attrKeyC := tx.Cursor()
|
||||
for attrKey, _ := attrKeyC.Seek(it.lastAttributeKey); attrKey != nil && bytes.HasPrefix(attrKey, usrAttrPrefix); attrKey, _ = attrKeyC.Next() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
if len(attrKey) <= 1+cidSize {
|
||||
continue
|
||||
}
|
||||
attributeKey := string(attrKey[1+cidSize:])
|
||||
if attributeKey != objectV2.SysAttributeExpEpochNeoFS && attributeKey != objectV2.SysAttributeExpEpoch {
|
||||
continue
|
||||
}
|
||||
var containerID cid.ID
|
||||
if err := containerID.Decode(attrKey[1 : 1+cidSize]); err != nil {
|
||||
return fmt.Errorf("decode container id from user attribute bucket: %w", err)
|
||||
}
|
||||
if err := iterateExpirationAttributeKeyBucket(ctx, tx.Bucket(attrKey), it, batchSize, containerID, attrKey, seekAttrValue, seekAttrKVItem); err != nil {
|
||||
return err
|
||||
}
|
||||
seekAttrValue = nil
|
||||
seekAttrKVItem = nil
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, errBatchSizeLimit) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, it *objectsWithExpirationEpochBatchIterator, batchSize int, containerID cid.ID, attrKey, seekAttrValue, seekAttrKVItem []byte) error {
|
||||
attrValueC := b.Cursor()
|
||||
for attrValue, v := attrValueC.Seek(seekAttrValue); attrValue != nil; attrValue, v = attrValueC.Next() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
if v != nil {
|
||||
continue // need to iterate over buckets, not records
|
||||
}
|
||||
expirationEpoch, err := strconv.ParseUint(string(attrValue), 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not parse expiration epoch: %w", err)
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why do we clone them? They seem to be used only during tx. Why do we clone them? They seem to be used only during tx.
dstepanov-yadro
commented
No,
No, `it` used outside transaction:
```
it := &objectsWithExpirationEpochBatchIterator{
lastAttributeKey: usrAttrPrefix,
}
for {
if err := getNextObjectsWithExpirationEpochBatch(ctx, db, it, batchSize); err != nil {
return err
}
```
|
||||
expirationEpochBucket := b.Bucket(attrValue)
|
||||
attrKeyValueC := expirationEpochBucket.Cursor()
|
||||
for attrKeyValueItem, v := attrKeyValueC.Seek(seekAttrKVItem); attrKeyValueItem != nil; attrKeyValueItem, v = attrKeyValueC.Next() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
if v == nil {
|
||||
continue // need to iterate over records, not buckets
|
||||
}
|
||||
if bytes.Equal(it.lastAttributeKey, attrKey) && bytes.Equal(it.lastAttributeValue, attrValue) && bytes.Equal(it.lastAttrKeyValueItem, attrKeyValueItem) {
|
||||
continue
|
||||
}
|
||||
var objectID oid.ID
|
||||
if err := objectID.Decode(attrKeyValueItem); err != nil {
|
||||
return fmt.Errorf("decode object id from container '%s' expiration epoch %d: %w", containerID, expirationEpoch, err)
|
||||
}
|
||||
it.lastAttributeKey = bytes.Clone(attrKey)
|
||||
it.lastAttributeValue = bytes.Clone(attrValue)
|
||||
it.lastAttrKeyValueItem = bytes.Clone(attrKeyValueItem)
|
||||
it.items = append(it.items, objectIDToExpEpoch{
|
||||
containerID: containerID,
|
||||
objectID: objectID,
|
||||
expirationEpoch: expirationEpoch,
|
||||
})
|
||||
if len(it.items) == batchSize {
|
||||
return errBatchSizeLimit
|
||||
}
|
||||
}
|
||||
seekAttrKVItem = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func dropUserAttributes(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||
return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix}, func(a ...any) {
|
||||
log(append([]any{"user attributes:"}, a...)...)
|
||||
})
|
||||
}
|
||||
|
||||
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||
return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, func(a ...any) {
|
||||
log(append([]any{"owner ID index:"}, a...)...)
|
||||
})
|
||||
}
|
||||
|
||||
func dropPayloadChecksumIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||
return dropBucketsByPrefix(ctx, db, []byte{payloadHashPrefix}, func(a ...any) {
|
||||
log(append([]any{"payload checksum:"}, a...)...)
|
||||
})
|
||||
}
|
||||
|
||||
func dropBucketsByPrefix(ctx context.Context, db *bbolt.DB, prefix []byte, log func(a ...any)) error {
|
||||
log("deleting buckets...")
|
||||
const batch = 1000
|
||||
var count uint64
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
var keys [][]byte
|
||||
if err := db.View(func(tx *bbolt.Tx) error {
|
||||
c := tx.Cursor()
|
||||
for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() {
|
||||
keys = append(keys, bytes.Clone(k))
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
log("deleting buckets completed with an error:", err)
|
||||
return err
|
||||
}
|
||||
if len(keys) == 0 {
|
||||
log("deleting buckets completed successfully, deleted", count, "buckets")
|
||||
return nil
|
||||
}
|
||||
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||
for _, k := range keys {
|
||||
if err := tx.DeleteBucket(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
log("deleting buckets completed with an error:", err)
|
||||
return err
|
||||
}
|
||||
if count += uint64(len(keys)); count%upgradeLogFrequency == 0 {
|
||||
log("deleted", count, "buckets")
|
||||
}
|
||||
}
|
||||
}
|
215
pkg/local_object_storage/metabase/upgrade_test.go
Normal file
|
@ -0,0 +1,215 @@
|
|||
//go:build integration
|
||||
|
||||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const upgradeFilePath = "/path/to/metabase.v2"
|
||||
|
||||
func TestUpgradeV2ToV3(t *testing.T) {
|
||||
path := createTempCopy(t, upgradeFilePath)
|
||||
defer func() {
|
||||
require.NoError(t, os.Remove(path))
|
||||
}()
|
||||
db := New(WithPath(path), WithEpochState(epochState{e: 1000}), WithLogger(test.NewLogger(t)))
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.ErrorIs(t, db.Init(), ErrOutdatedVersion)
|
||||
require.NoError(t, db.Close())
|
||||
require.NoError(t, Upgrade(context.Background(), path, true, t.Log))
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, db.Init())
|
||||
require.NoError(t, db.Close())
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
func createTempCopy(t *testing.T, path string) string {
|
||||
src, err := os.Open(path)
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpPath := upgradeFilePath + time.Now().Format(time.RFC3339)
|
||||
dest, err := os.Create(tmpPath)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = io.Copy(dest, src)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, src.Close())
|
||||
require.NoError(t, dest.Close())
|
||||
|
||||
return tmpPath
|
||||
}
|
||||
|
||||
func TestGenerateMetabaseFile(t *testing.T) {
|
||||
t.Skip("for generating db")
|
||||
const (
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Could you move these numbers to constants and define here instead of the comment? Could you move these numbers to constants and define here instead of the comment?
It might be useful to easily adjust the test, instead of searching the whole function and thinking whether THIS 100_000 is the one I need.
dstepanov-yadro
commented
done done
|
||||
containersCount = 10_000
|
||||
simpleObjectsCount = 500_000
|
||||
complexObjectsCount = 500_000 // x2
|
||||
deletedByGCMarksCount = 100_000
|
||||
deletedByTombstoneCount = 100_000 // x2
|
||||
lockedCount = 100_000 // x2
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Do you have any specific motivation to use sth different from defaults in tests? Do you have any specific motivation to use sth different from defaults in tests?
dstepanov-yadro
commented
no, debugee no, debugee
|
||||
allocSize = 128 << 20
|
||||
generateWorkersCount = 1_000
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Magic constant. Magic constant.
dstepanov-yadro
commented
fixed fixed
|
||||
minEpoch = 1_000
|
||||
maxFilename = 1_000
|
||||
maxStorageID = 10_000
|
||||
)
|
||||
|
||||
db := New(WithPath(upgradeFilePath), WithEpochState(epochState{e: minEpoch}), WithLogger(test.NewLogger(t)))
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
db.boltDB.AllocSize = allocSize
|
||||
db.boltDB.NoSync = true
|
||||
require.NoError(t, db.Init())
|
||||
containers := make([]cid.ID, containersCount)
|
||||
for i := range containers {
|
||||
containers[i] = cidtest.ID()
|
||||
}
|
||||
oc, err := db.ObjectCounters()
|
||||
require.NoError(t, err)
|
||||
require.True(t, oc.IsZero())
|
||||
eg, ctx := errgroup.WithContext(context.Background())
|
||||
eg.SetLimit(generateWorkersCount)
|
||||
// simple objects
|
||||
for i := 0; i < simpleObjectsCount; i++ {
|
||||
i := i
|
||||
eg.Go(func() error {
|
||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||
_, err := db.Put(ctx, PutPrm{
|
||||
obj: obj,
|
||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
require.NoError(t, eg.Wait())
|
||||
db.log.Info("simple objects generated")
|
||||
eg, ctx = errgroup.WithContext(context.Background())
|
||||
eg.SetLimit(generateWorkersCount)
|
||||
// complex objects
|
||||
for i := 0; i < complexObjectsCount; i++ {
|
||||
i := i
|
||||
eg.Go(func() error {
|
||||
parent := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
child := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
child.SetParent(parent)
|
||||
idParent, _ := parent.ID()
|
||||
child.SetParentID(idParent)
|
||||
testutil.AddAttribute(child, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(parent, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||
testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||
_, err := db.Put(ctx, PutPrm{
|
||||
obj: child,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
require.NoError(t, eg.Wait())
|
||||
db.log.Info("complex objects generated")
|
||||
eg, ctx = errgroup.WithContext(context.Background())
|
||||
eg.SetLimit(generateWorkersCount)
|
||||
// simple objects deleted by gc marks
|
||||
for i := 0; i < deletedByGCMarksCount; i++ {
|
||||
i := i
|
||||
eg.Go(func() error {
|
||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
_, err := db.Put(ctx, PutPrm{
|
||||
obj: obj,
|
||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
_, err = db.Inhume(ctx, InhumePrm{
|
||||
target: []oid.Address{object.AddressOf(obj)},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
require.NoError(t, eg.Wait())
|
||||
db.log.Info("simple objects deleted by gc marks generated")
|
||||
eg, ctx = errgroup.WithContext(context.Background())
|
||||
eg.SetLimit(10000)
|
||||
// simple objects deleted by tombstones
|
||||
for i := 0; i < deletedByTombstoneCount; i++ {
|
||||
i := i
|
||||
eg.Go(func() error {
|
||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
_, err := db.Put(ctx, PutPrm{
|
||||
obj: obj,
|
||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||
})
|
||||
tomb := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
tomb.SetType(objectSDK.TypeTombstone)
|
||||
_, err = db.Put(ctx, PutPrm{
|
||||
obj: tomb,
|
||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
tombAddr := object.AddressOf(tomb)
|
||||
_, err = db.Inhume(ctx, InhumePrm{
|
||||
target: []oid.Address{object.AddressOf(obj)},
|
||||
tomb: &tombAddr,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
require.NoError(t, eg.Wait())
|
||||
db.log.Info("simple objects deleted by tombstones generated")
|
||||
eg, ctx = errgroup.WithContext(context.Background())
|
||||
eg.SetLimit(generateWorkersCount)
|
||||
// simple objects locked by locks
|
||||
for i := 0; i < lockedCount; i++ {
|
||||
i := i
|
||||
eg.Go(func() error {
|
||||
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Isn't this broken? Isn't this broken? `db.Put` uses v3 logic, because that is what we want in our codebase and the test need to generate v2 db.
dstepanov-yadro
commented
I used this test on I used this test on `support/v0.42` branch.
|
||||
_, err := db.Put(ctx, PutPrm{
|
||||
obj: obj,
|
||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||
})
|
||||
lock := testutil.GenerateObjectWithCID(containers[i%len(containers)])
|
||||
lock.SetType(objectSDK.TypeLock)
|
||||
testutil.AddAttribute(lock, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
|
||||
_, err = db.Put(ctx, PutPrm{
|
||||
obj: lock,
|
||||
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
err = db.Lock(ctx, containers[i%len(containers)], object.AddressOf(lock).Object(), []oid.ID{object.AddressOf(obj).Object()})
|
||||
require.NoError(t, err)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
require.NoError(t, eg.Wait())
|
||||
db.log.Info("simple objects locked by locks generated")
|
||||
require.NoError(t, db.boltDB.Sync())
|
||||
require.NoError(t, db.Close())
|
||||
}
|
|
@ -94,11 +94,13 @@ const (
|
|||
// ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs.
|
||||
// Key: owner ID
|
||||
// Value: bucket containing object IDs as keys
|
||||
_
|
||||
// removed in version 3
|
||||
ownerPrefix
|
||||
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
|
||||
// Key: attribute value
|
||||
// Value: bucket containing object IDs as keys
|
||||
_
|
||||
// removed in version 3
|
||||
userAttributePrefix
|
||||
|
||||
// ====================
|
||||
// List index buckets.
|
||||
|
@ -107,7 +109,8 @@ const (
|
|||
// payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs.
|
||||
// Key: payload hash
|
||||
// Value: list of object IDs
|
||||
_
|
||||
// removed in version 3
|
||||
payloadHashPrefix
|
||||
// parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs.
|
||||
// Key: parent ID
|
||||
// Value: list of object IDs
|
||||
|
|
|
@ -2,6 +2,7 @@ package meta
|
|||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
|
@ -11,13 +12,20 @@ import (
|
|||
// version contains current metabase version.
|
||||
const version = 3
|
||||
|
||||
var versionKey = []byte("version")
|
||||
var (
|
||||
versionKey = []byte("version")
|
||||
upgradeKey = []byte("upgrade")
|
||||
)
|
||||
|
||||
// ErrOutdatedVersion is returned on initializing
|
||||
// an existing metabase that is not compatible with
|
||||
// the current code version.
|
||||
var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required")
|
||||
|
||||
// ErrIncompletedUpgrade is returned on initializing
// an existing metabase whose info bucket still holds
// a non-empty value under the upgrade key.
var ErrIncompletedUpgrade = logicerr.New("metabase upgrade is not completed")
|
||||
|
||||
var errVersionUndefinedNoInfoBucket = errors.New("version undefined: no info bucket")
|
||||
|
||||
func checkVersion(tx *bbolt.Tx, initialized bool) error {
|
||||
var knownVersion bool
|
||||
|
||||
|
@ -32,6 +40,10 @@ func checkVersion(tx *bbolt.Tx, initialized bool) error {
|
|||
return fmt.Errorf("%w: expected=%d, stored=%d", ErrOutdatedVersion, version, stored)
|
||||
}
|
||||
}
|
||||
data = b.Get(upgradeKey)
|
||||
if len(data) > 0 {
|
||||
return ErrIncompletedUpgrade
|
||||
}
|
||||
}
|
||||
|
||||
if !initialized {
|
||||
|
@ -59,3 +71,15 @@ func updateVersion(tx *bbolt.Tx, version uint64) error {
|
|||
}
|
||||
return b.Put(versionKey, data)
|
||||
}
|
||||
|
||||
func currentVersion(tx *bbolt.Tx) (uint64, error) {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b == nil {
|
||||
return 0, errVersionUndefinedNoInfoBucket
|
||||
}
|
||||
data := b.Get(versionKey)
|
||||
if len(data) != 8 {
|
||||
return 0, fmt.Errorf("version undefined: invalid version data length %d", len(data))
|
||||
}
|
||||
return binary.LittleEndian.Uint64(data), nil
|
||||
}
|
||||
|
|
|
@ -84,4 +84,24 @@ func TestVersion(t *testing.T) {
|
|||
require.NoError(t, db.Close())
|
||||
})
|
||||
})
|
||||
t.Run("incompleted upgrade", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, db.Init())
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return tx.Bucket(shardInfoBucket).Put(upgradeKey, zeroValue)
|
||||
}))
|
||||
require.ErrorIs(t, db.Init(), ErrIncompletedUpgrade)
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return tx.Bucket(shardInfoBucket).Delete(upgradeKey)
|
||||
}))
|
||||
require.NoError(t, db.Init())
|
||||
require.NoError(t, db.Close())
|
||||
})
|
||||
}
|
||||
|
|
|
@ -171,7 +171,7 @@ func (s *Shard) initializeComponents(m mode.Mode) error {
|
|||
for _, component := range components {
|
||||
if err := component.Init(); err != nil {
|
||||
if component == s.metaBase {
|
||||
if errors.Is(err, meta.ErrOutdatedVersion) {
|
||||
if errors.Is(err, meta.ErrOutdatedVersion) || errors.Is(err, meta.ErrIncompletedUpgrade) {
|
||||
return fmt.Errorf("metabase initialization: %w", err)
|
||||
}
|
||||
|
||||
|
|
What did you want to check?
Doesn't `meta.Upgrade` return error on missing file?
moved to `meta.Upgrade`
Ah, I understand now.
If it doesn't exist, then Upgrade will create a new file, because we open with RW and this is not what we want, right?
Exactly!