metabase: Add upgrade from v2 to v3 #1334

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:feat/metabase_upgrade into master 2024-09-04 19:51:11 +00:00
9 changed files with 746 additions and 5 deletions

@@ -0,0 +1,15 @@
package metabase
import "github.com/spf13/cobra"
// RootCmd is the root command of the metabase section.
var RootCmd = &cobra.Command{
Use: "metabase",
Short: "Section for metabase commands",
}
func init() {
RootCmd.AddCommand(UpgradeCmd)
initUpgradeCommand()
}
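
A hedged sketch of the resulting command tree, for orientation (flag spellings are taken from `initUpgradeCommand` in the next file; the layout itself is illustrative):

```
// frostfs-adm
// └── metabase
//     └── upgrade [--path <metabase file>] [--no-compact]
```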

@@ -0,0 +1,86 @@
package metabase
import (
"errors"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"github.com/spf13/cobra"
)
const (
pathFlag = "path"
noCompactFlag = "no-compact"
)
var errNoPathsFound = errors.New("no metabase paths found")
var path string
var UpgradeCmd = &cobra.Command{
Use: "upgrade",
Short: "Upgrade metabase to latest version",
RunE: upgrade,
}

What did you want to check? Doesn't `meta.Upgrade` return an error on a missing file?

moved to `meta.Upgrade`

Ah, I understand now. If it doesn't exist, then `Upgrade` will create a new file, because we open with RW, and this is not what we want, right?

Exactly!
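
(Context for the fix: `bbolt.Open` creates the file when it does not exist, so without an explicit existence check a mistyped path would silently produce an empty metabase instead of an error. A minimal sketch of the failure mode, with a hypothetical path:)

```
package main

import "go.etcd.io/bbolt"

func main() {
	// Without the os.Stat guard in meta.Upgrade, a typo in --path would
	// silently create a new, empty database rather than fail:
	db, err := bbolt.Open("./metabse.db", 0o600, nil) // hypothetical mistyped path
	if err == nil {
		db.Close() // an empty "metabase" now exists on disk
	}
}
```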
func upgrade(cmd *cobra.Command, _ []string) error {
configFile, err := cmd.Flags().GetString(commonflags.ConfigFlag)
if err != nil {
return err
}
configDir, err := cmd.Flags().GetString(commonflags.ConfigDirFlag)
if err != nil {
return err
}
noCompact, _ := cmd.Flags().GetBool(noCompactFlag)
var paths []string
if path != "" {
paths = append(paths, path)
}
appCfg := config.New(configFile, configDir, config.EnvPrefix)
if err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error {
paths = append(paths, sc.Metabase().Path())
return nil
}); err != nil {
return fmt.Errorf("failed to get metabase paths: %w", err)
}
if len(paths) == 0 {
return errNoPathsFound
}
cmd.Println("found", len(paths), "metabases:")
for i, path := range paths {
cmd.Println(i+1, ":", path)
}
result := make(map[string]bool)
for _, path := range paths {
cmd.Println("upgrading metabase", path, "...")
if err := meta.Upgrade(cmd.Context(), path, !noCompact, func(a ...any) {
cmd.Println(append([]any{time.Now().Format(time.RFC3339), ":", path, ":"}, a...)...)
}); err != nil {
result[path] = false
cmd.Println("error: failed to upgrade metabase", path, ":", err)
} else {
result[path] = true
cmd.Println("metabase", path, "upgraded successfully")
}
}
for mb, ok := range result {
if ok {
cmd.Println(mb, ": success")
} else {
cmd.Println(mb, ": failed")
}
}
return nil
}
func initUpgradeCommand() {
flags := UpgradeCmd.Flags()
flags.StringVar(&path, pathFlag, "", "Path to metabase file")
flags.Bool(noCompactFlag, false, "Do not compact upgraded metabase file")
}
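
The command is invoked as `frostfs-adm metabase upgrade`; programmatic use boils down to a single call. A minimal sketch, assuming only the `meta.Upgrade` signature introduced later in this PR (the metabase path is hypothetical):

```
package main

import (
	"context"
	"fmt"

	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
)

func main() {
	// compact=true mirrors the CLI default (--no-compact not set);
	// the log callback receives progress messages, fmt.Println-style.
	err := meta.Upgrade(context.Background(), "/srv/frostfs/meta0/metabase.db", true,
		func(a ...any) { fmt.Println(a...) })
	if err != nil {
		fmt.Println("upgrade failed:", err)
	}
}
```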

@@ -5,6 +5,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/storagecfg"
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
@@ -41,6 +42,7 @@ func init() {
rootCmd.AddCommand(config.RootCmd)
rootCmd.AddCommand(morph.RootCmd)
rootCmd.AddCommand(storagecfg.RootCmd)
rootCmd.AddCommand(metabase.RootCmd)
rootCmd.AddCommand(autocomplete.Command("frostfs-adm"))
rootCmd.AddCommand(gendoc.Command(rootCmd, gendoc.Options{}))

@@ -0,0 +1,376 @@
package meta
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"strconv"
"sync/atomic"
"time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"golang.org/x/sync/errgroup"
)
const (
upgradeLogFrequency = 50_000
fyrchik marked this conversation as resolved (outdated)

`logFrequency`?

ok, done
upgradeWorkersCount = 1_000
compactMaxTxSize = 256 << 20
upgradeTimeout = 1 * time.Second
We use 100ms in all other code, but no big deal.
)
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, log func(a ...any)) error{
2: upgradeFromV2ToV3,
}
func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any)) error {
if _, err := os.Stat(path); err != nil {
fyrchik marked this conversation as resolved (outdated)

It will be `failed to .. : failed to .. : failed to ..` How about `open metabase: %w`?

ok, fixed
return fmt.Errorf("check metabase existence: %w", err)
}
opts := bbolt.DefaultOptions
opts.Timeout = upgradeTimeout
db, err := bbolt.Open(path, os.ModePerm, opts)
if err != nil {
return fmt.Errorf("open metabase: %w", err)
}
var version uint64
if err := db.View(func(tx *bbolt.Tx) error {
var e error
version, e = currentVersion(tx)
return e
}); err != nil {
return err
}
updater, found := updates[version]
acid-ant marked this conversation as resolved (outdated)

Why do we need to fail here? Maybe just print a debug message that the update is not necessary?

Because this means that the database schema is not supported by the code. For example, metabase v1 will not work with the v0.42 frostfs-node version.
if !found {
return fmt.Errorf("unsupported version %d: no update available", version)
}
if err := db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(shardInfoBucket)
return b.Put(upgradeKey, zeroValue)
}); err != nil {
return fmt.Errorf("set upgrade key %w", err)
}
if err := updater(ctx, db, log); err != nil {
return fmt.Errorf("update metabase schema: %w", err)
}
if err := db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(shardInfoBucket)
return b.Delete(upgradeKey)
}); err != nil {
return fmt.Errorf("delete upgrade key %w", err)
}
if compact {
log("compacting metabase...")
err := compactDB(db)
acid-ant marked this conversation as resolved (outdated)

Looks like we need to remove all files matching the pattern `db.info.Path + "." + "*"`, not only `tmpFileName`. Or how about doing the cleanup before starting the update?

I suggest not deleting files by pattern, so as not to delete something needed (a manual backup, for example).

That will lead to garbage accumulating inside `db.info.Path` when the node is killed.
if err != nil {
return fmt.Errorf("compact metabase: %w", err)
}
fyrchik marked this conversation as resolved (outdated)

Magic constant.

fixed
log("metabase compacted")
}
return db.Close()
}
func compactDB(db *bbolt.DB) error {
sourcePath := db.Path()
tmpFileName := sourcePath + "." + time.Now().Format(time.RFC3339)
f, err := os.Stat(sourcePath)
if err != nil {
return err
}
dst, err := bbolt.Open(tmpFileName, f.Mode(), &bbolt.Options{
Timeout: 100 * time.Millisecond,
})
if err != nil {
return fmt.Errorf("can't open new metabase to compact: %w", err)
}
if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil {
return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))
}
if err := dst.Close(); err != nil {
return fmt.Errorf("close compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
}
if err := db.Close(); err != nil {
return fmt.Errorf("close source metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
}
if err := os.Rename(tmpFileName, sourcePath); err != nil {
return fmt.Errorf("replace source metabase with compacted: %w", errors.Join(err, os.Remove(tmpFileName)))
}
return nil
}
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
if err := createExpirationEpochBuckets(ctx, db, log); err != nil {
return err
}
if err := dropUserAttributes(ctx, db, log); err != nil {
return err
}
if err := dropOwnerIDIndex(ctx, db, log); err != nil {
return err
}
if err := dropPayloadChecksumIndex(ctx, db, log); err != nil {
return err
}
return db.Update(func(tx *bbolt.Tx) error {
return updateVersion(tx, version)
})
}
type objectIDToExpEpoch struct {
containerID cid.ID
objectID oid.ID
expirationEpoch uint64
}
func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
log("filling expiration epoch buckets...")
fyrchik marked this conversation as resolved (outdated)

Why parallelize but use `Update` instead of `Batch`?

Outdated
if err := db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(expEpochToObjectBucketName)
return err
}); err != nil {
return err
}
objects := make(chan objectIDToExpEpoch)
fyrchik marked this conversation as resolved (outdated)

Do we always `put` by some new key? Otherwise we might not behave idempotently, and idempotence is a must for upgrade.

For the same object, `expirationEpochKey` will produce the same key. Why would this lead to lost idempotence?
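
(A sketch of why the re-put is harmless: `expirationEpochKey` is deterministic for a given object, so a re-run overwrites each record with identical contents. Plain bbolt, placeholder bucket and key names:)

```
// Re-applying the same index item overwrites equal bytes with equal
// bytes, i.e. it is a no-op in effect:
err := db.Update(func(tx *bbolt.Tx) error {
	b, err := tx.CreateBucketIfNotExists([]byte("exp-epoch-to-object")) // placeholder
	if err != nil {
		return err
	}
	key := []byte("deterministic-key") // stands in for expirationEpochKey(...)
	if err := b.Put(key, []byte{}); err != nil {
		return err
	}
	return b.Put(key, []byte{}) // same key, same value: state unchanged
})
```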
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return selectObjectsWithExpirationEpoch(ctx, db, objects)
})
var count atomic.Uint64
for i := 0; i < upgradeWorkersCount; i++ {
eg.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case obj, ok := <-objects:
if !ok {
return nil
}
if err := db.Batch(func(tx *bbolt.Tx) error {
if err := putUniqueIndexItem(tx, namedBucketItem{
name: expEpochToObjectBucketName,
key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
val: zeroValue,
}); err != nil {
return err
}
val := make([]byte, epochSize)
binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
return putUniqueIndexItem(tx, namedBucketItem{
name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)),
fyrchik marked this conversation as resolved (outdated)

Can `log` have a signature similar to `fmt.Println`, to avoid `fmt.Sprintf` at call sites?

done
key: objectKey(obj.objectID, make([]byte, objectKeySize)),
val: val,
})
}); err != nil {
return err
}
}
if c := count.Add(1); c%upgradeLogFrequency == 0 {
log("expiration epoch filled for", c, "objects...")
}
}
})
}
err := eg.Wait()
if err != nil {
log("expiration epoch buckets completed completed with error:", err)
return err
}
log("filling expiration epoch buckets completed successfully, total", count.Load(), "objects")
return nil
}
func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error {
defer close(objects)
const batchSize = 1000
it := &objectsWithExpirationEpochBatchIterator{
lastAttributeKey: usrAttrPrefix,
}
for {
if err := getNextObjectsWithExpirationEpochBatch(ctx, db, it, batchSize); err != nil {
return err
}
for _, item := range it.items {
select {
case <-ctx.Done():
return ctx.Err()
case objects <- item:
}
}
if len(it.items) < batchSize {
return nil
}
it.items = nil
}
}
var (
usrAttrPrefix = []byte{userAttributePrefix}
errBatchSizeLimit = errors.New("batch size limit")
)
type objectsWithExpirationEpochBatchIterator struct {
lastAttributeKey []byte
lastAttributeValue []byte
lastAttrKeyValueItem []byte
items []objectIDToExpEpoch
}
// - {prefix}{containerID}{attributeKey} <- bucket
fyrchik marked this conversation as resolved (outdated)

If we need only 2 attributes, we can have 2 separate `Seek` loops. It seems we are currently iterating over all attributes.

How? User attribute buckets have names with the pattern `{prefix}{cid}{attribute}`.

Oh, I see the problem -- we need to iterate over a specific attribute, but in all containers. Still possible with seeks (remember the current seek position and seek for each container), but your implementation is more readable.
// -- {attributeValue} <- bucket, expirationEpoch
// --- {objectID}: zeroValue <- record
func getNextObjectsWithExpirationEpochBatch(ctx context.Context, db *bbolt.DB, it *objectsWithExpirationEpochBatchIterator, batchSize int) error {
seekAttrValue := it.lastAttributeValue
seekAttrKVItem := it.lastAttrKeyValueItem
err := db.View(func(tx *bbolt.Tx) error {
attrKeyC := tx.Cursor()
for attrKey, _ := attrKeyC.Seek(it.lastAttributeKey); attrKey != nil && bytes.HasPrefix(attrKey, usrAttrPrefix); attrKey, _ = attrKeyC.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if len(attrKey) <= 1+cidSize {
continue
}
attributeKey := string(attrKey[1+cidSize:])
if attributeKey != objectV2.SysAttributeExpEpochNeoFS && attributeKey != objectV2.SysAttributeExpEpoch {
continue
}
var containerID cid.ID
if err := containerID.Decode(attrKey[1 : 1+cidSize]); err != nil {
return fmt.Errorf("decode container id from user attribute bucket: %w", err)
}
if err := iterateExpirationAttributeKeyBucket(ctx, tx.Bucket(attrKey), it, batchSize, containerID, attrKey, seekAttrValue, seekAttrKVItem); err != nil {
return err
}
seekAttrValue = nil
seekAttrKVItem = nil
}
return nil
})
if err != nil && !errors.Is(err, errBatchSizeLimit) {
return err
}
return nil
}
func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, it *objectsWithExpirationEpochBatchIterator, batchSize int, containerID cid.ID, attrKey, seekAttrValue, seekAttrKVItem []byte) error {
attrValueC := b.Cursor()
for attrValue, v := attrValueC.Seek(seekAttrValue); attrValue != nil; attrValue, v = attrValueC.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if v != nil {
continue // need to iterate over buckets, not records
}
expirationEpoch, err := strconv.ParseUint(string(attrValue), 10, 64)
if err != nil {
return fmt.Errorf("could not parse expiration epoch: %w", err)
}
fyrchik marked this conversation as resolved (outdated)

Why do we clone them? They seem to be used only during the tx.

No, `it` is used outside the transaction:

```
it := &objectsWithExpirationEpochBatchIterator{
	lastAttributeKey: usrAttrPrefix,
}
for {
	if err := getNextObjectsWithExpirationEpochBatch(ctx, db, it, batchSize); err != nil {
		return err
	}
```
expirationEpochBucket := b.Bucket(attrValue)
attrKeyValueC := expirationEpochBucket.Cursor()
for attrKeyValueItem, v := attrKeyValueC.Seek(seekAttrKVItem); attrKeyValueItem != nil; attrKeyValueItem, v = attrKeyValueC.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if v == nil {
continue // need to iterate over records, not buckets
}
if bytes.Equal(it.lastAttributeKey, attrKey) && bytes.Equal(it.lastAttributeValue, attrValue) && bytes.Equal(it.lastAttrKeyValueItem, attrKeyValueItem) {
continue
}
var objectID oid.ID
if err := objectID.Decode(attrKeyValueItem); err != nil {
return fmt.Errorf("decode object id from container '%s' expiration epoch %d: %w", containerID, expirationEpoch, err)
}
it.lastAttributeKey = bytes.Clone(attrKey)
it.lastAttributeValue = bytes.Clone(attrValue)
it.lastAttrKeyValueItem = bytes.Clone(attrKeyValueItem)
it.items = append(it.items, objectIDToExpEpoch{
containerID: containerID,
objectID: objectID,
expirationEpoch: expirationEpoch,
})
if len(it.items) == batchSize {
return errBatchSizeLimit
}
}
seekAttrKVItem = nil
}
return nil
}
func dropUserAttributes(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix}, func(a ...any) {
log(append([]any{"user attributes:"}, a...)...)
})
}
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, func(a ...any) {
log(append([]any{"owner ID index:"}, a...)...)
})
}
func dropPayloadChecksumIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
return dropBucketsByPrefix(ctx, db, []byte{payloadHashPrefix}, func(a ...any) {
log(append([]any{"payload checksum:"}, a...)...)
})
}
func dropBucketsByPrefix(ctx context.Context, db *bbolt.DB, prefix []byte, log func(a ...any)) error {
log("deleting buckets...")
const batch = 1000
var count uint64
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var keys [][]byte
if err := db.View(func(tx *bbolt.Tx) error {
c := tx.Cursor()
for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() {
keys = append(keys, bytes.Clone(k))
}
return nil
}); err != nil {
log("deleting buckets completed with an error:", err)
return err
}
if len(keys) == 0 {
log("deleting buckets completed successfully, deleted", count, "buckets")
return nil
}
if err := db.Update(func(tx *bbolt.Tx) error {
for _, k := range keys {
if err := tx.DeleteBucket(k); err != nil {
return err
}
}
return nil
}); err != nil {
log("deleting buckets completed with an error:", err)
return err
}
if count += uint64(len(keys)); count%upgradeLogFrequency == 0 {
log("deleted", count, "buckets")
}
}
}
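
To summarize the index that `createExpirationEpochBuckets` builds (read directly from the code above; the exact byte layouts of `expirationEpochKey` and `objectKey` are defined elsewhere in the package):

```
// New buckets after the v2 -> v3 upgrade:
//
//   expEpochToObjectBucketName
//       key:   expirationEpochKey(expirationEpoch, containerID, objectID)
//       value: zeroValue
//
//   objectToExpirationEpochBucketName(containerID)
//       key:   objectKey(objectID)
//       value: expiration epoch as an 8-byte little-endian uint64
//
// The old user-attribute, owner-ID and payload-checksum buckets are then
// dropped, and the stored version is bumped to 3.
```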

@@ -0,0 +1,215 @@
//go:build integration
package meta
import (
"context"
"fmt"
"io"
"os"
"strconv"
"testing"
"time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
const upgradeFilePath = "/path/to/metabase.v2"
func TestUpgradeV2ToV3(t *testing.T) {
path := createTempCopy(t, upgradeFilePath)
defer func() {
require.NoError(t, os.Remove(path))
}()
db := New(WithPath(path), WithEpochState(epochState{e: 1000}), WithLogger(test.NewLogger(t)))
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.ErrorIs(t, db.Init(), ErrOutdatedVersion)
require.NoError(t, db.Close())
require.NoError(t, Upgrade(context.Background(), path, true, t.Log))
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.Init())
require.NoError(t, db.Close())
}
func createTempCopy(t *testing.T, path string) string {
src, err := os.Open(path)
require.NoError(t, err)
tmpPath := path + time.Now().Format(time.RFC3339)
dest, err := os.Create(tmpPath)
require.NoError(t, err)
_, err = io.Copy(dest, src)
require.NoError(t, err)
require.NoError(t, src.Close())
require.NoError(t, dest.Close())
return tmpPath
}
func TestGenerateMetabaseFile(t *testing.T) {
t.Skip("for generating db")
const (
fyrchik marked this conversation as resolved (outdated)

Could you move these numbers to constants and define them here instead of in the comment? It might be useful to easily adjust the test, instead of searching the whole function and wondering whether THIS 100_000 is the one I need.

done
containersCount = 10_000
simpleObjectsCount = 500_000
complexObjectsCount = 500_000 // x2
deletedByGCMarksCount = 100_000
deletedByTombstoneCount = 100_000 // x2
lockedCount = 100_000 // x2
fyrchik marked this conversation as resolved (outdated)

Do you have any specific motivation to use something different from the defaults in tests?

no, a debugging leftover
allocSize = 128 << 20
generateWorkersCount = 1_000
fyrchik marked this conversation as resolved (outdated)

Magic constant.

fixed
minEpoch = 1_000
maxFilename = 1_000
maxStorageID = 10_000
)
db := New(WithPath(upgradeFilePath), WithEpochState(epochState{e: minEpoch}), WithLogger(test.NewLogger(t)))
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
db.boltDB.AllocSize = allocSize
db.boltDB.NoSync = true
require.NoError(t, db.Init())
containers := make([]cid.ID, containersCount)
for i := range containers {
containers[i] = cidtest.ID()
}
oc, err := db.ObjectCounters()
require.NoError(t, err)
require.True(t, oc.IsZero())
eg, ctx := errgroup.WithContext(context.Background())
eg.SetLimit(generateWorkersCount)
// simple objects
for i := 0; i < simpleObjectsCount; i++ {
i := i
eg.Go(func() error {
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
_, err := db.Put(ctx, PutPrm{
obj: obj,
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
})
require.NoError(t, err)
return nil
})
}
require.NoError(t, eg.Wait())
db.log.Info("simple objects generated")
eg, ctx = errgroup.WithContext(context.Background())
eg.SetLimit(generateWorkersCount)
// complex objects
for i := 0; i < complexObjectsCount; i++ {
i := i
eg.Go(func() error {
parent := testutil.GenerateObjectWithCID(containers[i%len(containers)])
child := testutil.GenerateObjectWithCID(containers[i%len(containers)])
child.SetParent(parent)
idParent, _ := parent.ID()
child.SetParentID(idParent)
testutil.AddAttribute(child, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(parent, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
_, err := db.Put(ctx, PutPrm{
obj: child,
})
require.NoError(t, err)
return nil
})
}
require.NoError(t, eg.Wait())
db.log.Info("complex objects generated")
eg, ctx = errgroup.WithContext(context.Background())
eg.SetLimit(generateWorkersCount)
// simple objects deleted by gc marks
for i := 0; i < deletedByGCMarksCount; i++ {
i := i
eg.Go(func() error {
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
_, err := db.Put(ctx, PutPrm{
obj: obj,
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
})
require.NoError(t, err)
_, err = db.Inhume(ctx, InhumePrm{
target: []oid.Address{object.AddressOf(obj)},
})
require.NoError(t, err)
return nil
})
}
require.NoError(t, eg.Wait())
db.log.Info("simple objects deleted by gc marks generated")
eg, ctx = errgroup.WithContext(context.Background())
eg.SetLimit(10000)
// simple objects deleted by tombstones
for i := 0; i < deletedByTombstoneCount; i++ {
i := i
eg.Go(func() error {
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
_, err := db.Put(ctx, PutPrm{
obj: obj,
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
})
require.NoError(t, err)
tomb := testutil.GenerateObjectWithCID(containers[i%len(containers)])
tomb.SetType(objectSDK.TypeTombstone)
_, err = db.Put(ctx, PutPrm{
obj: tomb,
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
})
require.NoError(t, err)
tombAddr := object.AddressOf(tomb)
_, err = db.Inhume(ctx, InhumePrm{
target: []oid.Address{object.AddressOf(obj)},
tomb: &tombAddr,
})
require.NoError(t, err)
return nil
})
}
require.NoError(t, eg.Wait())
db.log.Info("simple objects deleted by tombstones generated")
eg, ctx = errgroup.WithContext(context.Background())
eg.SetLimit(generateWorkersCount)
// simple objects locked by locks
for i := 0; i < lockedCount; i++ {
i := i
eg.Go(func() error {
obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10))
fyrchik marked this conversation as resolved (outdated)

Isn't this broken? `db.Put` uses v3 logic, because that is what we want in our codebase, and the test needs to generate a v2 db.

I used this test on the `support/v0.42` branch.
_, err := db.Put(ctx, PutPrm{
obj: obj,
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
})
require.NoError(t, err)
lock := testutil.GenerateObjectWithCID(containers[i%len(containers)])
lock.SetType(objectSDK.TypeLock)
testutil.AddAttribute(lock, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10))
_, err = db.Put(ctx, PutPrm{
obj: lock,
id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)),
})
require.NoError(t, err)
err = db.Lock(ctx, containers[i%len(containers)], object.AddressOf(lock).Object(), []oid.ID{object.AddressOf(obj).Object()})
require.NoError(t, err)
return nil
})
}
require.NoError(t, eg.Wait())
db.log.Info("simple objects locked by locks generated")
require.NoError(t, db.boltDB.Sync())
require.NoError(t, db.Close())
}

@@ -94,11 +94,13 @@ const (
// ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs.
// Key: owner ID
// Value: bucket containing object IDs as keys
_
// removed in version 3
ownerPrefix
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
// Key: attribute value
// Value: bucket containing object IDs as keys
_
// removed in version 3
userAttributePrefix
// ====================
// List index buckets.
@@ -107,7 +109,8 @@ const (
// payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs.
// Key: payload hash
// Value: list of object IDs
_
// removed in version 3
payloadHashPrefix
// parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs.
// Key: parent ID
// Value: list of object IDs

@@ -2,6 +2,7 @@ package meta
import (
"encoding/binary"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@@ -11,13 +12,20 @@ import (
// version contains current metabase version.
const version = 3
var versionKey = []byte("version")
var (
versionKey = []byte("version")
upgradeKey = []byte("upgrade")
)
// ErrOutdatedVersion is returned on initializing
// an existing metabase that is not compatible with
// the current code version.
var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required")
var ErrIncompletedUpgrade = logicerr.New("metabase upgrade is not completed")
var errVersionUndefinedNoInfoBucket = errors.New("version undefined: no info bucket")
func checkVersion(tx *bbolt.Tx, initialized bool) error {
var knownVersion bool
@@ -32,6 +40,10 @@ func checkVersion(tx *bbolt.Tx, initialized bool) error {
return fmt.Errorf("%w: expected=%d, stored=%d", ErrOutdatedVersion, version, stored)
}
}
data = b.Get(upgradeKey)
if len(data) > 0 {
return ErrIncompletedUpgrade
}
}
if !initialized {
@@ -59,3 +71,15 @@ func updateVersion(tx *bbolt.Tx, version uint64) error {
}
return b.Put(versionKey, data)
}
func currentVersion(tx *bbolt.Tx) (uint64, error) {
b := tx.Bucket(shardInfoBucket)
if b == nil {
return 0, errVersionUndefinedNoInfoBucket
}
data := b.Get(versionKey)
if len(data) != 8 {
return 0, fmt.Errorf("version undefined: invalid version data length %d", len(data))
}
return binary.LittleEndian.Uint64(data), nil
}

@@ -84,4 +84,24 @@ func TestVersion(t *testing.T) {
require.NoError(t, db.Close())
})
})
t.Run("incompleted upgrade", func(t *testing.T) {
db := newDB(t)
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.Init())
require.NoError(t, db.Close())
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
return tx.Bucket(shardInfoBucket).Put(upgradeKey, zeroValue)
}))
require.ErrorIs(t, db.Init(), ErrIncompletedUpgrade)
require.NoError(t, db.Close())
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
return tx.Bucket(shardInfoBucket).Delete(upgradeKey)
}))
require.NoError(t, db.Init())
require.NoError(t, db.Close())
})
}

@@ -171,7 +171,7 @@ func (s *Shard) initializeComponents(m mode.Mode) error {
for _, component := range components {
if err := component.Init(); err != nil {
if component == s.metaBase {
if errors.Is(err, meta.ErrOutdatedVersion) {
if errors.Is(err, meta.ErrOutdatedVersion) || errors.Is(err, meta.ErrIncompletedUpgrade) {
return fmt.Errorf("metabase initialization: %w", err)
}
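
With this check, a node started against a half-upgraded metabase fails fast instead of running on an inconsistent schema. A hedged sketch of caller-side handling (the surrounding code is illustrative, not the actual shard API):

```
if err := s.metaBase.Init(); err != nil {
	switch {
	case errors.Is(err, meta.ErrOutdatedVersion):
		// schema is too old for this binary: resynchronize the shard
	case errors.Is(err, meta.ErrIncompletedUpgrade):
		// a previous `frostfs-adm metabase upgrade` was interrupted:
		// re-run it for this shard's metabase, then restart the node
	}
	return fmt.Errorf("metabase initialization: %w", err)
}
```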