[#1355] metabase: Upgrade improvements

Do not fail on same latest version to run compact on upgraded metabase.
Use NoSync on compact.
Log every batch on bucket delete stage.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-09-06 11:17:02 +03:00
parent 654d970fad
commit 4668efc0bf

View file

@ -27,6 +27,10 @@ const (
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, log func(a ...any)) error{ var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, log func(a ...any)) error{
2: upgradeFromV2ToV3, 2: upgradeFromV2ToV3,
3: func(_ context.Context, _ *bbolt.DB, log func(a ...any)) error {
log("metabase already upgraded")
return nil
},
} }
func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any)) error { func Upgrade(ctx context.Context, path string, compact bool, log func(a ...any)) error {
@ -86,6 +90,7 @@ func compactDB(db *bbolt.DB) error {
} }
dst, err := bbolt.Open(tmpFileName, f.Mode(), &bbolt.Options{ dst, err := bbolt.Open(tmpFileName, f.Mode(), &bbolt.Options{
Timeout: 100 * time.Millisecond, Timeout: 100 * time.Millisecond,
NoSync: true,
}) })
if err != nil { if err != nil {
return fmt.Errorf("can't open new metabase to compact: %w", err) return fmt.Errorf("can't open new metabase to compact: %w", err)
@ -93,6 +98,9 @@ func compactDB(db *bbolt.DB) error {
if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil { if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil {
return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName))) return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))
} }
if err := dst.Sync(); err != nil {
return fmt.Errorf("sync compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
}
if err := dst.Close(); err != nil { if err := dst.Close(); err != nil {
return fmt.Errorf("close compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName))) return fmt.Errorf("close compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
} }
@ -369,8 +377,7 @@ func dropBucketsByPrefix(ctx context.Context, db *bbolt.DB, prefix []byte, log f
log("deleting buckets completed with an error:", err) log("deleting buckets completed with an error:", err)
return err return err
} }
if count += uint64(len(keys)); count%upgradeLogFrequency == 0 { count += uint64(len(keys))
log("deleted", count, "buckets") log("deleted", count, "buckets")
} }
}
} }