From ed8993fc7e730435df5da0195705e9eb9dcac74e Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 26 Aug 2024 18:31:10 +0300 Subject: [PATCH] [#1334] metabase: Add upgrade from v2 to v3 Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 13 +- cmd/frostfs-node/config/engine/config_test.go | 2 + .../config/engine/shard/config.go | 10 + config/example/node.env | 1 + config/example/node.json | 1 + config/example/node.yaml | 1 + docs/storage-node-configuration.md | 1 + pkg/local_object_storage/metabase/upgrade.go | 257 ++++++++++++++++++ .../metabase/upgrade_test.go | 205 ++++++++++++++ pkg/local_object_storage/metabase/util.go | 9 +- pkg/local_object_storage/metabase/version.go | 15 + pkg/local_object_storage/shard/control.go | 12 +- pkg/local_object_storage/shard/shard.go | 9 + 13 files changed, 525 insertions(+), 11 deletions(-) create mode 100644 pkg/local_object_storage/metabase/upgrade.go create mode 100644 pkg/local_object_storage/metabase/upgrade_test.go diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 110281418..18f23244a 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -121,11 +121,12 @@ type shardCfg struct { estimateCompressibility bool estimateCompressibilityThreshold float64 - smallSizeObjectLimit uint64 - uncompressableContentType []string - refillMetabase bool - refillMetabaseWorkersCount int - mode shardmode.Mode + smallSizeObjectLimit uint64 + uncompressableContentType []string + refillMetabase bool + refillMetabaseWorkersCount int + mode shardmode.Mode + skipMetabaseCompactOnUpgrade bool metaCfg struct { path string @@ -240,6 +241,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig newConfig.refillMetabase = oldConfig.RefillMetabase() newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount() + newConfig.skipMetabaseCompactOnUpgrade = oldConfig.SkipMetabaseCompactOnUpgrade() newConfig.mode = oldConfig.Mode() newConfig.compress = oldConfig.Compress() newConfig.estimateCompressibility = oldConfig.EstimateCompressibility() @@ -998,6 +1000,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID shard.WithLogger(c.log), shard.WithRefillMetabase(shCfg.refillMetabase), shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount), + shard.WithSkipMetabaseCompactOnUpgrade(shCfg.skipMetabaseCompactOnUpgrade), shard.WithMode(shCfg.mode), shard.WithBlobStorOptions(blobstoreOpts...), shard.WithMetaBaseOptions(mbOptions...), diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go index d53207ccc..26610a630 100644 --- a/cmd/frostfs-node/config/engine/config_test.go +++ b/cmd/frostfs-node/config/engine/config_test.go @@ -121,6 +121,7 @@ func TestEngineSection(t *testing.T) { require.Equal(t, false, sc.RefillMetabase()) require.Equal(t, mode.ReadOnly, sc.Mode()) require.Equal(t, 100, sc.RefillMetabaseWorkersCount()) + require.False(t, sc.SkipMetabaseCompactOnUpgrade()) case 1: require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path()) require.Equal(t, fs.FileMode(0o644), pl.Perm()) @@ -176,6 +177,7 @@ func TestEngineSection(t *testing.T) { require.Equal(t, true, sc.RefillMetabase()) require.Equal(t, mode.ReadWrite, sc.Mode()) require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount()) + require.True(t, sc.SkipMetabaseCompactOnUpgrade()) } return nil }) diff --git a/cmd/frostfs-node/config/engine/shard/config.go b/cmd/frostfs-node/config/engine/shard/config.go index 0620c9f63..b6bfac266 100644 --- a/cmd/frostfs-node/config/engine/shard/config.go +++ b/cmd/frostfs-node/config/engine/shard/config.go @@ -149,6 +149,16 @@ func (x *Config) RefillMetabaseWorkersCount() int { return RefillMetabaseWorkersCountDefault } +// SkipMetabaseCompactOnUpgrade returns the value of "skip_metabase_compact_on_upgrade" config parameter. +// +// Returns False if the value is not valid or not defined. +func (x *Config) SkipMetabaseCompactOnUpgrade() bool { + return config.BoolSafe( + (*config.Config)(x), + "skip_metabase_compact_on_upgrade", + ) +} + // Mode return the value of "mode" config parameter. // // Panics if read the value is not one of predefined diff --git a/config/example/node.env b/config/example/node.env index b39423ffb..b78de5ce8 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -152,6 +152,7 @@ FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_WORKER_COUNT=15 ## 1 shard ### Flag to refill Metabase from BlobStor FROSTFS_STORAGE_SHARD_1_RESYNC_METABASE=true +FROSTFS_STORAGE_SHARD_1_SKIP_METABASE_COMPACT_ON_UPGRADE=true ### Flag to set shard mode FROSTFS_STORAGE_SHARD_1_MODE=read-write ### Write cache config diff --git a/config/example/node.json b/config/example/node.json index fe2de0e01..c9186a477 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -201,6 +201,7 @@ "1": { "mode": "read-write", "resync_metabase": true, + "skip_metabase_compact_on_upgrade": true, "writecache": { "enabled": true, "path": "tmp/1/cache", diff --git a/config/example/node.yaml b/config/example/node.yaml index cc339a427..3a0211787 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -208,6 +208,7 @@ storage: expired_collector_worker_count: 15 # number of concurrent workers collecting expired objects by the garbage collector 1: + skip_metabase_compact_on_upgrade: true writecache: path: tmp/1/cache # write-cache root directory capacity: 4 G # approximate write-cache total size, bytes diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 5bf35cd65..f5439931b 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -189,6 +189,7 @@ The following table describes configuration for each shard. | `mode` | `string` | `read-write` | Shard Mode.
Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` | | `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. | | `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. | +| `skip_metabase_compact_on_upgrade` | `bool` | `false` | If `true` then metabase will not be compacted on upgrade. | | `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. | | `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. | | `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. | diff --git a/pkg/local_object_storage/metabase/upgrade.go b/pkg/local_object_storage/metabase/upgrade.go new file mode 100644 index 000000000..e0fde41c2 --- /dev/null +++ b/pkg/local_object_storage/metabase/upgrade.go @@ -0,0 +1,257 @@ +package meta + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "os" + "strconv" + "time" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" + "golang.org/x/sync/errgroup" +) + +var updates = map[uint64]func(ctx context.Context, db *bbolt.DB) error{ + 2: upgradeFromV2ToV3, +} + +var errFailedToUpgradeDatabaseNotOpen = errors.New("failed to upgrade metabase: database not open") + +func (db *DB) Upgrade(ctx context.Context, compact bool) error { + db.modeMtx.Lock() + defer db.modeMtx.Unlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } else if db.mode.ReadOnly() { + return ErrReadOnlyMode + } + + if db.boltDB == nil { + return errFailedToUpgradeDatabaseNotOpen + } + + var version uint64 + if err := db.boltDB.View(func(tx *bbolt.Tx) error { + var e error + version, e = currentVersion(tx) + return e + }); err != nil { + return err + } + updater, found := updates[version] + if !found { + return fmt.Errorf("unsupported version %d: no update available", version) + } + if err := updater(ctx, db.boltDB); err != nil { + return fmt.Errorf("failed to update metabase schema: %w", err) + } + if compact { + err := db.compact() + if err != nil { + return fmt.Errorf("failed to compact metabase: %w", err) + } + } + return nil +} + +func (db *DB) compact() error { + tmpFileName := db.info.Path + "." + time.Now().Format(time.RFC3339) + dst, err := bbolt.Open(tmpFileName, db.info.Permission, db.boltOptions) + if err != nil { + return fmt.Errorf("can't open new metabase to compact: %w", err) + } + if err := bbolt.Compact(dst, db.boltDB, 256<<20); err != nil { + return fmt.Errorf("failed to compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName))) + } + if err := dst.Close(); err != nil { + return fmt.Errorf("failed to close compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName))) + } + if err := db.boltDB.Close(); err != nil { + return fmt.Errorf("failed to close source metabase: %w", errors.Join(err, os.Remove(tmpFileName))) + } + db.boltDB = nil + db.initialized = false + if err := os.Rename(tmpFileName, db.info.Path); err != nil { + return fmt.Errorf("failed to replace source metabase with compacted: %w", errors.Join(err, os.Remove(tmpFileName))) + } + return db.openBolt() +} + +func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB) error { + if err := createExpirationEpochBuckets(ctx, db); err != nil { + return err + } + if err := dropUserAttributes(ctx, db); err != nil { + return err + } + if err := dropOwnerIDIndex(ctx, db); err != nil { + return err + } + if err := dropPayloadChecksumIndex(ctx, db); err != nil { + return err + } + return db.Update(func(tx *bbolt.Tx) error { + return updateVersion(tx, version) + }) +} + +type objectIDToExpEpoch struct { + containerID cid.ID + objectID oid.ID + expirationEpoch uint64 +} + +func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB) error { + if err := db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(expEpochToObjectBucketName) + return err + }); err != nil { + return err + } + + objects := make(chan objectIDToExpEpoch, 1000) + defer func() { + close(objects) + }() + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return selectObjectsWithExpirationEpoch(ctx, db, objects) + }) + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + case obj := <-objects: + return db.Update(func(tx *bbolt.Tx) error { + if err := putUniqueIndexItem(tx, namedBucketItem{ + name: expEpochToObjectBucketName, + key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID), + val: zeroValue, + }); err != nil { + return err + } + val := make([]byte, epochSize) + binary.LittleEndian.PutUint64(val, obj.expirationEpoch) + return putUniqueIndexItem(tx, namedBucketItem{ + name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)), + key: objectKey(obj.objectID, make([]byte, objectKeySize)), + val: val, + }) + }) + } + }) + return eg.Wait() +} + +func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error { + prefix := []byte{userAttributePrefix} + return db.View(func(tx *bbolt.Tx) error { + userAttrCursor := tx.Cursor() + for userAttrKey, _ := userAttrCursor.Seek(prefix); userAttrKey != nil && bytes.HasPrefix(userAttrKey, prefix); userAttrKey, _ = userAttrCursor.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if len(userAttrKey) <= 1+cidSize { + continue + } + attributeKey := string(userAttrKey[1+cidSize:]) + if attributeKey != objectV2.SysAttributeExpEpochNeoFS && attributeKey != objectV2.SysAttributeExpEpoch { + continue + } + + var containerID cid.ID + if err := containerID.Decode(userAttrKey[1 : 1+cidSize]); err != nil { + return fmt.Errorf("failed to decode container id from user attribute bucket: %w", err) + } + + b := tx.Bucket(userAttrKey) + return b.ForEachBucket(func(expKey []byte) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + expirationEpoch, err := strconv.ParseUint(string(expKey), 10, 64) + if err != nil { + return fmt.Errorf("could not parse expiration epoch: %w", err) + } + expirationEpochBucket := b.Bucket(expKey) + return expirationEpochBucket.ForEach(func(k, _ []byte) error { + var objectID oid.ID + if err := objectID.Decode(k); err != nil { + return fmt.Errorf("failed to decode object id from container '%s' expiration epoch %d: %w", containerID, expirationEpoch, err) + } + select { + case <-ctx.Done(): + return ctx.Err() + case objects <- objectIDToExpEpoch{ + containerID: containerID, + objectID: objectID, + expirationEpoch: expirationEpoch, + }: + return nil + } + }) + }) + } + return nil + }) +} + +func dropUserAttributes(ctx context.Context, db *bbolt.DB) error { + return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix}) +} + +func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB) error { + return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}) +} + +func dropPayloadChecksumIndex(ctx context.Context, db *bbolt.DB) error { + return dropBucketsByPrefix(ctx, db, []byte{payloadHashPrefix}) +} + +func dropBucketsByPrefix(ctx context.Context, db *bbolt.DB, prefix []byte) error { + const batch = 100 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + var keys [][]byte + if err := db.View(func(tx *bbolt.Tx) error { + c := tx.Cursor() + for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() { + keys = append(keys, bytes.Clone(k)) + } + return nil + }); err != nil { + return err + } + if len(keys) == 0 { + return nil + } + if err := db.Update(func(tx *bbolt.Tx) error { + for _, k := range keys { + if err := tx.DeleteBucket(k); err != nil { + return err + } + } + return nil + }); err != nil { + return err + } + } +} diff --git a/pkg/local_object_storage/metabase/upgrade_test.go b/pkg/local_object_storage/metabase/upgrade_test.go new file mode 100644 index 000000000..3241e1b5c --- /dev/null +++ b/pkg/local_object_storage/metabase/upgrade_test.go @@ -0,0 +1,205 @@ +//go:build integration + +package meta + +import ( + "context" + "io" + "os" + "strconv" + "testing" + "time" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +const upgradeFilePath = "/path/to/metabase.v2" + +func TestUpgradeV2ToV3(t *testing.T) { + path := createTempCopy(t, upgradeFilePath) + defer func() { + require.NoError(t, os.Remove(path)) + }() + db := New(WithPath(path), WithEpochState(epochState{e: 1000}), WithLogger(test.NewLogger(t))) + require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) + require.ErrorIs(t, db.Init(), ErrOutdatedVersion) + require.NoError(t, db.Upgrade(context.Background(), true)) + require.NoError(t, db.Init()) + require.NoError(t, db.Close()) +} + +func createTempCopy(t *testing.T, path string) string { + src, err := os.Open(path) + require.NoError(t, err) + + tmpPath := upgradeFilePath + time.Now().Format(time.RFC3339) + dest, err := os.Create(tmpPath) + require.NoError(t, err) + + _, err = io.Copy(dest, src) + require.NoError(t, err) + + require.NoError(t, src.Close()) + require.NoError(t, dest.Close()) + + return tmpPath +} + +func TestGenerateMetabaseFile(t *testing.T) { + t.Skip("for generating db") + // This test generates a metabase file with 2 million objects for 10 000 containers, + // of which + // 500 000 are simple objects, + // 500 000 are complex objects (total 1 million), + // 100 000 are deleted by gcMarks, + // 100 000 are deleted by tombstones (total 200 000), + // 100 000 million are locked (total 200 000). + + db := New(WithPath(upgradeFilePath), WithEpochState(epochState{e: 1000}), WithLogger(test.NewLogger(t)), + WithMaxBatchDelay(100*time.Millisecond), WithMaxBatchSize(1000)) + require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) + db.boltDB.AllocSize = 128 << 20 + db.boltDB.NoSync = true + require.NoError(t, db.Init()) + containers := make([]cid.ID, 10_000) + for i := range containers { + containers[i] = cidtest.ID() + } + oc, err := db.ObjectCounters() + require.NoError(t, err) + require.True(t, oc.IsZero()) + eg, ctx := errgroup.WithContext(context.Background()) + eg.SetLimit(10000) + // simple objects + for i := 0; i < 500_000; i++ { + i := i + eg.Go(func() error { + obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10)) + testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10)) + _, err := db.Put(ctx, PutPrm{ + obj: obj, + id: []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)), + }) + require.NoError(t, err) + return nil + }) + } + require.NoError(t, eg.Wait()) + db.log.Info("simple objects generated") + eg, ctx = errgroup.WithContext(context.Background()) + eg.SetLimit(10000) + // complex objects + for i := 0; i < 500_000; i++ { + i := i + eg.Go(func() error { + parent := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + child := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + child.SetParent(parent) + idParent, _ := parent.ID() + child.SetParentID(idParent) + testutil.AddAttribute(child, "FileName", strconv.FormatInt(int64(i%1_000), 10)) + testutil.AddAttribute(parent, "FileName", strconv.FormatInt(int64(i%1_000), 10)) + testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10)) + testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10)) + _, err := db.Put(ctx, PutPrm{ + obj: child, + }) + require.NoError(t, err) + return nil + }) + } + require.NoError(t, eg.Wait()) + db.log.Info("complex objects generated") + eg, ctx = errgroup.WithContext(context.Background()) + eg.SetLimit(10000) + // simple objects deleted by gc marks + for i := 0; i < 100_000; i++ { + i := i + eg.Go(func() error { + obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10)) + _, err := db.Put(ctx, PutPrm{ + obj: obj, + id: []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)), + }) + require.NoError(t, err) + _, err = db.Inhume(ctx, InhumePrm{ + target: []oid.Address{object.AddressOf(obj)}, + }) + require.NoError(t, err) + return nil + }) + } + require.NoError(t, eg.Wait()) + db.log.Info("simple objects deleted by gc marks generated") + eg, ctx = errgroup.WithContext(context.Background()) + eg.SetLimit(10000) + // simple objects deleted by tombstones + for i := 0; i < 100_000; i++ { + i := i + eg.Go(func() error { + obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10)) + _, err := db.Put(ctx, PutPrm{ + obj: obj, + id: []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)), + }) + tomb := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + tomb.SetType(objectSDK.TypeTombstone) + _, err = db.Put(ctx, PutPrm{ + obj: tomb, + id: []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)), + }) + require.NoError(t, err) + tombAddr := object.AddressOf(tomb) + _, err = db.Inhume(ctx, InhumePrm{ + target: []oid.Address{object.AddressOf(obj)}, + tomb: &tombAddr, + }) + require.NoError(t, err) + return nil + }) + } + require.NoError(t, eg.Wait()) + db.log.Info("simple objects deleted by tombstones generated") + eg, ctx = errgroup.WithContext(context.Background()) + eg.SetLimit(10000) + // simple objects locked by locks + for i := 0; i < 100_000; i++ { + i := i + eg.Go(func() error { + obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10)) + _, err := db.Put(ctx, PutPrm{ + obj: obj, + id: []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)), + }) + lock := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + lock.SetType(objectSDK.TypeLock) + testutil.AddAttribute(lock, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10)) + _, err = db.Put(ctx, PutPrm{ + obj: lock, + id: []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)), + }) + require.NoError(t, err) + err = db.Lock(ctx, containers[i%len(containers)], object.AddressOf(lock).Object(), []oid.ID{object.AddressOf(obj).Object()}) + require.NoError(t, err) + return nil + }) + } + require.NoError(t, eg.Wait()) + db.log.Info("simple objects locked by locks generated") + require.NoError(t, db.boltDB.Sync()) + require.NoError(t, db.Close()) +} diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index 9134616fe..eef7210dc 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -94,11 +94,13 @@ const ( // ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs. // Key: owner ID // Value: bucket containing object IDs as keys - _ + // removed in version 3 + ownerPrefix // userAttributePrefix was used for prefixing FKBT index buckets containing objects. // Key: attribute value // Value: bucket containing object IDs as keys - _ + // removed in version 3 + userAttributePrefix // ==================== // List index buckets. @@ -107,7 +109,8 @@ const ( // payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs. // Key: payload hash // Value: list of object IDs - _ + // removed in version 3 + payloadHashPrefix // parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs. // Key: parent ID // Value: list of object IDs diff --git a/pkg/local_object_storage/metabase/version.go b/pkg/local_object_storage/metabase/version.go index bb2b66d9b..9e15babbc 100644 --- a/pkg/local_object_storage/metabase/version.go +++ b/pkg/local_object_storage/metabase/version.go @@ -2,6 +2,7 @@ package meta import ( "encoding/binary" + "errors" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -18,6 +19,8 @@ var versionKey = []byte("version") // the current code version. var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required") +var errVersionUndefinedNoInfoBucket = errors.New("version undefined: no info bucket") + func checkVersion(tx *bbolt.Tx, initialized bool) error { var knownVersion bool @@ -59,3 +62,15 @@ func updateVersion(tx *bbolt.Tx, version uint64) error { } return b.Put(versionKey, data) } + +func currentVersion(tx *bbolt.Tx) (uint64, error) { + b := tx.Bucket(shardInfoBucket) + if b == nil { + return 0, errVersionUndefinedNoInfoBucket + } + data := b.Get(versionKey) + if len(data) != 8 { + return 0, fmt.Errorf("version undefined: invalid version data length %d", len(data)) + } + return binary.LittleEndian.Uint64(data), nil +} diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 936a506c0..abaa7c745 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -100,7 +100,7 @@ func (x *metabaseSynchronizer) Init() error { // Init initializes all Shard's components. func (s *Shard) Init(ctx context.Context) error { m := s.GetMode() - if err := s.initializeComponents(m); err != nil { + if err := s.initializeComponents(ctx, m); err != nil { return err } @@ -137,7 +137,7 @@ func (s *Shard) Init(ctx context.Context) error { return nil } -func (s *Shard) initializeComponents(m mode.Mode) error { +func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error { type initializer interface { Init() error } @@ -172,7 +172,13 @@ func (s *Shard) initializeComponents(m mode.Mode) error { if err := component.Init(); err != nil { if component == s.metaBase { if errors.Is(err, meta.ErrOutdatedVersion) { - return fmt.Errorf("metabase initialization: %w", err) + err = s.metaBase.Upgrade(ctx, !s.skipMetabaseCompactOnUpgrade) + if err == nil { + err = s.metaBase.Init() + } + } + if err == nil { + continue } err = s.handleMetabaseFailure("init", err) diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 93f5354a7..d3f134ecf 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -105,6 +105,8 @@ type cfg struct { refillMetabase bool refillMetabaseWorkersCount int + skipMetabaseCompactOnUpgrade bool + rmBatchSize int useWriteCache bool @@ -317,6 +319,13 @@ func WithRefillMetabaseWorkersCount(v int) Option { } } +// WithSkipMetabaseCompactOnUpgrade returns option to disable metabase compaction on upgrade. +func WithSkipMetabaseCompactOnUpgrade(v bool) Option { + return func(c *cfg) { + c.skipMetabaseCompactOnUpgrade = v + } +} + // WithMode returns option to set shard's mode. Mode must be one of the predefined: // - mode.ReadWrite; // - mode.ReadOnly.