[#1334] metabase: Store upgrade flag

This allows to check if metabase upgrade was not completed.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-08-29 16:33:30 +03:00
parent 6c2146bbc1
commit 882c068410
4 changed files with 43 additions and 2 deletions

View file

@ -51,9 +51,21 @@ func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any))
if !found { if !found {
return fmt.Errorf("unsupported version %d: no update available", version) return fmt.Errorf("unsupported version %d: no update available", version)
} }
if err := db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(shardInfoBucket)
return b.Put(upgradeKey, zeroValue)
}); err != nil {
return fmt.Errorf("set upgrade key %w", err)
}
if err := updater(ctx, db, log); err != nil { if err := updater(ctx, db, log); err != nil {
return fmt.Errorf("update metabase schema: %w", err) return fmt.Errorf("update metabase schema: %w", err)
} }
if err := db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(shardInfoBucket)
return b.Delete(upgradeKey)
}); err != nil {
return fmt.Errorf("delete upgrade key %w", err)
}
if compact { if compact {
log("compacting metabase...") log("compacting metabase...")
err := compactDB(db) err := compactDB(db)

View file

@ -12,13 +12,18 @@ import (
// version contains current metabase version. // version contains current metabase version.
const version = 3 const version = 3
var versionKey = []byte("version") var (
versionKey = []byte("version")
upgradeKey = []byte("upgrade")
)
// ErrOutdatedVersion is returned on initializing // ErrOutdatedVersion is returned on initializing
// an existing metabase that is not compatible with // an existing metabase that is not compatible with
// the current code version. // the current code version.
var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required") var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required")
var ErrIncompletedUpgrade = logicerr.New("metabase upgrade is not completed")
var errVersionUndefinedNoInfoBucket = errors.New("version undefined: no info bucket") var errVersionUndefinedNoInfoBucket = errors.New("version undefined: no info bucket")
func checkVersion(tx *bbolt.Tx, initialized bool) error { func checkVersion(tx *bbolt.Tx, initialized bool) error {
@ -35,6 +40,10 @@ func checkVersion(tx *bbolt.Tx, initialized bool) error {
return fmt.Errorf("%w: expected=%d, stored=%d", ErrOutdatedVersion, version, stored) return fmt.Errorf("%w: expected=%d, stored=%d", ErrOutdatedVersion, version, stored)
} }
} }
data = b.Get(upgradeKey)
if len(data) > 0 {
return ErrIncompletedUpgrade
}
} }
if !initialized { if !initialized {

View file

@ -84,4 +84,24 @@ func TestVersion(t *testing.T) {
require.NoError(t, db.Close()) require.NoError(t, db.Close())
}) })
}) })
t.Run("incompleted upgrade", func(t *testing.T) {
db := newDB(t)
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.Init())
require.NoError(t, db.Close())
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
return tx.Bucket(shardInfoBucket).Put(upgradeKey, zeroValue)
}))
require.ErrorIs(t, db.Init(), ErrIncompletedUpgrade)
require.NoError(t, db.Close())
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
return tx.Bucket(shardInfoBucket).Delete(upgradeKey)
}))
require.NoError(t, db.Init())
require.NoError(t, db.Close())
})
} }

View file

@ -171,7 +171,7 @@ func (s *Shard) initializeComponents(m mode.Mode) error {
for _, component := range components { for _, component := range components {
if err := component.Init(); err != nil { if err := component.Init(); err != nil {
if component == s.metaBase { if component == s.metaBase {
if errors.Is(err, meta.ErrOutdatedVersion) { if errors.Is(err, meta.ErrOutdatedVersion) || errors.Is(err, meta.ErrIncompletedUpgrade) {
return fmt.Errorf("metabase initialization: %w", err) return fmt.Errorf("metabase initialization: %w", err)
} }