From 07d1a5e6bd265a237c71f8303fc51ad1bf049248 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 28 Aug 2024 18:32:30 +0300 Subject: [PATCH] [#1334] metabase: Add upgrade from v2 to v3 Signed-off-by: Dmitrii Stepanov --- .../internal/modules/metabase/root.go | 15 + .../internal/modules/metabase/upgrade.go | 40 ++ cmd/frostfs-adm/internal/modules/root.go | 2 + pkg/local_object_storage/metabase/upgrade.go | 361 ++++++++++++++++++ .../metabase/upgrade_test.go | 215 +++++++++++ pkg/local_object_storage/metabase/util.go | 9 +- pkg/local_object_storage/metabase/version.go | 15 + 7 files changed, 654 insertions(+), 3 deletions(-) create mode 100644 cmd/frostfs-adm/internal/modules/metabase/root.go create mode 100644 cmd/frostfs-adm/internal/modules/metabase/upgrade.go create mode 100644 pkg/local_object_storage/metabase/upgrade.go create mode 100644 pkg/local_object_storage/metabase/upgrade_test.go diff --git a/cmd/frostfs-adm/internal/modules/metabase/root.go b/cmd/frostfs-adm/internal/modules/metabase/root.go new file mode 100644 index 000000000..5b21ed273 --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/metabase/root.go @@ -0,0 +1,15 @@ +package metabase + +import "github.com/spf13/cobra" + +// RootCmd is a root command of config section. +var RootCmd = &cobra.Command{ + Use: "metabase", + Short: "Section for metabase commands", +} + +func init() { + RootCmd.AddCommand(UpgradeCmd) + + initUpgradeCommand() +} diff --git a/cmd/frostfs-adm/internal/modules/metabase/upgrade.go b/cmd/frostfs-adm/internal/modules/metabase/upgrade.go new file mode 100644 index 000000000..d9351f59a --- /dev/null +++ b/cmd/frostfs-adm/internal/modules/metabase/upgrade.go @@ -0,0 +1,40 @@ +package metabase + +import ( + "fmt" + "time" + + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "github.com/spf13/cobra" +) + +const ( + pathFlag = "path" + noCompactFlag = "no-compact" +) + +var path string + +var UpgradeCmd = &cobra.Command{ + Use: "upgrade", + Short: "Upgrade metabase to latest version", + RunE: upgrade, +} + +func upgrade(cmd *cobra.Command, _ []string) error { + path, _ := cmd.Flags().GetString(pathFlag) + noCompact, _ := cmd.Flags().GetBool(noCompactFlag) + if err := meta.Upgrade(cmd.Context(), path, !noCompact, func(a ...any) { + cmd.Println(append([]any{time.Now().Format(time.RFC3339), ":"}, a...)...) + }); err != nil { + return fmt.Errorf("failed to upgrade metabase: %w", err) + } + return nil +} + +func initUpgradeCommand() { + flags := UpgradeCmd.Flags() + flags.StringVar(&path, pathFlag, "", "Path to metabase file") + _ = UpgradeCmd.MarkFlagRequired(pathFlag) + flags.Bool(noCompactFlag, false, "Do not compact upgraded metabase file") +} diff --git a/cmd/frostfs-adm/internal/modules/root.go b/cmd/frostfs-adm/internal/modules/root.go index 8595483ab..defd898c8 100644 --- a/cmd/frostfs-adm/internal/modules/root.go +++ b/cmd/frostfs-adm/internal/modules/root.go @@ -5,6 +5,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/storagecfg" "git.frostfs.info/TrueCloudLab/frostfs-node/misc" @@ -41,6 +42,7 @@ func init() { rootCmd.AddCommand(config.RootCmd) rootCmd.AddCommand(morph.RootCmd) rootCmd.AddCommand(storagecfg.RootCmd) + rootCmd.AddCommand(metabase.RootCmd) rootCmd.AddCommand(autocomplete.Command("frostfs-adm")) rootCmd.AddCommand(gendoc.Command(rootCmd, gendoc.Options{})) diff --git a/pkg/local_object_storage/metabase/upgrade.go b/pkg/local_object_storage/metabase/upgrade.go new file mode 100644 index 000000000..6b5c656cb --- /dev/null +++ b/pkg/local_object_storage/metabase/upgrade.go @@ -0,0 +1,361 @@ +package meta + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "os" + "strconv" + "sync/atomic" + "time" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" + "golang.org/x/sync/errgroup" +) + +const ( + logGrequency = 50_000 + workersCount = 1_000 + compactMaxTxSize = 256 << 20 +) + +var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, log func(a ...any)) error{ + 2: upgradeFromV2ToV3, +} + +func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any)) error { + if _, err := os.Stat(path); err != nil { + return fmt.Errorf("check metabase existance: %w", err) + } + db, err := bbolt.Open(path, os.ModePerm, bbolt.DefaultOptions) + if err != nil { + return fmt.Errorf("open metabase: %w", err) + } + var version uint64 + if err := db.View(func(tx *bbolt.Tx) error { + var e error + version, e = currentVersion(tx) + return e + }); err != nil { + return err + } + updater, found := updates[version] + if !found { + return fmt.Errorf("unsupported version %d: no update available", version) + } + if err := updater(ctx, db, log); err != nil { + return fmt.Errorf("update metabase schema: %w", err) + } + if compact { + log("compacting metabase...") + err := compactDB(db) + if err != nil { + return fmt.Errorf("compact metabase: %w", err) + } + log("metabase compacted") + } + return db.Close() +} + +func compactDB(db *bbolt.DB) error { + sourcePath := db.Path() + tmpFileName := sourcePath + "." + time.Now().Format(time.RFC3339) + f, err := os.Stat(sourcePath) + if err != nil { + return err + } + dst, err := bbolt.Open(tmpFileName, f.Mode(), &bbolt.Options{ + Timeout: 100 * time.Millisecond, + }) + if err != nil { + return fmt.Errorf("can't open new metabase to compact: %w", err) + } + if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil { + return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName))) + } + if err := dst.Close(); err != nil { + return fmt.Errorf("close compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName))) + } + if err := db.Close(); err != nil { + return fmt.Errorf("close source metabase: %w", errors.Join(err, os.Remove(tmpFileName))) + } + if err := os.Rename(tmpFileName, sourcePath); err != nil { + return fmt.Errorf("replace source metabase with compacted: %w", errors.Join(err, os.Remove(tmpFileName))) + } + return nil +} + +func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, log func(a ...any)) error { + if err := createExpirationEpochBuckets(ctx, db, log); err != nil { + return err + } + if err := dropUserAttributes(ctx, db, log); err != nil { + return err + } + if err := dropOwnerIDIndex(ctx, db, log); err != nil { + return err + } + if err := dropPayloadChecksumIndex(ctx, db, log); err != nil { + return err + } + return db.Update(func(tx *bbolt.Tx) error { + return updateVersion(tx, version) + }) +} + +type objectIDToExpEpoch struct { + containerID cid.ID + objectID oid.ID + expirationEpoch uint64 +} + +func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a ...any)) error { + log("filling expiration epoch buckets...") + if err := db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(expEpochToObjectBucketName) + return err + }); err != nil { + return err + } + objects := make(chan objectIDToExpEpoch) + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return selectObjectsWithExpirationEpoch(ctx, db, objects) + }) + var count atomic.Uint64 + for i := 0; i < workersCount; i++ { + eg.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case obj, ok := <-objects: + if !ok { + return nil + } + if err := db.Batch(func(tx *bbolt.Tx) error { + if err := putUniqueIndexItem(tx, namedBucketItem{ + name: expEpochToObjectBucketName, + key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID), + val: zeroValue, + }); err != nil { + return err + } + val := make([]byte, epochSize) + binary.LittleEndian.PutUint64(val, obj.expirationEpoch) + return putUniqueIndexItem(tx, namedBucketItem{ + name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)), + key: objectKey(obj.objectID, make([]byte, objectKeySize)), + val: val, + }) + }); err != nil { + return err + } + } + if c := count.Add(1); c%logGrequency == 0 { + log("expiration epoch filled for", c, "objects...") + } + } + }) + } + err := eg.Wait() + if err != nil { + log("expiration epoch buckets completed completed with error:", err) + return err + } + log("filling expiration epoch buckets completed successfully, total", count.Load(), "objects") + return nil +} + +func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error { + defer close(objects) + + const batchSize = 1000 + it := &objectsWithExpirationEpochBatchIterator{ + lastAttributeKey: usrAttrPrefix, + } + for { + if err := getNextObjectsWithExpirationEpochBatch(ctx, db, it, batchSize); err != nil { + return err + } + for _, item := range it.items { + select { + case <-ctx.Done(): + return ctx.Err() + case objects <- item: + } + } + + if len(it.items) < batchSize { + return nil + } + it.items = nil + } +} + +var ( + usrAttrPrefix = []byte{userAttributePrefix} + errBatchSizeLimit = errors.New("batch size limit") +) + +type objectsWithExpirationEpochBatchIterator struct { + lastAttributeKey []byte + lastAttributeValue []byte + lastAttrKeyValueItem []byte + items []objectIDToExpEpoch +} + +// - {prefix}{containerID}{attributeKey} <- bucket +// -- {attributeValue} <- bucket, expirationEpoch +// --- {objectID}: zeroValue <- record + +func getNextObjectsWithExpirationEpochBatch(ctx context.Context, db *bbolt.DB, it *objectsWithExpirationEpochBatchIterator, batchSize int) error { + seekAttrValue := it.lastAttributeValue + seekAttrKVItem := it.lastAttrKeyValueItem + err := db.View(func(tx *bbolt.Tx) error { + attrKeyC := tx.Cursor() + for attrKey, _ := attrKeyC.Seek(it.lastAttributeKey); attrKey != nil && bytes.HasPrefix(attrKey, usrAttrPrefix); attrKey, _ = attrKeyC.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if len(attrKey) <= 1+cidSize { + continue + } + attributeKey := string(attrKey[1+cidSize:]) + if attributeKey != objectV2.SysAttributeExpEpochNeoFS && attributeKey != objectV2.SysAttributeExpEpoch { + continue + } + var containerID cid.ID + if err := containerID.Decode(attrKey[1 : 1+cidSize]); err != nil { + return fmt.Errorf("decode container id from user attribute bucket: %w", err) + } + if err := iterateExpirationAttributeKeyBucket(ctx, tx.Bucket(attrKey), it, batchSize, containerID, attrKey, seekAttrValue, seekAttrKVItem); err != nil { + return err + } + seekAttrValue = nil + seekAttrKVItem = nil + } + return nil + }) + if err != nil && !errors.Is(err, errBatchSizeLimit) { + return err + } + return nil +} + +func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, it *objectsWithExpirationEpochBatchIterator, batchSize int, containerID cid.ID, attrKey, seekAttrValue, seekAttrKVItem []byte) error { + attrValueC := b.Cursor() + for attrValue, v := attrValueC.Seek(seekAttrValue); attrValue != nil; attrValue, v = attrValueC.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if v != nil { + continue // need to iterate over buckets, not records + } + expirationEpoch, err := strconv.ParseUint(string(attrValue), 10, 64) + if err != nil { + return fmt.Errorf("could not parse expiration epoch: %w", err) + } + expirationEpochBucket := b.Bucket(attrValue) + attrKeyValueC := expirationEpochBucket.Cursor() + for attrKeyValueItem, v := attrKeyValueC.Seek(seekAttrKVItem); attrKeyValueItem != nil; attrKeyValueItem, v = attrKeyValueC.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if v == nil { + continue // need to iterate over records, not buckets + } + if bytes.Equal(it.lastAttributeKey, attrKey) && bytes.Equal(it.lastAttributeValue, attrValue) && bytes.Equal(it.lastAttrKeyValueItem, attrKeyValueItem) { + continue + } + var objectID oid.ID + if err := objectID.Decode(attrKeyValueItem); err != nil { + return fmt.Errorf("decode object id from container '%s' expiration epoch %d: %w", containerID, expirationEpoch, err) + } + it.lastAttributeKey = bytes.Clone(attrKey) + it.lastAttributeValue = bytes.Clone(attrValue) + it.lastAttrKeyValueItem = bytes.Clone(attrKeyValueItem) + it.items = append(it.items, objectIDToExpEpoch{ + containerID: containerID, + objectID: objectID, + expirationEpoch: expirationEpoch, + }) + if len(it.items) == batchSize { + return errBatchSizeLimit + } + } + seekAttrKVItem = nil + } + return nil +} + +func dropUserAttributes(ctx context.Context, db *bbolt.DB, log func(a ...any)) error { + return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix}, func(a ...any) { + log(append([]any{"user attributes:"}, a...)...) + }) +} + +func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error { + return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, func(a ...any) { + log(append([]any{"owner ID index:"}, a...)...) + }) +} + +func dropPayloadChecksumIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error { + return dropBucketsByPrefix(ctx, db, []byte{payloadHashPrefix}, func(a ...any) { + log(append([]any{"payload checksum:"}, a...)...) + }) +} + +func dropBucketsByPrefix(ctx context.Context, db *bbolt.DB, prefix []byte, log func(a ...any)) error { + log("deleting buckets...") + const batch = 1000 + var count uint64 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + var keys [][]byte + if err := db.View(func(tx *bbolt.Tx) error { + c := tx.Cursor() + for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() { + keys = append(keys, bytes.Clone(k)) + } + return nil + }); err != nil { + log("deleting buckets completed with an error:", err) + return err + } + if len(keys) == 0 { + log("deleting buckets completed successfully, deleted", count, "buckets") + return nil + } + if err := db.Update(func(tx *bbolt.Tx) error { + for _, k := range keys { + if err := tx.DeleteBucket(k); err != nil { + return err + } + } + return nil + }); err != nil { + log("deleting buckets completed with an error:", err) + return err + } + if count += uint64(len(keys)); count%logGrequency == 0 { + log("deleted", count, "buckets") + } + } +} diff --git a/pkg/local_object_storage/metabase/upgrade_test.go b/pkg/local_object_storage/metabase/upgrade_test.go new file mode 100644 index 000000000..dc3d7d07d --- /dev/null +++ b/pkg/local_object_storage/metabase/upgrade_test.go @@ -0,0 +1,215 @@ +//go:build integration + +package meta + +import ( + "context" + "fmt" + "io" + "os" + "strconv" + "testing" + "time" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +const upgradeFilePath = "/path/to/metabase.v2" + +func TestUpgradeV2ToV3(t *testing.T) { + path := createTempCopy(t, upgradeFilePath) + defer func() { + require.NoError(t, os.Remove(path)) + }() + db := New(WithPath(path), WithEpochState(epochState{e: 1000}), WithLogger(test.NewLogger(t))) + require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) + require.ErrorIs(t, db.Init(), ErrOutdatedVersion) + require.NoError(t, db.Close()) + require.NoError(t, Upgrade(context.Background(), path, true, t.Log)) + require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) + require.NoError(t, db.Init()) + require.NoError(t, db.Close()) + fmt.Println() +} + +func createTempCopy(t *testing.T, path string) string { + src, err := os.Open(path) + require.NoError(t, err) + + tmpPath := upgradeFilePath + time.Now().Format(time.RFC3339) + dest, err := os.Create(tmpPath) + require.NoError(t, err) + + _, err = io.Copy(dest, src) + require.NoError(t, err) + + require.NoError(t, src.Close()) + require.NoError(t, dest.Close()) + + return tmpPath +} + +func TestGenerateMetabaseFile(t *testing.T) { + t.Skip("for generating db") + const ( + containersCount = 10_000 + simpleObjectsCount = 500_000 + complexObjectsCount = 500_000 // x2 + deletedByGCMarksCount = 100_000 + deletedByTombstoneCount = 100_000 // x2 + lockedCount = 100_000 // x2 + + allocSize = 128 << 20 + generateWorkersCount = 1_000 + minEpoch = 1_000 + maxFilename = 1_000 + maxStorageID = 10_000 + ) + + db := New(WithPath(upgradeFilePath), WithEpochState(epochState{e: minEpoch}), WithLogger(test.NewLogger(t))) + require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) + db.boltDB.AllocSize = allocSize + db.boltDB.NoSync = true + require.NoError(t, db.Init()) + containers := make([]cid.ID, containersCount) + for i := range containers { + containers[i] = cidtest.ID() + } + oc, err := db.ObjectCounters() + require.NoError(t, err) + require.True(t, oc.IsZero()) + eg, ctx := errgroup.WithContext(context.Background()) + eg.SetLimit(generateWorkersCount) + // simple objects + for i := 0; i < simpleObjectsCount; i++ { + i := i + eg.Go(func() error { + obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) + testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10)) + _, err := db.Put(ctx, PutPrm{ + obj: obj, + id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)), + }) + require.NoError(t, err) + return nil + }) + } + require.NoError(t, eg.Wait()) + db.log.Info("simple objects generated") + eg, ctx = errgroup.WithContext(context.Background()) + eg.SetLimit(generateWorkersCount) + // complex objects + for i := 0; i < complexObjectsCount; i++ { + i := i + eg.Go(func() error { + parent := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + child := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + child.SetParent(parent) + idParent, _ := parent.ID() + child.SetParentID(idParent) + testutil.AddAttribute(child, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) + testutil.AddAttribute(parent, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) + testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10)) + testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10)) + _, err := db.Put(ctx, PutPrm{ + obj: child, + }) + require.NoError(t, err) + return nil + }) + } + require.NoError(t, eg.Wait()) + db.log.Info("complex objects generated") + eg, ctx = errgroup.WithContext(context.Background()) + eg.SetLimit(generateWorkersCount) + // simple objects deleted by gc marks + for i := 0; i < deletedByGCMarksCount; i++ { + i := i + eg.Go(func() error { + obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) + _, err := db.Put(ctx, PutPrm{ + obj: obj, + id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)), + }) + require.NoError(t, err) + _, err = db.Inhume(ctx, InhumePrm{ + target: []oid.Address{object.AddressOf(obj)}, + }) + require.NoError(t, err) + return nil + }) + } + require.NoError(t, eg.Wait()) + db.log.Info("simple objects deleted by gc marks generated") + eg, ctx = errgroup.WithContext(context.Background()) + eg.SetLimit(10000) + // simple objects deleted by tombstones + for i := 0; i < deletedByTombstoneCount; i++ { + i := i + eg.Go(func() error { + obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) + _, err := db.Put(ctx, PutPrm{ + obj: obj, + id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)), + }) + tomb := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + tomb.SetType(objectSDK.TypeTombstone) + _, err = db.Put(ctx, PutPrm{ + obj: tomb, + id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)), + }) + require.NoError(t, err) + tombAddr := object.AddressOf(tomb) + _, err = db.Inhume(ctx, InhumePrm{ + target: []oid.Address{object.AddressOf(obj)}, + tomb: &tombAddr, + }) + require.NoError(t, err) + return nil + }) + } + require.NoError(t, eg.Wait()) + db.log.Info("simple objects deleted by tombstones generated") + eg, ctx = errgroup.WithContext(context.Background()) + eg.SetLimit(generateWorkersCount) + // simple objects locked by locks + for i := 0; i < lockedCount; i++ { + i := i + eg.Go(func() error { + obj := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%maxFilename), 10)) + _, err := db.Put(ctx, PutPrm{ + obj: obj, + id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)), + }) + lock := testutil.GenerateObjectWithCID(containers[i%len(containers)]) + lock.SetType(objectSDK.TypeLock) + testutil.AddAttribute(lock, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%minEpoch+minEpoch), 10)) + _, err = db.Put(ctx, PutPrm{ + obj: lock, + id: []byte(strconv.FormatInt(int64(i%maxStorageID), 10) + "/" + strconv.FormatInt(int64(i%maxStorageID), 10)), + }) + require.NoError(t, err) + err = db.Lock(ctx, containers[i%len(containers)], object.AddressOf(lock).Object(), []oid.ID{object.AddressOf(obj).Object()}) + require.NoError(t, err) + return nil + }) + } + require.NoError(t, eg.Wait()) + db.log.Info("simple objects locked by locks generated") + require.NoError(t, db.boltDB.Sync()) + require.NoError(t, db.Close()) +} diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index 9134616fe..eef7210dc 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -94,11 +94,13 @@ const ( // ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs. // Key: owner ID // Value: bucket containing object IDs as keys - _ + // removed in version 3 + ownerPrefix // userAttributePrefix was used for prefixing FKBT index buckets containing objects. // Key: attribute value // Value: bucket containing object IDs as keys - _ + // removed in version 3 + userAttributePrefix // ==================== // List index buckets. @@ -107,7 +109,8 @@ const ( // payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs. // Key: payload hash // Value: list of object IDs - _ + // removed in version 3 + payloadHashPrefix // parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs. // Key: parent ID // Value: list of object IDs diff --git a/pkg/local_object_storage/metabase/version.go b/pkg/local_object_storage/metabase/version.go index bb2b66d9b..9e15babbc 100644 --- a/pkg/local_object_storage/metabase/version.go +++ b/pkg/local_object_storage/metabase/version.go @@ -2,6 +2,7 @@ package meta import ( "encoding/binary" + "errors" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -18,6 +19,8 @@ var versionKey = []byte("version") // the current code version. var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required") +var errVersionUndefinedNoInfoBucket = errors.New("version undefined: no info bucket") + func checkVersion(tx *bbolt.Tx, initialized bool) error { var knownVersion bool @@ -59,3 +62,15 @@ func updateVersion(tx *bbolt.Tx, version uint64) error { } return b.Put(versionKey, data) } + +func currentVersion(tx *bbolt.Tx) (uint64, error) { + b := tx.Bucket(shardInfoBucket) + if b == nil { + return 0, errVersionUndefinedNoInfoBucket + } + data := b.Get(versionKey) + if len(data) != 8 { + return 0, fmt.Errorf("version undefined: invalid version data length %d", len(data)) + } + return binary.LittleEndian.Uint64(data), nil +}