[#1334] metabase: Add upgrade from v2 to v3
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 1m45s
DCO action / DCO (pull_request) Successful in 1m45s
Vulncheck / Vulncheck (pull_request) Successful in 2m17s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m50s
Build / Build Components (1.22) (pull_request) Successful in 2m55s
Build / Build Components (1.23) (pull_request) Successful in 2m54s
Tests and linters / Tests (1.23) (pull_request) Successful in 3m7s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m10s
Tests and linters / Lint (pull_request) Successful in 3m34s
Tests and linters / Staticcheck (pull_request) Successful in 3m26s
Tests and linters / gopls check (pull_request) Successful in 3m36s
Tests and linters / Tests with -race (pull_request) Successful in 4m14s
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 1m45s
DCO action / DCO (pull_request) Successful in 1m45s
Vulncheck / Vulncheck (pull_request) Successful in 2m17s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m50s
Build / Build Components (1.22) (pull_request) Successful in 2m55s
Build / Build Components (1.23) (pull_request) Successful in 2m54s
Tests and linters / Tests (1.23) (pull_request) Successful in 3m7s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m10s
Tests and linters / Lint (pull_request) Successful in 3m34s
Tests and linters / Staticcheck (pull_request) Successful in 3m26s
Tests and linters / gopls check (pull_request) Successful in 3m36s
Tests and linters / Tests with -race (pull_request) Successful in 4m14s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
5e9a97fd3e
commit
7a36729dcd
13 changed files with 525 additions and 11 deletions
|
@ -121,11 +121,12 @@ type shardCfg struct {
|
||||||
estimateCompressibility bool
|
estimateCompressibility bool
|
||||||
estimateCompressibilityThreshold float64
|
estimateCompressibilityThreshold float64
|
||||||
|
|
||||||
smallSizeObjectLimit uint64
|
smallSizeObjectLimit uint64
|
||||||
uncompressableContentType []string
|
uncompressableContentType []string
|
||||||
refillMetabase bool
|
refillMetabase bool
|
||||||
refillMetabaseWorkersCount int
|
refillMetabaseWorkersCount int
|
||||||
mode shardmode.Mode
|
mode shardmode.Mode
|
||||||
|
skipMetabaseCompactOnUpgrade bool
|
||||||
|
|
||||||
metaCfg struct {
|
metaCfg struct {
|
||||||
path string
|
path string
|
||||||
|
@ -240,6 +241,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
|
||||||
|
|
||||||
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
||||||
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
||||||
|
newConfig.skipMetabaseCompactOnUpgrade = oldConfig.SkipMetabaseCompactOnUpgrade()
|
||||||
newConfig.mode = oldConfig.Mode()
|
newConfig.mode = oldConfig.Mode()
|
||||||
newConfig.compress = oldConfig.Compress()
|
newConfig.compress = oldConfig.Compress()
|
||||||
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
||||||
|
@ -998,6 +1000,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
|
||||||
shard.WithLogger(c.log),
|
shard.WithLogger(c.log),
|
||||||
shard.WithRefillMetabase(shCfg.refillMetabase),
|
shard.WithRefillMetabase(shCfg.refillMetabase),
|
||||||
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
|
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
|
||||||
|
shard.WithSkipMetabaseCompactOnUpgrade(shCfg.skipMetabaseCompactOnUpgrade),
|
||||||
shard.WithMode(shCfg.mode),
|
shard.WithMode(shCfg.mode),
|
||||||
shard.WithBlobStorOptions(blobstoreOpts...),
|
shard.WithBlobStorOptions(blobstoreOpts...),
|
||||||
shard.WithMetaBaseOptions(mbOptions...),
|
shard.WithMetaBaseOptions(mbOptions...),
|
||||||
|
|
|
@ -121,6 +121,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
require.Equal(t, false, sc.RefillMetabase())
|
require.Equal(t, false, sc.RefillMetabase())
|
||||||
require.Equal(t, mode.ReadOnly, sc.Mode())
|
require.Equal(t, mode.ReadOnly, sc.Mode())
|
||||||
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
|
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
|
||||||
|
require.False(t, sc.SkipMetabaseCompactOnUpgrade())
|
||||||
case 1:
|
case 1:
|
||||||
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
||||||
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
||||||
|
@ -176,6 +177,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
require.Equal(t, true, sc.RefillMetabase())
|
require.Equal(t, true, sc.RefillMetabase())
|
||||||
require.Equal(t, mode.ReadWrite, sc.Mode())
|
require.Equal(t, mode.ReadWrite, sc.Mode())
|
||||||
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
|
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
|
||||||
|
require.True(t, sc.SkipMetabaseCompactOnUpgrade())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -149,6 +149,16 @@ func (x *Config) RefillMetabaseWorkersCount() int {
|
||||||
return RefillMetabaseWorkersCountDefault
|
return RefillMetabaseWorkersCountDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SkipMetabaseCompactOnUpgrade returns the value of "skip_metabase_compact_on_upgrade" config parameter.
|
||||||
|
//
|
||||||
|
// Returns False if the value is not valid or not defined.
|
||||||
|
func (x *Config) SkipMetabaseCompactOnUpgrade() bool {
|
||||||
|
return config.BoolSafe(
|
||||||
|
(*config.Config)(x),
|
||||||
|
"skip_metabase_compact_on_upgrade",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// Mode return the value of "mode" config parameter.
|
// Mode return the value of "mode" config parameter.
|
||||||
//
|
//
|
||||||
// Panics if read the value is not one of predefined
|
// Panics if read the value is not one of predefined
|
||||||
|
|
|
@ -152,6 +152,7 @@ FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_WORKER_COUNT=15
|
||||||
## 1 shard
|
## 1 shard
|
||||||
### Flag to refill Metabase from BlobStor
|
### Flag to refill Metabase from BlobStor
|
||||||
FROSTFS_STORAGE_SHARD_1_RESYNC_METABASE=true
|
FROSTFS_STORAGE_SHARD_1_RESYNC_METABASE=true
|
||||||
|
FROSTFS_STORAGE_SHARD_1_SKIP_METABASE_COMPACT_ON_UPGRADE=true
|
||||||
### Flag to set shard mode
|
### Flag to set shard mode
|
||||||
FROSTFS_STORAGE_SHARD_1_MODE=read-write
|
FROSTFS_STORAGE_SHARD_1_MODE=read-write
|
||||||
### Write cache config
|
### Write cache config
|
||||||
|
|
|
@ -201,6 +201,7 @@
|
||||||
"1": {
|
"1": {
|
||||||
"mode": "read-write",
|
"mode": "read-write",
|
||||||
"resync_metabase": true,
|
"resync_metabase": true,
|
||||||
|
"skip_metabase_compact_on_upgrade": true,
|
||||||
"writecache": {
|
"writecache": {
|
||||||
"enabled": true,
|
"enabled": true,
|
||||||
"path": "tmp/1/cache",
|
"path": "tmp/1/cache",
|
||||||
|
|
|
@ -208,6 +208,7 @@ storage:
|
||||||
expired_collector_worker_count: 15 # number of concurrent workers collecting expired objects by the garbage collector
|
expired_collector_worker_count: 15 # number of concurrent workers collecting expired objects by the garbage collector
|
||||||
|
|
||||||
1:
|
1:
|
||||||
|
skip_metabase_compact_on_upgrade: true
|
||||||
writecache:
|
writecache:
|
||||||
path: tmp/1/cache # write-cache root directory
|
path: tmp/1/cache # write-cache root directory
|
||||||
capacity: 4 G # approximate write-cache total size, bytes
|
capacity: 4 G # approximate write-cache total size, bytes
|
||||||
|
|
|
@ -189,6 +189,7 @@ The following table describes configuration for each shard.
|
||||||
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
||||||
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
||||||
| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. |
|
| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. |
|
||||||
|
| `skip_metabase_compact_on_upgrade` | `bool` | `false` | If `true` then metabase will not be compacted on upgrade. |
|
||||||
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
||||||
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
||||||
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
||||||
|
|
257
pkg/local_object_storage/metabase/upgrade.go
Normal file
257
pkg/local_object_storage/metabase/upgrade.go
Normal file
|
@ -0,0 +1,257 @@
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
// updates maps an on-disk metabase schema version to the migration
// routine that upgrades the database to the next version.
// Currently only v2 -> v3 is supported.
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB) error{
	2: upgradeFromV2ToV3,
}

// errFailedToUpgradeDatabaseNotOpen is returned by Upgrade when the
// underlying bbolt handle has not been opened yet.
var errFailedToUpgradeDatabaseNotOpen = errors.New("failed to upgrade metabase: database not open")
|
||||||
|
|
||||||
|
func (db *DB) Upgrade(ctx context.Context, compact bool) error {
|
||||||
|
db.modeMtx.Lock()
|
||||||
|
defer db.modeMtx.Unlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
} else if db.mode.ReadOnly() {
|
||||||
|
return ErrReadOnlyMode
|
||||||
|
}
|
||||||
|
|
||||||
|
if db.boltDB == nil {
|
||||||
|
return errFailedToUpgradeDatabaseNotOpen
|
||||||
|
}
|
||||||
|
|
||||||
|
var version uint64
|
||||||
|
if err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
var e error
|
||||||
|
version, e = currentVersion(tx)
|
||||||
|
return e
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
updater, found := updates[version]
|
||||||
|
if !found {
|
||||||
|
return fmt.Errorf("unsupported version %d: no update available", version)
|
||||||
|
}
|
||||||
|
if err := updater(ctx, db.boltDB); err != nil {
|
||||||
|
return fmt.Errorf("failed to update metabase schema: %w", err)
|
||||||
|
}
|
||||||
|
if compact {
|
||||||
|
err := db.compact()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to compact metabase: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// compact rewrites the metabase into a fresh file via bbolt.Compact to
// reclaim free pages, then swaps the compacted copy in place of the
// original and reopens the database handle.
//
// The statement order is significant: the source database must be
// closed before os.Rename replaces its file. On any failure the
// temporary file is removed; note that after a failed rename the
// source handle is already closed and db is left uninitialized.
func (db *DB) compact() error {
	// The temporary file lives next to the metabase; a timestamp suffix
	// keeps concurrent/leftover copies distinguishable.
	tmpFileName := db.info.Path + "." + time.Now().Format(time.RFC3339)
	dst, err := bbolt.Open(tmpFileName, db.info.Permission, db.boltOptions)
	if err != nil {
		return fmt.Errorf("can't open new metabase to compact: %w", err)
	}
	// 256 MiB max per copy transaction.
	if err := bbolt.Compact(dst, db.boltDB, 256<<20); err != nil {
		return fmt.Errorf("failed to compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))
	}
	if err := dst.Close(); err != nil {
		return fmt.Errorf("failed to close compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	// Close the source before renaming over its path.
	if err := db.boltDB.Close(); err != nil {
		return fmt.Errorf("failed to close source metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	db.boltDB = nil
	db.initialized = false
	if err := os.Rename(tmpFileName, db.info.Path); err != nil {
		return fmt.Errorf("failed to replace source metabase with compacted: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	// Reopen the now-compacted database at the original path.
	return db.openBolt()
}
|
||||||
|
|
||||||
|
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB) error {
|
||||||
|
if err := createExpirationEpochBuckets(ctx, db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := dropUserAttributes(ctx, db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := dropOwnerIDIndex(ctx, db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := dropPayloadChecksumIndex(ctx, db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
return updateVersion(tx, version)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// objectIDToExpEpoch couples an object address (container ID plus
// object ID) with its expiration epoch. It is the unit of work passed
// from the index scanner to the bucket writer during the v2 -> v3
// upgrade.
type objectIDToExpEpoch struct {
	containerID     cid.ID
	objectID        oid.ID
	expirationEpoch uint64
}
|
||||||
|
|
||||||
|
func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB) error {
|
||||||
|
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
_, err := tx.CreateBucketIfNotExists(expEpochToObjectBucketName)
|
||||||
|
return err
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
objects := make(chan objectIDToExpEpoch, 1000)
|
||||||
|
defer func() {
|
||||||
|
close(objects)
|
||||||
|
}()
|
||||||
|
|
||||||
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
eg.Go(func() error {
|
||||||
|
return selectObjectsWithExpirationEpoch(ctx, db, objects)
|
||||||
|
})
|
||||||
|
eg.Go(func() error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case obj := <-objects:
|
||||||
|
return db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
if err := putUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: expEpochToObjectBucketName,
|
||||||
|
key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
|
||||||
|
val: zeroValue,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
val := make([]byte, epochSize)
|
||||||
|
binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
|
||||||
|
return putUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)),
|
||||||
|
key: objectKey(obj.objectID, make([]byte, objectKeySize)),
|
||||||
|
val: val,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return eg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error {
|
||||||
|
prefix := []byte{userAttributePrefix}
|
||||||
|
return db.View(func(tx *bbolt.Tx) error {
|
||||||
|
userAttrCursor := tx.Cursor()
|
||||||
|
for userAttrKey, _ := userAttrCursor.Seek(prefix); userAttrKey != nil && bytes.HasPrefix(userAttrKey, prefix); userAttrKey, _ = userAttrCursor.Next() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(userAttrKey) <= 1+cidSize {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
attributeKey := string(userAttrKey[1+cidSize:])
|
||||||
|
if attributeKey != objectV2.SysAttributeExpEpochNeoFS && attributeKey != objectV2.SysAttributeExpEpoch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var containerID cid.ID
|
||||||
|
if err := containerID.Decode(userAttrKey[1 : 1+cidSize]); err != nil {
|
||||||
|
return fmt.Errorf("failed to decode container id from user attribute bucket: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b := tx.Bucket(userAttrKey)
|
||||||
|
return b.ForEachBucket(func(expKey []byte) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
expirationEpoch, err := strconv.ParseUint(string(expKey), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not parse expiration epoch: %w", err)
|
||||||
|
}
|
||||||
|
expirationEpochBucket := b.Bucket(expKey)
|
||||||
|
return expirationEpochBucket.ForEach(func(k, _ []byte) error {
|
||||||
|
var objectID oid.ID
|
||||||
|
if err := objectID.Decode(k); err != nil {
|
||||||
|
return fmt.Errorf("failed to decode object id from container '%s' expiration epoch %d: %w", containerID, expirationEpoch, err)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case objects <- objectIDToExpEpoch{
|
||||||
|
containerID: containerID,
|
||||||
|
objectID: objectID,
|
||||||
|
expirationEpoch: expirationEpoch,
|
||||||
|
}:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// dropUserAttributes removes every v2 user-attribute index bucket
// (they are replaced by the expiration-epoch buckets in v3).
func dropUserAttributes(ctx context.Context, db *bbolt.DB) error {
	return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix})
}
|
||||||
|
|
||||||
|
// dropOwnerIDIndex removes the owner-ID index buckets, which are no
// longer maintained starting with schema version 3.
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB) error {
	return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix})
}
|
||||||
|
|
||||||
|
// dropPayloadChecksumIndex removes the payload-checksum index buckets,
// which are no longer maintained starting with schema version 3.
func dropPayloadChecksumIndex(ctx context.Context, db *bbolt.DB) error {
	return dropBucketsByPrefix(ctx, db, []byte{payloadHashPrefix})
}
|
||||||
|
|
||||||
|
func dropBucketsByPrefix(ctx context.Context, db *bbolt.DB, prefix []byte) error {
|
||||||
|
const batch = 100
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
var keys [][]byte
|
||||||
|
if err := db.View(func(tx *bbolt.Tx) error {
|
||||||
|
c := tx.Cursor()
|
||||||
|
for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix) && len(keys) < batch; k, _ = c.Next() {
|
||||||
|
keys = append(keys, bytes.Clone(k))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(keys) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
for _, k := range keys {
|
||||||
|
if err := tx.DeleteBucket(k); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
205
pkg/local_object_storage/metabase/upgrade_test.go
Normal file
205
pkg/local_object_storage/metabase/upgrade_test.go
Normal file
|
@ -0,0 +1,205 @@
|
||||||
|
//go:build integration
|
||||||
|
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
const upgradeFilePath = "/path/to/metabase.v2"
|
||||||
|
|
||||||
|
// TestUpgradeV2ToV3 opens a copy of a pre-generated v2 metabase file,
// checks that Init rejects the outdated schema, upgrades it (with
// compaction) and verifies that the upgraded database initializes
// cleanly. Requires the fixture at upgradeFilePath to exist.
func TestUpgradeV2ToV3(t *testing.T) {
	path := createTempCopy(t, upgradeFilePath)
	defer func() {
		require.NoError(t, os.Remove(path))
	}()
	db := New(WithPath(path), WithEpochState(epochState{e: 1000}), WithLogger(test.NewLogger(t)))
	require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
	// The v2 schema must be rejected before the upgrade...
	require.ErrorIs(t, db.Init(), ErrOutdatedVersion)
	require.NoError(t, db.Upgrade(context.Background(), true))
	// ...and accepted after it.
	require.NoError(t, db.Init())
	require.NoError(t, db.Close())
}
|
||||||
|
|
||||||
|
func createTempCopy(t *testing.T, path string) string {
|
||||||
|
src, err := os.Open(path)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tmpPath := upgradeFilePath + time.Now().Format(time.RFC3339)
|
||||||
|
dest, err := os.Create(tmpPath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = io.Copy(dest, src)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, src.Close())
|
||||||
|
require.NoError(t, dest.Close())
|
||||||
|
|
||||||
|
return tmpPath
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestGenerateMetabaseFile builds the v2 fixture consumed by
// TestUpgradeV2ToV3. It is skipped by default and run manually when the
// fixture needs regeneration.
func TestGenerateMetabaseFile(t *testing.T) {
	t.Skip("for generating db")
	// This test generates a metabase file with 2 million objects for 10 000 containers,
	// of which
	// 500 000 are simple objects,
	// 500 000 are complex objects (total 1 million),
	// 100 000 are deleted by gcMarks,
	// 100 000 are deleted by tombstones (total 200 000),
	// 100 000 are locked (total 200 000).

	db := New(WithPath(upgradeFilePath), WithEpochState(epochState{e: 1000}), WithLogger(test.NewLogger(t)),
		WithMaxBatchDelay(100*time.Millisecond), WithMaxBatchSize(1000))
	require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
	// Speed up bulk generation: large preallocation, no fsync per commit.
	db.boltDB.AllocSize = 128 << 20
	db.boltDB.NoSync = true
	require.NoError(t, db.Init())
	containers := make([]cid.ID, 10_000)
	for i := range containers {
		containers[i] = cidtest.ID()
	}
	oc, err := db.ObjectCounters()
	require.NoError(t, err)
	require.True(t, oc.IsZero())
	eg, ctx := errgroup.WithContext(context.Background())
	eg.SetLimit(10000)
	// simple objects
	for i := 0; i < 500_000; i++ {
		i := i
		eg.Go(func() error {
			obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10))
			_, err := db.Put(ctx, PutPrm{
				obj: obj,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			require.NoError(t, err)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	db.log.Info("simple objects generated")
	eg, ctx = errgroup.WithContext(context.Background())
	eg.SetLimit(10000)
	// complex objects
	for i := 0; i < 500_000; i++ {
		i := i
		eg.Go(func() error {
			parent := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			child := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			child.SetParent(parent)
			idParent, _ := parent.ID()
			child.SetParentID(idParent)
			testutil.AddAttribute(child, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			testutil.AddAttribute(parent, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10))
			testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10))
			// Only the child is stored; the parent is indexed implicitly.
			_, err := db.Put(ctx, PutPrm{
				obj: child,
			})
			require.NoError(t, err)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	db.log.Info("complex objects generated")
	eg, ctx = errgroup.WithContext(context.Background())
	eg.SetLimit(10000)
	// simple objects deleted by gc marks
	for i := 0; i < 100_000; i++ {
		i := i
		eg.Go(func() error {
			obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			_, err := db.Put(ctx, PutPrm{
				obj: obj,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			require.NoError(t, err)
			// Inhume without a tombstone address sets a GC mark.
			_, err = db.Inhume(ctx, InhumePrm{
				target: []oid.Address{object.AddressOf(obj)},
			})
			require.NoError(t, err)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	db.log.Info("simple objects deleted by gc marks generated")
	eg, ctx = errgroup.WithContext(context.Background())
	eg.SetLimit(10000)
	// simple objects deleted by tombstones
	for i := 0; i < 100_000; i++ {
		i := i
		eg.Go(func() error {
			obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			_, err := db.Put(ctx, PutPrm{
				obj: obj,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			tomb := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			tomb.SetType(objectSDK.TypeTombstone)
			_, err = db.Put(ctx, PutPrm{
				obj: tomb,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			require.NoError(t, err)
			tombAddr := object.AddressOf(tomb)
			_, err = db.Inhume(ctx, InhumePrm{
				target: []oid.Address{object.AddressOf(obj)},
				tomb:   &tombAddr,
			})
			require.NoError(t, err)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	db.log.Info("simple objects deleted by tombstones generated")
	eg, ctx = errgroup.WithContext(context.Background())
	eg.SetLimit(10000)
	// simple objects locked by locks
	for i := 0; i < 100_000; i++ {
		i := i
		eg.Go(func() error {
			obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			_, err := db.Put(ctx, PutPrm{
				obj: obj,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			lock := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			lock.SetType(objectSDK.TypeLock)
			testutil.AddAttribute(lock, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10))
			_, err = db.Put(ctx, PutPrm{
				obj: lock,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			require.NoError(t, err)
			err = db.Lock(ctx, containers[i%len(containers)], object.AddressOf(lock).Object(), []oid.ID{object.AddressOf(obj).Object()})
			require.NoError(t, err)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	db.log.Info("simple objects locked by locks generated")
	// NoSync was enabled above, so flush everything before closing.
	require.NoError(t, db.boltDB.Sync())
	require.NoError(t, db.Close())
}
|
|
@ -94,11 +94,13 @@ const (
|
||||||
// ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs.
|
// ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs.
|
||||||
// Key: owner ID
|
// Key: owner ID
|
||||||
// Value: bucket containing object IDs as keys
|
// Value: bucket containing object IDs as keys
|
||||||
_
|
// removed in version 3
|
||||||
|
ownerPrefix
|
||||||
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
|
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
|
||||||
// Key: attribute value
|
// Key: attribute value
|
||||||
// Value: bucket containing object IDs as keys
|
// Value: bucket containing object IDs as keys
|
||||||
_
|
// removed in version 3
|
||||||
|
userAttributePrefix
|
||||||
|
|
||||||
// ====================
|
// ====================
|
||||||
// List index buckets.
|
// List index buckets.
|
||||||
|
@ -107,7 +109,8 @@ const (
|
||||||
// payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs.
|
// payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs.
|
||||||
// Key: payload hash
|
// Key: payload hash
|
||||||
// Value: list of object IDs
|
// Value: list of object IDs
|
||||||
_
|
// removed in version 3
|
||||||
|
payloadHashPrefix
|
||||||
// parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs.
|
// parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs.
|
||||||
// Key: parent ID
|
// Key: parent ID
|
||||||
// Value: list of object IDs
|
// Value: list of object IDs
|
||||||
|
|
|
@ -2,6 +2,7 @@ package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
|
@ -18,6 +19,8 @@ var versionKey = []byte("version")
|
||||||
// the current code version.
|
// the current code version.
|
||||||
var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required")
|
var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required")
|
||||||
|
|
||||||
|
var errVersionUndefinedNoInfoBucket = errors.New("version undefined: no info bucket")
|
||||||
|
|
||||||
func checkVersion(tx *bbolt.Tx, initialized bool) error {
|
func checkVersion(tx *bbolt.Tx, initialized bool) error {
|
||||||
var knownVersion bool
|
var knownVersion bool
|
||||||
|
|
||||||
|
@ -59,3 +62,15 @@ func updateVersion(tx *bbolt.Tx, version uint64) error {
|
||||||
}
|
}
|
||||||
return b.Put(versionKey, data)
|
return b.Put(versionKey, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// currentVersion reads the stored schema version from the shard info
// bucket. It fails with errVersionUndefinedNoInfoBucket when the bucket
// is missing, and with an error when the stored value is not exactly
// 8 bytes (little-endian uint64).
func currentVersion(tx *bbolt.Tx) (uint64, error) {
	b := tx.Bucket(shardInfoBucket)
	if b == nil {
		return 0, errVersionUndefinedNoInfoBucket
	}
	data := b.Get(versionKey)
	if len(data) != 8 {
		return 0, fmt.Errorf("version undefined: invalid version data length %d", len(data))
	}
	return binary.LittleEndian.Uint64(data), nil
}
|
||||||
|
|
|
@ -100,7 +100,7 @@ func (x *metabaseSynchronizer) Init() error {
|
||||||
// Init initializes all Shard's components.
|
// Init initializes all Shard's components.
|
||||||
func (s *Shard) Init(ctx context.Context) error {
|
func (s *Shard) Init(ctx context.Context) error {
|
||||||
m := s.GetMode()
|
m := s.GetMode()
|
||||||
if err := s.initializeComponents(m); err != nil {
|
if err := s.initializeComponents(ctx, m); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,7 +137,7 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) initializeComponents(m mode.Mode) error {
|
func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error {
|
||||||
type initializer interface {
|
type initializer interface {
|
||||||
Init() error
|
Init() error
|
||||||
}
|
}
|
||||||
|
@ -172,7 +172,13 @@ func (s *Shard) initializeComponents(m mode.Mode) error {
|
||||||
if err := component.Init(); err != nil {
|
if err := component.Init(); err != nil {
|
||||||
if component == s.metaBase {
|
if component == s.metaBase {
|
||||||
if errors.Is(err, meta.ErrOutdatedVersion) {
|
if errors.Is(err, meta.ErrOutdatedVersion) {
|
||||||
return fmt.Errorf("metabase initialization: %w", err)
|
err = s.metaBase.Upgrade(ctx, !s.skipMetabaseCompactOnUpgrade)
|
||||||
|
if err == nil {
|
||||||
|
err = s.metaBase.Init()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.handleMetabaseFailure("init", err)
|
err = s.handleMetabaseFailure("init", err)
|
||||||
|
|
|
@ -105,6 +105,8 @@ type cfg struct {
|
||||||
refillMetabase bool
|
refillMetabase bool
|
||||||
refillMetabaseWorkersCount int
|
refillMetabaseWorkersCount int
|
||||||
|
|
||||||
|
skipMetabaseCompactOnUpgrade bool
|
||||||
|
|
||||||
rmBatchSize int
|
rmBatchSize int
|
||||||
|
|
||||||
useWriteCache bool
|
useWriteCache bool
|
||||||
|
@ -317,6 +319,13 @@ func WithRefillMetabaseWorkersCount(v int) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithSkipMetabaseCompactOnUpgrade returns option to disable metabase compaction on upgrade.
|
||||||
|
func WithSkipMetabaseCompactOnUpgrade(v bool) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.skipMetabaseCompactOnUpgrade = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithMode returns option to set shard's mode. Mode must be one of the predefined:
|
// WithMode returns option to set shard's mode. Mode must be one of the predefined:
|
||||||
// - mode.ReadWrite;
|
// - mode.ReadWrite;
|
||||||
// - mode.ReadOnly.
|
// - mode.ReadOnly.
|
||||||
|
|
Loading…
Reference in a new issue