metabase: Fix drop buckets during resync #1050

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:fix/resync_bucket_drop into master 2024-03-19 11:45:06 +00:00
2 changed files with 82 additions and 21 deletions

View file

@ -21,6 +21,22 @@ var ErrDegradedMode = logicerr.New("metabase is in a degraded mode")
// ErrReadOnlyMode is returned when metabase is in a read-only mode. // ErrReadOnlyMode is returned when metabase is in a read-only mode.
var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode") var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode")
var (
mStaticBuckets = map[string]struct{}{
string(containerVolumeBucketName): {},
string(containerCounterBucketName): {},
string(graveyardBucketName): {},
string(garbageBucketName): {},
string(shardInfoBucket): {},
string(bucketNameLocked): {},
}
// deprecatedBuckets buckets that are not used anymore.
deprecatedBuckets = [][]byte{
toMoveItBucketName,
}
)
// Open boltDB instance for metabase. // Open boltDB instance for metabase.
func (db *DB) Open(_ context.Context, mode mode.Mode) error { func (db *DB) Open(_ context.Context, mode mode.Mode) error {
db.modeMtx.Lock() db.modeMtx.Lock()
@ -113,20 +129,6 @@ func (db *DB) init(reset bool) error {
return nil return nil
} }
mStaticBuckets := map[string]struct{}{
string(containerVolumeBucketName): {},
string(containerCounterBucketName): {},
string(graveyardBucketName): {},
string(garbageBucketName): {},
string(shardInfoBucket): {},
string(bucketNameLocked): {},
}
// buckets that are not used anymore
deprecatedBuckets := [][]byte{
toMoveItBucketName,
}
return db.boltDB.Update(func(tx *bbolt.Tx) error { return db.boltDB.Update(func(tx *bbolt.Tx) error {
var err error var err error
if !reset { if !reset {
@ -167,16 +169,18 @@ func (db *DB) init(reset bool) error {
return nil return nil
} }
err = tx.ForEach(func(name []byte, _ *bbolt.Bucket) error { bucketCursor := tx.Cursor()
name, _ := bucketCursor.First()
for name != nil {
if _, ok := mStaticBuckets[string(name)]; !ok { if _, ok := mStaticBuckets[string(name)]; !ok {
return tx.DeleteBucket(name) if err := tx.DeleteBucket(name); err != nil {
}
return nil
})
if err != nil {
return err return err

Why do we need clone here? The name should not be changed by DeleteBucket

Why do we need clone here? The name should not be changed by `DeleteBucket`

oups, fixed

oups, fixed
} }
name, _ = bucketCursor.Seek(name)
continue
}
name, _ = bucketCursor.Next()
}
return updateVersion(tx, version) return updateVersion(tx, version)
}) })
} }

View file

@ -0,0 +1,57 @@
package meta

Moved to separate file to use the meta package to have access to boltdb field.

Moved to separate file to use the `meta` package to have access to `boltdb` field.
import (
"context"
"fmt"
"path/filepath"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
type epochState struct{ e uint64 }
func (s epochState) CurrentEpoch() uint64 {
return s.e
}
func TestResetDropsContainerBuckets(t *testing.T) {
t.Parallel()
db := New(
[]Option{
WithPath(filepath.Join(t.TempDir(), "metabase")),
WithPermissions(0o600),
WithEpochState(epochState{}),
}...,
)
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.Init())
defer func() { require.NoError(t, db.Close()) }()
for idx := 0; idx < 100; idx++ {
var putPrm PutPrm
putPrm.SetObject(testutil.GenerateObject())
putPrm.SetStorageID([]byte(fmt.Sprintf("0/%d", idx)))
_, err := db.Put(context.Background(), putPrm)
require.NoError(t, err)
}
require.NoError(t, db.Reset())
var bucketCount int
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
_, exists := mStaticBuckets[string(name)]
require.True(t, exists, "unexpected bucket:"+string(name))
bucketCount++
return nil
})
}))
require.Equal(t, len(mStaticBuckets), bucketCount)
}