[#9999] metabase: Add upgrade from v2 to v3
Some checks failed
DCO action / DCO (pull_request) Successful in 3m30s
Tests and linters / Run gofumpt (pull_request) Successful in 3m22s
Vulncheck / Vulncheck (pull_request) Successful in 3m23s
Build / Build Components (1.22) (pull_request) Successful in 4m17s
Build / Build Components (1.23) (pull_request) Successful in 4m21s
Tests and linters / Lint (pull_request) Failing after 4m15s
Pre-commit hooks / Pre-commit (pull_request) Successful in 4m19s
Tests and linters / Tests (1.22) (pull_request) Successful in 4m21s
Tests and linters / Staticcheck (pull_request) Successful in 4m25s
Tests and linters / Tests (1.23) (pull_request) Successful in 4m30s
Tests and linters / gopls check (pull_request) Successful in 4m52s
Tests and linters / Tests with -race (pull_request) Successful in 5m21s
Some checks failed
DCO action / DCO (pull_request) Successful in 3m30s
Tests and linters / Run gofumpt (pull_request) Successful in 3m22s
Vulncheck / Vulncheck (pull_request) Successful in 3m23s
Build / Build Components (1.22) (pull_request) Successful in 4m17s
Build / Build Components (1.23) (pull_request) Successful in 4m21s
Tests and linters / Lint (pull_request) Failing after 4m15s
Pre-commit hooks / Pre-commit (pull_request) Successful in 4m19s
Tests and linters / Tests (1.22) (pull_request) Successful in 4m21s
Tests and linters / Staticcheck (pull_request) Successful in 4m25s
Tests and linters / Tests (1.23) (pull_request) Successful in 4m30s
Tests and linters / gopls check (pull_request) Successful in 4m52s
Tests and linters / Tests with -race (pull_request) Successful in 5m21s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
fa7f9fbce2
commit
41e4dbbf14
4 changed files with 273 additions and 4 deletions
248
pkg/local_object_storage/metabase/upgrade.go
Normal file
248
pkg/local_object_storage/metabase/upgrade.go
Normal file
|
@ -0,0 +1,248 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// updates maps a current metabase schema version to the routine that
// upgrades the database to the next version. Upgrade looks the stored
// version up here; an absent key means no upgrade path is available.
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB) error{
	2: upgradeFromV2ToV3,
}
|
||||
|
||||
// errFailedToUpgradeDatabaseNotOpen is returned by Upgrade when the
// underlying bbolt handle has not been opened yet.
var errFailedToUpgradeDatabaseNotOpen = errors.New("failed to upgrade metabase: database not open")
|
||||
|
||||
func (db *DB) Upgrade(ctx context.Context, compact bool) error {
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
} else if db.mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
if db.boltDB == nil {
|
||||
return errFailedToUpgradeDatabaseNotOpen
|
||||
}
|
||||
|
||||
var version uint64
|
||||
if err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
var e error
|
||||
version, e = currentVersion(tx)
|
||||
return e
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
updater, found := updates[version]
|
||||
if !found {
|
||||
return fmt.Errorf("unsupported version %d: no update available", version)
|
||||
}
|
||||
if err := updater(ctx, db.boltDB); err != nil {
|
||||
return fmt.Errorf("failed to update metabase schema: %w", err)
|
||||
}
|
||||
if compact {
|
||||
err := db.compact()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to compact metabase: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// compact rewrites the metabase into a freshly created bbolt file next to the
// original and atomically replaces the original with it, reclaiming free
// space. On any failure the temporary file is removed (its removal error, if
// any, is joined into the returned error). On success the metabase is
// reopened via openBolt; db.boltDB/db.initialized are reset in between, so a
// failure after the source is closed leaves the DB closed.
func (db *DB) compact() error {
	// Timestamp suffix keeps concurrent/leftover temp files from colliding.
	tmpFileName := db.info.Path + "." + time.Now().Format(time.RFC3339)
	dst, err := bbolt.Open(tmpFileName, db.info.Permission, db.boltOptions)
	if err != nil {
		return fmt.Errorf("can't open new metabase to compact: %w", err)
	}
	// 256 MiB per copy transaction (bbolt.Compact txMaxSize argument).
	if err := bbolt.Compact(dst, db.boltDB, 256<<20); err != nil {
		return fmt.Errorf("failed to compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))
	}
	if err := dst.Close(); err != nil {
		return fmt.Errorf("failed to close compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	// Source must be closed before the rename replaces its file on disk.
	if err := db.boltDB.Close(); err != nil {
		return fmt.Errorf("failed to close source metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	db.boltDB = nil
	db.initialized = false
	if err := os.Rename(tmpFileName, db.info.Path); err != nil {
		return fmt.Errorf("failed to replace source metabase with compacted: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	return db.openBolt()
}
|
||||
|
||||
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB) error {
|
||||
if err := createExpirationEpochBuckets(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dropUserAttributes(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dropOwnerIDIndex(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
return dropPayloadChecksumIndex(ctx, db)
|
||||
}
|
||||
|
||||
// objectIDToExpEpoch is a work item passed from the scanning goroutine to the
// indexing goroutine in createExpirationEpochBuckets: one object address plus
// the expiration epoch parsed from its user attribute.
type objectIDToExpEpoch struct {
	containerID     cid.ID
	objectID        oid.ID
	expirationEpoch uint64
}
|
||||
|
||||
func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB) error {
|
||||
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(expEpochToObjectBucketName)
|
||||
return err
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
objects := make(chan objectIDToExpEpoch, 1000)
|
||||
defer func() {
|
||||
close(objects)
|
||||
}()
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
eg.Go(func() error {
|
||||
prefix := []byte{userAttributePrefix}
|
||||
return db.View(func(tx *bbolt.Tx) error {
|
||||
userAttrCursor := tx.Cursor()
|
||||
for userAttrKey, _ := userAttrCursor.Seek(prefix); userAttrKey != nil && bytes.HasPrefix(userAttrKey, prefix); userAttrKey, _ = userAttrCursor.Next() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if len(userAttrKey) <= 1+cidSize {
|
||||
continue
|
||||
}
|
||||
attributeKey := string(userAttrKey[1+cidSize:])
|
||||
if attributeKey != objectV2.SysAttributeExpEpochNeoFS && attributeKey != objectV2.SysAttributeExpEpoch {
|
||||
continue
|
||||
}
|
||||
|
||||
var containerID cid.ID
|
||||
if err := containerID.Decode(userAttrKey[1 : 1+cidSize]); err != nil {
|
||||
return fmt.Errorf("failed to decode container id from user attribute bucket: %w", err)
|
||||
}
|
||||
|
||||
b := tx.Bucket(userAttrKey)
|
||||
return b.ForEachBucket(func(expKey []byte) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
expirationEpoch, err := strconv.ParseUint(string(expKey), 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not parse expiration epoch: %w", err)
|
||||
}
|
||||
expirationEpochBucket := b.Bucket(expKey)
|
||||
return expirationEpochBucket.ForEach(func(k, _ []byte) error {
|
||||
var objectID oid.ID
|
||||
if err := objectID.Decode(k); err != nil {
|
||||
return fmt.Errorf("failed to decode object id from container '%s' expiration epoch %d: %w", containerID, expirationEpoch, err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case objects <- objectIDToExpEpoch{
|
||||
containerID: containerID,
|
||||
objectID: objectID,
|
||||
expirationEpoch: expirationEpoch,
|
||||
}:
|
||||
return nil
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
eg.Go(func() error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case obj := <-objects:
|
||||
return db.Update(func(tx *bbolt.Tx) error {
|
||||
if err := putUniqueIndexItem(tx, namedBucketItem{
|
||||
name: expEpochToObjectBucketName,
|
||||
key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
|
||||
val: zeroValue,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
val := make([]byte, epochSize)
|
||||
binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
|
||||
return putUniqueIndexItem(tx, namedBucketItem{
|
||||
name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)),
|
||||
key: objectKey(obj.objectID, make([]byte, objectKeySize)),
|
||||
val: val,
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
// dropUserAttributes deletes all v2 user-attribute FKBT index buckets
// (removed in schema version 3).
func dropUserAttributes(ctx context.Context, db *bbolt.DB) error {
	return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix})
}
|
||||
|
||||
// dropOwnerIDIndex deletes all v2 owner-ID index buckets
// (removed in schema version 3).
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB) error {
	return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix})
}
|
||||
|
||||
// dropPayloadChecksumIndex deletes all v2 payload-hash index buckets
// (removed in schema version 3).
func dropPayloadChecksumIndex(ctx context.Context, db *bbolt.DB) error {
	return dropBucketsByPrefix(ctx, db, []byte{payloadHashPrefix})
}
|
||||
|
||||
func dropBucketsByPrefix(ctx context.Context, db *bbolt.DB, prefix []byte) error {
|
||||
const batch = 100
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
var keys [][]byte
|
||||
if err := db.View(func(tx *bbolt.Tx) error {
|
||||
c := tx.Cursor()
|
||||
for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() {
|
||||
keys = append(keys, bytes.Clone(k))
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||
for _, k := range keys {
|
||||
if err := tx.DeleteBucket(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
|
@ -94,11 +94,13 @@ const (
|
|||
// ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs.
|
||||
// Key: owner ID
|
||||
// Value: bucket containing object IDs as keys
|
||||
_
|
||||
// removed in version 3
|
||||
ownerPrefix
|
||||
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
|
||||
// Key: attribute value
|
||||
// Value: bucket containing object IDs as keys
|
||||
_
|
||||
// removed in version 3
|
||||
userAttributePrefix
|
||||
|
||||
// ====================
|
||||
// List index buckets.
|
||||
|
@ -107,7 +109,8 @@ const (
|
|||
// payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs.
|
||||
// Key: payload hash
|
||||
// Value: list of object IDs
|
||||
_
|
||||
// removed in version 3
|
||||
payloadHashPrefix
|
||||
// parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs.
|
||||
// Key: parent ID
|
||||
// Value: list of object IDs
|
||||
|
|
|
@ -59,3 +59,15 @@ func updateVersion(tx *bbolt.Tx, version uint64) error {
|
|||
}
|
||||
return b.Put(versionKey, data)
|
||||
}
|
||||
|
||||
func currentVersion(tx *bbolt.Tx) (uint64, error) {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b == nil {
|
||||
return 0, fmt.Errorf("version undefined: no info bucket")
|
||||
}
|
||||
data := b.Get(versionKey)
|
||||
if len(data) != 8 {
|
||||
return 0, fmt.Errorf("version undefined: invalid version data lenght %d", len(data))
|
||||
}
|
||||
return binary.LittleEndian.Uint64(data), nil
|
||||
}
|
||||
|
|
|
@ -172,7 +172,13 @@ func (s *Shard) initializeComponents(m mode.Mode) error {
|
|||
if err := component.Init(); err != nil {
|
||||
if component == s.metaBase {
|
||||
if errors.Is(err, meta.ErrOutdatedVersion) {
|
||||
return fmt.Errorf("metabase initialization: %w", err)
|
||||
err = s.metaBase.Upgrade(context.TODO(), true) // TODO replace with config variable
|
||||
if err == nil {
|
||||
err = s.metaBase.Init()
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err = s.handleMetabaseFailure("init", err)
|
||||
|
|
Loading…
Reference in a new issue