From 6f243a2a766404c77f3c7e03f1a392e29ecdbe7f Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 15 Jun 2022 12:50:59 +0300 Subject: [PATCH] [#1483] metabase: Store version The main problem is to distinguish the case of initial initialization and update from version 0. We can't do this at `Open`, because of `resync_metabase` flag. Thus, the following approach was taken: 1. During `Open` check whether the metabase was initialized. 2. Check for the version in `Init` or write the new one if the metabase is new. 3. Update version in `Reset`. Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/metabase/control.go | 36 ++++++++- pkg/local_object_storage/metabase/db.go | 2 + pkg/local_object_storage/metabase/version.go | 41 ++++++++++ .../metabase/version_test.go | 81 +++++++++++++++++++ 4 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 pkg/local_object_storage/metabase/version.go create mode 100644 pkg/local_object_storage/metabase/version_test.go diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index c36e999cd..42f60d2c4 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -27,7 +27,27 @@ func (db *DB) Open() error { db.log.Debug("opened boltDB instance for Metabase") - return nil + db.log.Debug("checking metabase version") + return db.boltDB.View(func(tx *bbolt.Tx) error { + // The safest way to check if the metabase is fresh is to check if it has no buckets. + // However, shard info can be present. So here we check that the number of buckets is + // at most 1. + // Another thing to consider is that tests do not persist shard ID, we want to support + // this case too. + var n int + err := tx.ForEach(func([]byte, *bbolt.Bucket) error { + if n++; n >= 2 { // do not iterate a lot + return errBreakBucketForEach + } + return nil + }) + + if err == errBreakBucketForEach { + db.initialized = true + err = nil + } + return err + }) } // Init initializes metabase. It creates static (CID-independent) buckets in underlying BoltDB instance. @@ -53,6 +73,14 @@ func (db *DB) init(reset bool) error { } return db.boltDB.Update(func(tx *bbolt.Tx) error { + var err error + if !reset { + // Normal open, check version and update if not initialized. + err := checkVersion(tx, db.initialized) + if err != nil { + return err + } + } for k := range mStaticBuckets { b, err := tx.CreateBucketIfNotExists([]byte(k)) if err != nil { @@ -70,13 +98,17 @@ func (db *DB) init(reset bool) error { return nil } - return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { + err = tx.ForEach(func(name []byte, b *bbolt.Bucket) error { if _, ok := mStaticBuckets[string(name)]; !ok { return tx.DeleteBucket(name) } return nil }) + if err != nil { + return err + } + return updateVersion(tx, version) }) } diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 3143ded14..7f15af2a5 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -29,6 +29,8 @@ type DB struct { matchers map[object.SearchMatchType]matcher boltDB *bbolt.DB + + initialized bool } // Option is an option of DB constructor. diff --git a/pkg/local_object_storage/metabase/version.go b/pkg/local_object_storage/metabase/version.go new file mode 100644 index 000000000..b9d3e86b7 --- /dev/null +++ b/pkg/local_object_storage/metabase/version.go @@ -0,0 +1,41 @@ +package meta + +import ( + "encoding/binary" + "fmt" + + "go.etcd.io/bbolt" +) + +// version contains current metabase version. +const version = 0 + +var versionKey = []byte("version") + +func checkVersion(tx *bbolt.Tx, initialized bool) error { + b := tx.Bucket(shardInfoBucket) + if b != nil { + data := b.Get(versionKey) + if len(data) == 8 { + stored := binary.LittleEndian.Uint64(data) + if stored != version { + return fmt.Errorf("invalid version: expected=%d, stored=%d", version, stored) + } + } + } + if !initialized { // new database, write version + return updateVersion(tx, version) + } + return nil // return error here after the first version increase +} + +func updateVersion(tx *bbolt.Tx, version uint64) error { + data := make([]byte, 8) + binary.LittleEndian.PutUint64(data, version) + + b, err := tx.CreateBucketIfNotExists(shardInfoBucket) + if err != nil { + return fmt.Errorf("can't create auxilliary bucket: %w", err) + } + return b.Put(versionKey, data) +} diff --git a/pkg/local_object_storage/metabase/version_test.go b/pkg/local_object_storage/metabase/version_test.go new file mode 100644 index 000000000..371bebfea --- /dev/null +++ b/pkg/local_object_storage/metabase/version_test.go @@ -0,0 +1,81 @@ +package meta + +import ( + "encoding/binary" + "errors" + "fmt" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +func TestVersion(t *testing.T) { + dir := t.TempDir() + + newDB := func(t *testing.T) *DB { + return New(WithPath(filepath.Join(dir, t.Name())), + WithPermissions(0600)) + } + check := func(t *testing.T, db *DB) { + require.NoError(t, db.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(shardInfoBucket) + if b == nil { + return errors.New("shard info bucket not found") + } + data := b.Get(versionKey) + if len(data) != 8 { + return errors.New("invalid version data") + } + if stored := binary.LittleEndian.Uint64(data); stored != version { + return fmt.Errorf("invalid version: %d != %d", stored, version) + } + return nil + })) + } + t.Run("simple", func(t *testing.T) { + db := newDB(t) + require.NoError(t, db.Open()) + require.NoError(t, db.Init()) + check(t, db) + require.NoError(t, db.Close()) + + t.Run("reopen", func(t *testing.T) { + require.NoError(t, db.Open()) + require.NoError(t, db.Init()) + check(t, db) + require.NoError(t, db.Close()) + }) + }) + t.Run("old data", func(t *testing.T) { + db := newDB(t) + require.NoError(t, db.Open()) + require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4})) + require.NoError(t, db.Close()) + + require.NoError(t, db.Open()) + require.NoError(t, db.Init()) + check(t, db) + require.NoError(t, db.Close()) + }) + t.Run("invalid version", func(t *testing.T) { + db := newDB(t) + require.NoError(t, db.Open()) + require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error { + return updateVersion(tx, version+1) + })) + require.NoError(t, db.Close()) + + require.NoError(t, db.Open()) + require.Error(t, db.Init()) + require.NoError(t, db.Close()) + + t.Run("reset", func(t *testing.T) { + require.NoError(t, db.Open()) + require.NoError(t, db.Reset()) + check(t, db) + require.NoError(t, db.Close()) + }) + }) +}