metabase: Fix drop buckets during resync #1050
2 changed files with 82 additions and 21 deletions
|
@ -21,6 +21,22 @@ var ErrDegradedMode = logicerr.New("metabase is in a degraded mode")
|
|||
// ErrReadOnlyMode is returned when metabase is in a read-only mode.
|
||||
var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode")
|
||||
|
||||
var (
|
||||
mStaticBuckets = map[string]struct{}{
|
||||
string(containerVolumeBucketName): {},
|
||||
string(containerCounterBucketName): {},
|
||||
string(graveyardBucketName): {},
|
||||
string(garbageBucketName): {},
|
||||
string(shardInfoBucket): {},
|
||||
string(bucketNameLocked): {},
|
||||
}
|
||||
|
||||
// deprecatedBuckets buckets that are not used anymore.
|
||||
deprecatedBuckets = [][]byte{
|
||||
toMoveItBucketName,
|
||||
}
|
||||
)
|
||||
|
||||
// Open boltDB instance for metabase.
|
||||
func (db *DB) Open(_ context.Context, mode mode.Mode) error {
|
||||
db.modeMtx.Lock()
|
||||
|
@ -113,20 +129,6 @@ func (db *DB) init(reset bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
mStaticBuckets := map[string]struct{}{
|
||||
string(containerVolumeBucketName): {},
|
||||
string(containerCounterBucketName): {},
|
||||
string(graveyardBucketName): {},
|
||||
string(garbageBucketName): {},
|
||||
string(shardInfoBucket): {},
|
||||
string(bucketNameLocked): {},
|
||||
}
|
||||
|
||||
// buckets that are not used anymore
|
||||
deprecatedBuckets := [][]byte{
|
||||
toMoveItBucketName,
|
||||
}
|
||||
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
var err error
|
||||
if !reset {
|
||||
|
@ -167,16 +169,18 @@ func (db *DB) init(reset bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
err = tx.ForEach(func(name []byte, _ *bbolt.Bucket) error {
|
||||
bucketCursor := tx.Cursor()
|
||||
name, _ := bucketCursor.First()
|
||||
for name != nil {
|
||||
if _, ok := mStaticBuckets[string(name)]; !ok {
|
||||
return tx.DeleteBucket(name)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
if err := tx.DeleteBucket(name); err != nil {
|
||||
return err
|
||||
}
|
||||
name, _ = bucketCursor.Seek(name)
|
||||
continue
|
||||
}
|
||||
name, _ = bucketCursor.Next()
|
||||
}
|
||||
return updateVersion(tx, version)
|
||||
})
|
||||
}
|
||||
|
|
57
pkg/local_object_storage/metabase/reset_test.go
Normal file
57
pkg/local_object_storage/metabase/reset_test.go
Normal file
|
@ -0,0 +1,57 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type epochState struct{ e uint64 }
|
||||
|
||||
func (s epochState) CurrentEpoch() uint64 {
|
||||
return s.e
|
||||
}
|
||||
|
||||
func TestResetDropsContainerBuckets(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db := New(
|
||||
[]Option{
|
||||
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
||||
WithPermissions(0o600),
|
||||
WithEpochState(epochState{}),
|
||||
}...,
|
||||
)
|
||||
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, db.Init())
|
||||
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
for idx := 0; idx < 100; idx++ {
|
||||
var putPrm PutPrm
|
||||
putPrm.SetObject(testutil.GenerateObject())
|
||||
putPrm.SetStorageID([]byte(fmt.Sprintf("0/%d", idx)))
|
||||
_, err := db.Put(context.Background(), putPrm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, db.Reset())
|
||||
|
||||
var bucketCount int
|
||||
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
||||
_, exists := mStaticBuckets[string(name)]
|
||||
require.True(t, exists, "unexpected bucket:"+string(name))
|
||||
bucketCount++
|
||||
return nil
|
||||
})
|
||||
}))
|
||||
require.Equal(t, len(mStaticBuckets), bucketCount)
|
||||
}
|
Loading…
Reference in a new issue