[#9999] metabase: Drop indexes
Some checks failed
DCO action / DCO (pull_request) Successful in 1m29s
Build / Build Components (1.21) (pull_request) Successful in 3m33s
Vulncheck / Vulncheck (pull_request) Failing after 3m10s
Build / Build Components (1.20) (pull_request) Successful in 4m2s
Tests and linters / Lint (pull_request) Failing after 4m3s
Tests and linters / Staticcheck (pull_request) Successful in 4m14s
Tests and linters / Tests (1.20) (pull_request) Successful in 10m7s
Tests and linters / Tests (1.21) (pull_request) Successful in 10m31s
Tests and linters / Tests with -race (pull_request) Successful in 11m2s
Some checks failed
DCO action / DCO (pull_request) Successful in 1m29s
Build / Build Components (1.21) (pull_request) Successful in 3m33s
Vulncheck / Vulncheck (pull_request) Failing after 3m10s
Build / Build Components (1.20) (pull_request) Successful in 4m2s
Tests and linters / Lint (pull_request) Failing after 4m3s
Tests and linters / Staticcheck (pull_request) Successful in 4m14s
Tests and linters / Tests (1.20) (pull_request) Successful in 10m7s
Tests and linters / Tests (1.21) (pull_request) Successful in 10m31s
Tests and linters / Tests with -race (pull_request) Successful in 11m2s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
225f7ea488
commit
5d21aeede4
5 changed files with 110 additions and 0 deletions
|
@ -112,6 +112,7 @@ type shardCfg struct {
|
|||
uncompressableContentType []string
|
||||
refillMetabase bool
|
||||
refillMetabaseWorkersCount int
|
||||
dropIndexes bool
|
||||
mode shardmode.Mode
|
||||
|
||||
metaCfg struct {
|
||||
|
@ -225,6 +226,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
|
|||
|
||||
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
||||
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
||||
newConfig.dropIndexes = oldConfig.DropIndexes()
|
||||
newConfig.mode = oldConfig.Mode()
|
||||
newConfig.compress = oldConfig.Compress()
|
||||
newConfig.uncompressableContentType = oldConfig.UncompressableContentTypes()
|
||||
|
@ -876,6 +878,7 @@ func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
|
|||
sh.shOpts = []shard.Option{
|
||||
shard.WithLogger(c.log),
|
||||
shard.WithRefillMetabase(shCfg.refillMetabase),
|
||||
shard.WithDropIndexes(shCfg.dropIndexes),
|
||||
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
|
||||
shard.WithMode(shCfg.mode),
|
||||
shard.WithBlobStorOptions(blobstoreOpts...),
|
||||
|
|
|
@ -126,6 +126,13 @@ func (x *Config) RefillMetabaseWorkersCount() int {
|
|||
return RefillMetabaseWorkersCountDefault
|
||||
}
|
||||
|
||||
func (x *Config) DropIndexes() bool {
|
||||
return config.BoolSafe(
|
||||
(*config.Config)(x),
|
||||
"drop_metabase_indexes",
|
||||
)
|
||||
}
|
||||
|
||||
// Mode return the value of "mode" config parameter.
|
||||
//
|
||||
// Panics if read the value is not one of predefined
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -220,3 +222,73 @@ func (db *DB) Reload(opts ...Option) (bool, error) {
|
|||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (db *DB) DropIndexes(ctx context.Context) error {
|
||||
if db.mode.NoMetabase() || db.mode.ReadOnly() {
|
||||
return nil
|
||||
}
|
||||
|
||||
mStaticBuckets := map[string]struct{}{
|
||||
string(containerVolumeBucketName): {},
|
||||
string(graveyardBucketName): {},
|
||||
string(toMoveItBucketName): {},
|
||||
string(garbageBucketName): {},
|
||||
string(shardInfoBucket): {},
|
||||
string(bucketNameLocked): {},
|
||||
}
|
||||
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
bucketCursor := tx.Cursor()
|
||||
bucket_name, _ := bucketCursor.First()
|
||||
for bucket_name != nil {
|
||||
if _, ok := mStaticBuckets[string(bucket_name)]; ok {
|
||||
bucket_name, _ = bucketCursor.Next()
|
||||
continue
|
||||
}
|
||||
|
||||
if bucket_name[0] == payloadHashPrefix ||
|
||||
bucket_name[0] == ownerPrefix {
|
||||
name_copy := bytes.Clone(bucket_name)
|
||||
if err := tx.DeleteBucket(name_copy); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bucket_name, _ = bucketCursor.Seek(name_copy)
|
||||
continue
|
||||
}
|
||||
|
||||
if bucket_name[0] == userAttributePrefix {
|
||||
attributeKey := string(bucket_name[bucketKeySize:])
|
||||
if attributeKey != objectV2.SysAttributeExpEpoch && attributeKey != "S3-Access-Box-CRDT-Name" {
|
||||
name_copy := bytes.Clone(bucket_name)
|
||||
if err := tx.DeleteBucket(name_copy); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bucket_name, _ = bucketCursor.Seek(name_copy)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if bucket_name[0] == rootPrefix {
|
||||
b := tx.Bucket(bucket_name)
|
||||
cursor := b.Cursor()
|
||||
k, v := cursor.First()
|
||||
for k != nil {
|
||||
if len(v) == 0 { // empty split info
|
||||
k_copy := bytes.Clone(k)
|
||||
if err := b.Delete(k_copy); err != nil {
|
||||
return err
|
||||
}
|
||||
k, v = cursor.Seek(k_copy)
|
||||
} else {
|
||||
k, v = cursor.Next()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bucket_name, _ = bucketCursor.Next()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
@ -91,6 +91,21 @@ func (x *metabaseSynchronizer) Init() error {
|
|||
return (*Shard)(x).refillMetabase(ctx)
|
||||
}
|
||||
|
||||
type metabaseDropIndex Shard
|
||||
|
||||
func (x *metabaseDropIndex) Init() error {
|
||||
ctx, span := tracing.StartSpanFromContext(context.TODO(), "metabaseDropIndex.Init")
|
||||
defer span.End()
|
||||
|
||||
sh := (*Shard)(x)
|
||||
x.log.Info("Dropping metabase indexes...", zap.Stringer("shard_id", sh.info.ID))
|
||||
if err := sh.metaBase.DropIndexes(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
x.log.Info("Dropping metabase indexes completed", zap.Stringer("shard_id", sh.info.ID))
|
||||
return sh.metaBase.Init()
|
||||
}
|
||||
|
||||
// Init initializes all Shard's components.
|
||||
func (s *Shard) Init(ctx context.Context) error {
|
||||
type initializer interface {
|
||||
|
@ -104,6 +119,8 @@ func (s *Shard) Init(ctx context.Context) error {
|
|||
|
||||
if s.NeedRefillMetabase() {
|
||||
initMetabase = (*metabaseSynchronizer)(s)
|
||||
} else if s.DropIndexes() {
|
||||
initMetabase = (*metabaseDropIndex)(s)
|
||||
} else {
|
||||
initMetabase = s.metaBase
|
||||
}
|
||||
|
|
|
@ -94,6 +94,7 @@ type cfg struct {
|
|||
|
||||
refillMetabase bool
|
||||
refillMetabaseWorkersCount int
|
||||
dropIndexes bool
|
||||
|
||||
rmBatchSize int
|
||||
|
||||
|
@ -262,6 +263,10 @@ func (s *Shard) NeedRefillMetabase() bool {
|
|||
return s.cfg.refillMetabase
|
||||
}
|
||||
|
||||
func (s *Shard) DropIndexes() bool {
|
||||
return s.cfg.dropIndexes
|
||||
}
|
||||
|
||||
// WithRemoverBatchSize returns option to set batch size
|
||||
// of single removal operation.
|
||||
func WithRemoverBatchSize(sz int) Option {
|
||||
|
@ -316,6 +321,12 @@ func WithRefillMetabaseWorkersCount(v int) Option {
|
|||
}
|
||||
}
|
||||
|
||||
func WithDropIndexes(v bool) Option {
|
||||
return func(c *cfg) {
|
||||
c.dropIndexes = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithMode returns option to set shard's mode. Mode must be one of the predefined:
|
||||
// - mode.ReadWrite;
|
||||
// - mode.ReadOnly.
|
||||
|
|
Loading…
Reference in a new issue