metabase: Fix drop buckets during resync #1050
2 changed files with 82 additions and 21 deletions
|
@ -21,6 +21,22 @@ var ErrDegradedMode = logicerr.New("metabase is in a degraded mode")
|
||||||
// ErrReadOnlyMode is returned when metabase is in a read-only mode.
|
// ErrReadOnlyMode is returned when metabase is in a read-only mode.
|
||||||
var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode")
|
var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode")
|
||||||
|
|
||||||
|
var (
|
||||||
|
mStaticBuckets = map[string]struct{}{
|
||||||
|
string(containerVolumeBucketName): {},
|
||||||
|
string(containerCounterBucketName): {},
|
||||||
|
string(graveyardBucketName): {},
|
||||||
|
string(garbageBucketName): {},
|
||||||
|
string(shardInfoBucket): {},
|
||||||
|
string(bucketNameLocked): {},
|
||||||
|
}
|
||||||
|
|
||||||
|
// deprecatedBuckets buckets that are not used anymore.
|
||||||
|
deprecatedBuckets = [][]byte{
|
||||||
|
toMoveItBucketName,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// Open boltDB instance for metabase.
|
// Open boltDB instance for metabase.
|
||||||
func (db *DB) Open(_ context.Context, mode mode.Mode) error {
|
func (db *DB) Open(_ context.Context, mode mode.Mode) error {
|
||||||
db.modeMtx.Lock()
|
db.modeMtx.Lock()
|
||||||
|
@ -113,20 +129,6 @@ func (db *DB) init(reset bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
mStaticBuckets := map[string]struct{}{
|
|
||||||
string(containerVolumeBucketName): {},
|
|
||||||
string(containerCounterBucketName): {},
|
|
||||||
string(graveyardBucketName): {},
|
|
||||||
string(garbageBucketName): {},
|
|
||||||
string(shardInfoBucket): {},
|
|
||||||
string(bucketNameLocked): {},
|
|
||||||
}
|
|
||||||
|
|
||||||
// buckets that are not used anymore
|
|
||||||
deprecatedBuckets := [][]byte{
|
|
||||||
toMoveItBucketName,
|
|
||||||
}
|
|
||||||
|
|
||||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
var err error
|
var err error
|
||||||
if !reset {
|
if !reset {
|
||||||
|
@ -167,15 +169,17 @@ func (db *DB) init(reset bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tx.ForEach(func(name []byte, _ *bbolt.Bucket) error {
|
bucketCursor := tx.Cursor()
|
||||||
|
name, _ := bucketCursor.First()
|
||||||
|
for name != nil {
|
||||||
if _, ok := mStaticBuckets[string(name)]; !ok {
|
if _, ok := mStaticBuckets[string(name)]; !ok {
|
||||||
return tx.DeleteBucket(name)
|
if err := tx.DeleteBucket(name); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
name, _ = bucketCursor.Seek(name)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
name, _ = bucketCursor.Next()
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
return updateVersion(tx, version)
|
return updateVersion(tx, version)
|
||||||
})
|
})
|
||||||
|
|
57
pkg/local_object_storage/metabase/reset_test.go
Normal file
57
pkg/local_object_storage/metabase/reset_test.go
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
type epochState struct{ e uint64 }
|
||||||
|
|
||||||
|
func (s epochState) CurrentEpoch() uint64 {
|
||||||
|
return s.e
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResetDropsContainerBuckets(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
db := New(
|
||||||
|
[]Option{
|
||||||
|
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
||||||
|
WithPermissions(0o600),
|
||||||
|
WithEpochState(epochState{}),
|
||||||
|
}...,
|
||||||
|
)
|
||||||
|
|
||||||
|
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||||
|
require.NoError(t, db.Init())
|
||||||
|
|
||||||
|
defer func() { require.NoError(t, db.Close()) }()
|
||||||
|
|
||||||
|
for idx := 0; idx < 100; idx++ {
|
||||||
|
var putPrm PutPrm
|
||||||
|
putPrm.SetObject(testutil.GenerateObject())
|
||||||
|
putPrm.SetStorageID([]byte(fmt.Sprintf("0/%d", idx)))
|
||||||
|
_, err := db.Put(context.Background(), putPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, db.Reset())
|
||||||
|
|
||||||
|
var bucketCount int
|
||||||
|
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
||||||
|
_, exists := mStaticBuckets[string(name)]
|
||||||
|
require.True(t, exists, "unexpected bucket:"+string(name))
|
||||||
|
bucketCount++
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
require.Equal(t, len(mStaticBuckets), bucketCount)
|
||||||
|
}
|
Loading…
Reference in a new issue