diff --git a/pkg/local_object_storage/metabase/upgrade.go b/pkg/local_object_storage/metabase/upgrade.go new file mode 100644 index 000000000..715ef22cf --- /dev/null +++ b/pkg/local_object_storage/metabase/upgrade.go @@ -0,0 +1,248 @@ +package meta + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "os" + "strconv" + "time" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" + "golang.org/x/sync/errgroup" +) + +var updates = map[uint64]func(ctx context.Context, db *bbolt.DB) error{ + 2: upgradeFromV2ToV3, +} + +var errFailedToUpgradeDatabaseNotOpen = errors.New("failed to upgrade metabase: database not open") + +func (db *DB) Upgrade(ctx context.Context, compact bool) error { + db.modeMtx.Lock() + defer db.modeMtx.Unlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } else if db.mode.ReadOnly() { + return ErrReadOnlyMode + } + + if db.boltDB == nil { + return errFailedToUpgradeDatabaseNotOpen + } + + var version uint64 + if err := db.boltDB.View(func(tx *bbolt.Tx) error { + var e error + version, e = currentVersion(tx) + return e + }); err != nil { + return err + } + updater, found := updates[version] + if !found { + return fmt.Errorf("unsupported version %d: no update available", version) + } + if err := updater(ctx, db.boltDB); err != nil { + return fmt.Errorf("failed to update metabase schema: %w", err) + } + if compact { + err := db.compact() + if err != nil { + return fmt.Errorf("failed to compact metabase: %w", err) + } + } + return nil +} + +func (db *DB) compact() error { + tmpFileName := db.info.Path + "." + time.Now().Format(time.RFC3339) + dst, err := bbolt.Open(tmpFileName, db.info.Permission, db.boltOptions) + if err != nil { + return fmt.Errorf("can't open new metabase to compact: %w", err) + } + if err := bbolt.Compact(dst, db.boltDB, 256<<20); err != nil { + return fmt.Errorf("failed to compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName))) + } + if err := dst.Close(); err != nil { + return fmt.Errorf("failed to close compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName))) + } + if err := db.boltDB.Close(); err != nil { + return fmt.Errorf("failed to close source metabase: %w", errors.Join(err, os.Remove(tmpFileName))) + } + db.boltDB = nil + db.initialized = false + if err := os.Rename(tmpFileName, db.info.Path); err != nil { + return fmt.Errorf("failed to replace source metabase with compacted: %w", errors.Join(err, os.Remove(tmpFileName))) + } + return db.openBolt() +} + +func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB) error { + if err := createExpirationEpochBuckets(ctx, db); err != nil { + return err + } + if err := dropUserAttributes(ctx, db); err != nil { + return err + } + if err := dropOwnerIDIndex(ctx, db); err != nil { + return err + } + return dropPayloadChecksumIndex(ctx, db) +} + +type objectIDToExpEpoch struct { + containerID cid.ID + objectID oid.ID + expirationEpoch uint64 +} + +func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB) error { + if err := db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(expEpochToObjectBucketName) + return err + }); err != nil { + return err + } + + objects := make(chan objectIDToExpEpoch, 1000) + defer func() { + close(objects) + }() + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + prefix := []byte{userAttributePrefix} + return db.View(func(tx *bbolt.Tx) error { + userAttrCursor := tx.Cursor() + for userAttrKey, _ := userAttrCursor.Seek(prefix); userAttrKey != nil && bytes.HasPrefix(userAttrKey, prefix); userAttrKey, _ = userAttrCursor.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if len(userAttrKey) <= 1+cidSize { + continue + } + attributeKey := string(userAttrKey[1+cidSize:]) + if attributeKey != objectV2.SysAttributeExpEpochNeoFS && attributeKey != objectV2.SysAttributeExpEpoch { + continue + } + + var containerID cid.ID + if err := containerID.Decode(userAttrKey[1 : 1+cidSize]); err != nil { + return fmt.Errorf("failed to decode container id from user attribute bucket: %w", err) + } + + b := tx.Bucket(userAttrKey) + return b.ForEachBucket(func(expKey []byte) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + expirationEpoch, err := strconv.ParseUint(string(expKey), 10, 64) + if err != nil { + return fmt.Errorf("could not parse expiration epoch: %w", err) + } + expirationEpochBucket := b.Bucket(expKey) + return expirationEpochBucket.ForEach(func(k, _ []byte) error { + var objectID oid.ID + if err := objectID.Decode(k); err != nil { + return fmt.Errorf("failed to decode object id from container '%s' expiration epoch %d: %w", containerID, expirationEpoch, err) + } + select { + case <-ctx.Done(): + return ctx.Err() + case objects <- objectIDToExpEpoch{ + containerID: containerID, + objectID: objectID, + expirationEpoch: expirationEpoch, + }: + return nil + } + }) + }) + } + return nil + }) + }) + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + case obj := <-objects: + return db.Update(func(tx *bbolt.Tx) error { + if err := putUniqueIndexItem(tx, namedBucketItem{ + name: expEpochToObjectBucketName, + key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID), + val: zeroValue, + }); err != nil { + return err + } + val := make([]byte, epochSize) + binary.LittleEndian.PutUint64(val, obj.expirationEpoch) + return putUniqueIndexItem(tx, namedBucketItem{ + name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)), + key: objectKey(obj.objectID, make([]byte, objectKeySize)), + val: val, + }) + }) + } + }) + return eg.Wait() +} + +func dropUserAttributes(ctx context.Context, db *bbolt.DB) error { + return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix}) +} + +func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB) error { + return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}) +} + +func dropPayloadChecksumIndex(ctx context.Context, db *bbolt.DB) error { + return dropBucketsByPrefix(ctx, db, []byte{payloadHashPrefix}) +} + +func dropBucketsByPrefix(ctx context.Context, db *bbolt.DB, prefix []byte) error { + const batch = 100 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + var keys [][]byte + if err := db.View(func(tx *bbolt.Tx) error { + c := tx.Cursor() + for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() { + keys = append(keys, bytes.Clone(k)) + } + return nil + }); err != nil { + return err + } + if len(keys) == 0 { + return nil + } + if err := db.Update(func(tx *bbolt.Tx) error { + for _, k := range keys { + if err := tx.DeleteBucket(k); err != nil { + return err + } + } + return nil + }); err != nil { + return err + } + } +} diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index 9134616fe..eef7210dc 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -94,11 +94,13 @@ const ( // ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs. // Key: owner ID // Value: bucket containing object IDs as keys - _ + // removed in version 3 + ownerPrefix // userAttributePrefix was used for prefixing FKBT index buckets containing objects. // Key: attribute value // Value: bucket containing object IDs as keys - _ + // removed in version 3 + userAttributePrefix // ==================== // List index buckets. @@ -107,7 +109,8 @@ const ( // payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs. // Key: payload hash // Value: list of object IDs - _ + // removed in version 3 + payloadHashPrefix // parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs. // Key: parent ID // Value: list of object IDs diff --git a/pkg/local_object_storage/metabase/version.go b/pkg/local_object_storage/metabase/version.go index bb2b66d9b..523c95fd2 100644 --- a/pkg/local_object_storage/metabase/version.go +++ b/pkg/local_object_storage/metabase/version.go @@ -59,3 +59,15 @@ func updateVersion(tx *bbolt.Tx, version uint64) error { } return b.Put(versionKey, data) } + +func currentVersion(tx *bbolt.Tx) (uint64, error) { + b := tx.Bucket(shardInfoBucket) + if b == nil { + return 0, fmt.Errorf("version undefined: no info bucket") + } + data := b.Get(versionKey) + if len(data) != 8 { + return 0, fmt.Errorf("version undefined: invalid version data lenght %d", len(data)) + } + return binary.LittleEndian.Uint64(data), nil +} diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 936a506c0..68d2362f2 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -172,7 +172,13 @@ func (s *Shard) initializeComponents(m mode.Mode) error { if err := component.Init(); err != nil { if component == s.metaBase { if errors.Is(err, meta.ErrOutdatedVersion) { - return fmt.Errorf("metabase initialization: %w", err) + err = s.metaBase.Upgrade(context.TODO(), true) // TODO replace with config variable + if err == nil { + err = s.metaBase.Init() + } + } + if err == nil { + continue } err = s.handleMetabaseFailure("init", err)