forked from TrueCloudLab/frostfs-node
[#1483] metabase: Store version
The main problem is to distinguish the case of initial initialization and update from version 0. We can't do this at `Open`, because of `resync_metabase` flag. Thus, the following approach was taken: 1. During `Open` check whether the metabase was initialized. 2. Check for the version in `Init` or write the new one if the metabase is new. 3. Update version in `Reset`. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
7df50297cd
commit
6f243a2a76
4 changed files with 158 additions and 2 deletions
|
@ -27,7 +27,27 @@ func (db *DB) Open() error {
|
|||
|
||||
db.log.Debug("opened boltDB instance for Metabase")
|
||||
|
||||
db.log.Debug("checking metabase version")
|
||||
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
// The safest way to check if the metabase is fresh is to check if it has no buckets.
|
||||
// However, shard info can be present. So here we check that the number of buckets is
|
||||
// at most 1.
|
||||
// Another thing to consider is that tests do not persist shard ID, we want to support
|
||||
// this case too.
|
||||
var n int
|
||||
err := tx.ForEach(func([]byte, *bbolt.Bucket) error {
|
||||
if n++; n >= 2 { // do not iterate a lot
|
||||
return errBreakBucketForEach
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err == errBreakBucketForEach {
|
||||
db.initialized = true
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// Init initializes metabase. It creates static (CID-independent) buckets in underlying BoltDB instance.
|
||||
|
@ -53,6 +73,14 @@ func (db *DB) init(reset bool) error {
|
|||
}
|
||||
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
var err error
|
||||
if !reset {
|
||||
// Normal open, check version and update if not initialized.
|
||||
err := checkVersion(tx, db.initialized)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for k := range mStaticBuckets {
|
||||
b, err := tx.CreateBucketIfNotExists([]byte(k))
|
||||
if err != nil {
|
||||
|
@ -70,13 +98,17 @@ func (db *DB) init(reset bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
||||
err = tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
||||
if _, ok := mStaticBuckets[string(name)]; !ok {
|
||||
return tx.DeleteBucket(name)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return updateVersion(tx, version)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,8 @@ type DB struct {
|
|||
matchers map[object.SearchMatchType]matcher
|
||||
|
||||
boltDB *bbolt.DB
|
||||
|
||||
initialized bool
|
||||
}
|
||||
|
||||
// Option is an option of DB constructor.
|
||||
|
|
41
pkg/local_object_storage/metabase/version.go
Normal file
41
pkg/local_object_storage/metabase/version.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// version contains current metabase version.
|
||||
const version = 0
|
||||
|
||||
var versionKey = []byte("version")
|
||||
|
||||
func checkVersion(tx *bbolt.Tx, initialized bool) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b != nil {
|
||||
data := b.Get(versionKey)
|
||||
if len(data) == 8 {
|
||||
stored := binary.LittleEndian.Uint64(data)
|
||||
if stored != version {
|
||||
return fmt.Errorf("invalid version: expected=%d, stored=%d", version, stored)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !initialized { // new database, write version
|
||||
return updateVersion(tx, version)
|
||||
}
|
||||
return nil // return error here after the first version increase
|
||||
}
|
||||
|
||||
func updateVersion(tx *bbolt.Tx, version uint64) error {
|
||||
data := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(data, version)
|
||||
|
||||
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create auxilliary bucket: %w", err)
|
||||
}
|
||||
return b.Put(versionKey, data)
|
||||
}
|
81
pkg/local_object_storage/metabase/version_test.go
Normal file
81
pkg/local_object_storage/metabase/version_test.go
Normal file
|
@ -0,0 +1,81 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
func TestVersion(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
newDB := func(t *testing.T) *DB {
|
||||
return New(WithPath(filepath.Join(dir, t.Name())),
|
||||
WithPermissions(0600))
|
||||
}
|
||||
check := func(t *testing.T, db *DB) {
|
||||
require.NoError(t, db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b == nil {
|
||||
return errors.New("shard info bucket not found")
|
||||
}
|
||||
data := b.Get(versionKey)
|
||||
if len(data) != 8 {
|
||||
return errors.New("invalid version data")
|
||||
}
|
||||
if stored := binary.LittleEndian.Uint64(data); stored != version {
|
||||
return fmt.Errorf("invalid version: %d != %d", stored, version)
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
t.Run("simple", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.Open())
|
||||
require.NoError(t, db.Init())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
t.Run("reopen", func(t *testing.T) {
|
||||
require.NoError(t, db.Open())
|
||||
require.NoError(t, db.Init())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
})
|
||||
})
|
||||
t.Run("old data", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.Open())
|
||||
require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4}))
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
require.NoError(t, db.Open())
|
||||
require.NoError(t, db.Init())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
})
|
||||
t.Run("invalid version", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.Open())
|
||||
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return updateVersion(tx, version+1)
|
||||
}))
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
require.NoError(t, db.Open())
|
||||
require.Error(t, db.Init())
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
t.Run("reset", func(t *testing.T) {
|
||||
require.NoError(t, db.Open())
|
||||
require.NoError(t, db.Reset())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
})
|
||||
})
|
||||
}
|
Loading…
Reference in a new issue