[#1334] metabase: Add upgrade from v2 to v3
Some checks failed
DCO action / DCO (pull_request) Successful in 1m40s
Tests and linters / Run gofumpt (pull_request) Successful in 1m55s
Build / Build Components (1.23) (pull_request) Successful in 2m36s
Build / Build Components (1.22) (pull_request) Successful in 2m41s
Tests and linters / Staticcheck (pull_request) Failing after 2m27s
Vulncheck / Vulncheck (pull_request) Successful in 2m22s
Tests and linters / Lint (pull_request) Failing after 2m48s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m2s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m1s
Tests and linters / Tests (1.23) (pull_request) Successful in 3m0s
Tests and linters / Tests with -race (pull_request) Successful in 3m46s
Tests and linters / gopls check (pull_request) Successful in 4m2s

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-08-28 18:32:30 +03:00
parent 7abbdca064
commit 17c8967057
7 changed files with 572 additions and 3 deletions

View file

@ -0,0 +1,15 @@
package metabase
import "github.com/spf13/cobra"
// RootCmd is the root command of the metabase section.
var RootCmd = &cobra.Command{
	Use:   "metabase",
	Short: "Section for metabase commands",
}
func init() {
RootCmd.AddCommand(UpgradeCmd)
initUpgradeCommand()
}

View file

@ -0,0 +1,44 @@
package metabase
import (
"fmt"
"os"
"time"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"github.com/spf13/cobra"
)
// Flag names of the upgrade command.
const (
	pathFlag      = "path"
	noCompactFlag = "no-compact"
)

// path receives the --path flag value via the StringVar binding in
// initUpgradeCommand.
// NOTE(review): upgrade() re-reads the flag with GetString into a local
// that shadows this variable, so the package-level binding appears
// effectively unused — confirm before removing.
var path string

// UpgradeCmd upgrades a metabase file to the latest schema version.
var UpgradeCmd = &cobra.Command{
	Use:   "upgrade",
	Short: "Upgrade metabase to latest version",
	RunE:  upgrade,
}
// upgrade is the RunE handler of UpgradeCmd. It verifies that the metabase
// file exists and runs the schema upgrade on it, compacting the result
// unless --no-compact is given. Progress messages are printed with an
// RFC 3339 timestamp prefix.
//
// Flag-read errors are deliberately ignored: both flags are registered in
// initUpgradeCommand, so GetString/GetBool cannot fail here.
func upgrade(cmd *cobra.Command, _ []string) error {
	// Use a distinctly named local instead of shadowing the package-level
	// `path` variable bound by initUpgradeCommand.
	mbPath, _ := cmd.Flags().GetString(pathFlag)
	noCompact, _ := cmd.Flags().GetBool(noCompactFlag)
	if _, err := os.Stat(mbPath); err != nil {
		return fmt.Errorf("failed to check file: %w", err)
	}
	if err := meta.Upgrade(cmd.Context(), mbPath, !noCompact, func(msg string) {
		// Printf instead of Println(Sprintf(...)): same output, one call.
		cmd.Printf("%s:%s\n", time.Now().Format(time.RFC3339), msg)
	}); err != nil {
		return fmt.Errorf("failed to upgrade metabase: %w", err)
	}
	return nil
}
// initUpgradeCommand registers the upgrade command's flags:
// --path (required) and --no-compact.
func initUpgradeCommand() {
	ff := UpgradeCmd.Flags()
	ff.StringVar(&path, pathFlag, "", "Path to metabase file")
	ff.Bool(noCompactFlag, false, "Do not compact upgraded metabase file")
	// The flag is already registered above, so marking it required
	// cannot fail; the error is intentionally discarded.
	_ = UpgradeCmd.MarkFlagRequired(pathFlag)
}

View file

@ -5,6 +5,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/storagecfg" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/storagecfg"
"git.frostfs.info/TrueCloudLab/frostfs-node/misc" "git.frostfs.info/TrueCloudLab/frostfs-node/misc"
@ -41,6 +42,7 @@ func init() {
rootCmd.AddCommand(config.RootCmd) rootCmd.AddCommand(config.RootCmd)
rootCmd.AddCommand(morph.RootCmd) rootCmd.AddCommand(morph.RootCmd)
rootCmd.AddCommand(storagecfg.RootCmd) rootCmd.AddCommand(storagecfg.RootCmd)
rootCmd.AddCommand(metabase.RootCmd)
rootCmd.AddCommand(autocomplete.Command("frostfs-adm")) rootCmd.AddCommand(autocomplete.Command("frostfs-adm"))
rootCmd.AddCommand(gendoc.Command(rootCmd, gendoc.Options{})) rootCmd.AddCommand(gendoc.Command(rootCmd, gendoc.Options{}))

View file

@ -0,0 +1,283 @@
package meta
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"strconv"
"time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"golang.org/x/sync/errgroup"
)
// updates maps a metabase schema version to the routine that upgrades the
// database from that version to the next one.
var updates = map[uint64]func(ctx context.Context, db *bbolt.DB, log func(msg string)) error{
	2: upgradeFromV2ToV3,
}

// errFailedToUpgradeDatabaseNotOpen reports an upgrade attempt on a metabase
// that is not open.
// NOTE(review): not referenced in this chunk of the file — presumably used
// by code elsewhere in the package; confirm before removing.
var errFailedToUpgradeDatabaseNotOpen = errors.New("failed to upgrade metabase: database not open")
// Upgrade opens the metabase at path, applies the registered schema update
// for its current version and closes the database. If compact is true, the
// upgraded database is additionally compacted into a fresh file that
// replaces the original. Progress is reported through log.
func Upgrade(ctx context.Context, path string, compact bool, log func(msg string)) error {
	db, err := bbolt.Open(path, os.ModePerm, bbolt.DefaultOptions)
	if err != nil {
		return fmt.Errorf("failed to open metabase: %w", err)
	}
	var version uint64
	if err := db.View(func(tx *bbolt.Tx) error {
		var e error
		version, e = currentVersion(tx)
		return e
	}); err != nil {
		// Close the handle on every failure path so the bbolt file lock
		// is released (the original leaked it on all error returns).
		return errors.Join(err, db.Close())
	}
	updater, found := updates[version]
	if !found {
		return errors.Join(fmt.Errorf("unsupported version %d: no update available", version), db.Close())
	}
	if err := updater(ctx, db, log); err != nil {
		return errors.Join(fmt.Errorf("failed to update metabase schema: %w", err), db.Close())
	}
	if compact {
		log("compacting metabase...")
		// compactDB closes db itself on success; a second Close below is a
		// no-op in bbolt. On failure db may still be open, so join a Close.
		if err := compactDB(db); err != nil {
			return errors.Join(fmt.Errorf("failed to compact metabase: %w", err), db.Close())
		}
		log("metabase compacted")
	}
	return db.Close()
}
// compactDB compacts db into a temporary file created next to the source and
// then atomically replaces the source file with the compacted copy.
// On success the source handle is closed and must not be used afterwards.
// On failure the temporary file is removed and the individual errors are
// combined with errors.Join into the returned error.
func compactDB(db *bbolt.DB) error {
	sourcePath := db.Path()
	// Timestamp suffix keeps leftover temporary files distinguishable.
	tmpFileName := sourcePath + "." + time.Now().Format(time.RFC3339)
	f, err := os.Stat(sourcePath)
	if err != nil {
		return err
	}
	// Reuse the source file's permission bits for the compacted copy.
	dst, err := bbolt.Open(tmpFileName, f.Mode(), &bbolt.Options{
		Timeout: 100 * time.Millisecond,
	})
	if err != nil {
		return fmt.Errorf("can't open new metabase to compact: %w", err)
	}
	// 256 MiB is the max transaction size used while copying buckets.
	if err := bbolt.Compact(dst, db, 256<<20); err != nil {
		return fmt.Errorf("failed to compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))
	}
	if err := dst.Close(); err != nil {
		return fmt.Errorf("failed to close compacted metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	// The source must be closed before it can be renamed over.
	if err := db.Close(); err != nil {
		return fmt.Errorf("failed to close source metabase: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	if err := os.Rename(tmpFileName, sourcePath); err != nil {
		return fmt.Errorf("failed to replace source metabase with compacted: %w", errors.Join(err, os.Remove(tmpFileName)))
	}
	return nil
}
// upgradeFromV2ToV3 migrates the metabase schema from version 2 to
// version 3: it builds the expiration-epoch indexes, drops the obsolete
// user-attribute, owner-ID and payload-checksum buckets, and finally
// records the new schema version.
func upgradeFromV2ToV3(ctx context.Context, db *bbolt.DB, log func(msg string)) error {
	// Migration steps run strictly in order; the first failure aborts.
	steps := []func(context.Context, *bbolt.DB, func(string)) error{
		createExpirationEpochBuckets,
		dropUserAttributes,
		dropOwnerIDIndex,
		dropPayloadChecksumIndex,
	}
	for _, step := range steps {
		if err := step(ctx, db, log); err != nil {
			return err
		}
	}
	return db.Update(func(tx *bbolt.Tx) error {
		return updateVersion(tx, version)
	})
}
// objectIDToExpEpoch carries a (container, object) pair together with its
// expiration epoch from the bucket-scanning producer to the index-writing
// consumer during the v2 -> v3 upgrade.
type objectIDToExpEpoch struct {
	containerID     cid.ID
	objectID        oid.ID
	expirationEpoch uint64
}
// createExpirationEpochBuckets builds the v3 expiration-epoch indexes:
// one bucket keyed by (epoch, container, object) and, per container, a
// bucket mapping object ID to its epoch. A producer goroutine scans the
// old user-attribute buckets and streams results through a channel to a
// consumer goroutine that writes the new index entries.
func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(msg string)) error {
	log("filling expiration epoch buckets...")
	if err := db.Update(func(tx *bbolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists(expEpochToObjectBucketName)
		return err
	}); err != nil {
		return err
	}
	objects := make(chan objectIDToExpEpoch, 1000)
	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error {
		// The producer closes the channel when the scan is complete so the
		// consumer can drain remaining items and exit. (Previously the
		// channel was closed only after eg.Wait(), which deadlocked once
		// the buffer filled, and the consumer returned after ONE object.)
		defer close(objects)
		return selectObjectsWithExpirationEpoch(ctx, db, objects)
	})
	eg.Go(func() error {
		var count uint64
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case obj, ok := <-objects:
				if !ok {
					return nil // producer finished, everything processed
				}
				if err := db.Update(func(tx *bbolt.Tx) error {
					if err := putUniqueIndexItem(tx, namedBucketItem{
						name: expEpochToObjectBucketName,
						key:  expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
						val:  zeroValue,
					}); err != nil {
						return err
					}
					val := make([]byte, epochSize)
					binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
					return putUniqueIndexItem(tx, namedBucketItem{
						name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)),
						key:  objectKey(obj.objectID, make([]byte, objectKeySize)),
						val:  val,
					})
				}); err != nil {
					return err
				}
				// Progress report after the object is actually written.
				if count++; count%1_000 == 0 {
					log(fmt.Sprintf("expiration epoch filled for %d objects...", count))
				}
			}
		}
	})
	if err := eg.Wait(); err != nil {
		log(fmt.Sprintf("filling expiration epoch buckets completed with an error: %s", err.Error()))
		return err
	}
	log("filling expiration epoch buckets completed successfully")
	return nil
}
// selectObjectsWithExpirationEpoch walks every v2 user-attribute bucket
// whose attribute is the expiration-epoch attribute (FrostFS or NeoFS name)
// and sends each contained object, with its parsed epoch, into objects.
// It returns when the scan finishes, an error occurs, or ctx is cancelled.
// The caller owns the channel; this function never closes it.
func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error {
	prefix := []byte{userAttributePrefix}
	return db.View(func(tx *bbolt.Tx) error {
		userAttrCursor := tx.Cursor()
		for userAttrKey, _ := userAttrCursor.Seek(prefix); userAttrKey != nil && bytes.HasPrefix(userAttrKey, prefix); userAttrKey, _ = userAttrCursor.Next() {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			// Bucket name layout: prefix byte | container ID | attribute key.
			if len(userAttrKey) <= 1+cidSize {
				continue
			}
			attributeKey := string(userAttrKey[1+cidSize:])
			if attributeKey != objectV2.SysAttributeExpEpochNeoFS && attributeKey != objectV2.SysAttributeExpEpoch {
				continue
			}
			var containerID cid.ID
			if err := containerID.Decode(userAttrKey[1 : 1+cidSize]); err != nil {
				return fmt.Errorf("failed to decode container id from user attribute bucket: %w", err)
			}
			b := tx.Bucket(userAttrKey)
			// Continue iterating over ALL matching buckets instead of
			// returning after the first one (previous behavior skipped
			// every container after the first match).
			if err := b.ForEachBucket(func(expKey []byte) error {
				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}
				expirationEpoch, err := strconv.ParseUint(string(expKey), 10, 64)
				if err != nil {
					return fmt.Errorf("could not parse expiration epoch: %w", err)
				}
				expirationEpochBucket := b.Bucket(expKey)
				return expirationEpochBucket.ForEach(func(k, _ []byte) error {
					var objectID oid.ID
					if err := objectID.Decode(k); err != nil {
						return fmt.Errorf("failed to decode object id from container '%s' expiration epoch %d: %w", containerID, expirationEpoch, err)
					}
					select {
					case <-ctx.Done():
						return ctx.Err()
					case objects <- objectIDToExpEpoch{
						containerID:     containerID,
						objectID:        objectID,
						expirationEpoch: expirationEpoch,
					}:
						return nil
					}
				})
			}); err != nil {
				return err
			}
		}
		return nil
	})
}
// dropUserAttributes removes all v2 user-attribute index buckets,
// prefixing every progress message with "user attributes: ".
func dropUserAttributes(ctx context.Context, db *bbolt.DB, log func(msg string)) error {
	prefixedLog := func(msg string) { log("user attributes: " + msg) }
	return dropBucketsByPrefix(ctx, db, []byte{userAttributePrefix}, prefixedLog)
}
// dropOwnerIDIndex removes all v2 owner-ID index buckets,
// prefixing every progress message with "owner ID index: ".
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(msg string)) error {
	prefixedLog := func(msg string) { log("owner ID index: " + msg) }
	return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, prefixedLog)
}
// dropPayloadChecksumIndex removes all v2 payload-checksum index buckets,
// prefixing every progress message with "payload checksum: ".
func dropPayloadChecksumIndex(ctx context.Context, db *bbolt.DB, log func(msg string)) error {
	prefixedLog := func(msg string) { log("payload checksum: " + msg) }
	return dropBucketsByPrefix(ctx, db, []byte{payloadHashPrefix}, prefixedLog)
}
// dropBucketsByPrefix deletes every top-level bucket whose name starts with
// prefix, in batches of 100, reporting progress through log. Bucket names
// are collected in a read transaction and deleted in a separate write
// transaction so individual transactions stay small.
func dropBucketsByPrefix(ctx context.Context, db *bbolt.DB, prefix []byte, log func(msg string)) error {
	log("deleting buckets...")
	const batchSize = 100
	var deleted uint64
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		// Collect up to batchSize matching bucket names.
		names := make([][]byte, 0, batchSize)
		if err := db.View(func(tx *bbolt.Tx) error {
			c := tx.Cursor()
			for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix) && len(names) < batchSize; name, _ = c.Next() {
				// Clone: cursor keys are only valid inside the transaction.
				names = append(names, bytes.Clone(name))
			}
			return nil
		}); err != nil {
			log(fmt.Sprintf("deleting buckets completed with an error %s", err.Error()))
			return err
		}
		if len(names) == 0 {
			log("deleting buckets completed successfully")
			return nil
		}
		if err := db.Update(func(tx *bbolt.Tx) error {
			for _, name := range names {
				if err := tx.DeleteBucket(name); err != nil {
					return err
				}
			}
			return nil
		}); err != nil {
			log(fmt.Sprintf("deleting buckets completed with an error %s", err.Error()))
			return err
		}
		if deleted += uint64(len(names)); deleted%10_000 == 0 {
			log(fmt.Sprintf("deleted %d buckets", deleted))
		}
	}
}

View file

@ -0,0 +1,207 @@
//go:build integration
package meta
import (
"context"
"io"
"os"
"strconv"
"testing"
"time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
// upgradeFilePath points at a pre-generated v2 metabase fixture.
// NOTE(review): placeholder path — the fixture must exist locally
// (see TestGenerateMetabaseFile) for this integration test to run.
const upgradeFilePath = "/path/to/metabase.v2"

// TestUpgradeV2ToV3 verifies that a v2 metabase is rejected by Init with
// ErrOutdatedVersion, upgrades cleanly via Upgrade (with compaction), and
// then opens and initializes without error. It works on a temporary copy
// of the fixture which is removed afterwards.
func TestUpgradeV2ToV3(t *testing.T) {
	path := createTempCopy(t, upgradeFilePath)
	defer func() {
		require.NoError(t, os.Remove(path))
	}()
	db := New(WithPath(path), WithEpochState(epochState{e: 1000}), WithLogger(test.NewLogger(t)))
	require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
	// Before the upgrade, Init must report the outdated schema version.
	require.ErrorIs(t, db.Init(), ErrOutdatedVersion)
	require.NoError(t, db.Close())
	require.NoError(t, Upgrade(context.Background(), path, true, func(msg string) { t.Log(msg) }))
	// After the upgrade, the metabase must open and initialize normally.
	require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
	require.NoError(t, db.Init())
	require.NoError(t, db.Close())
}
// createTempCopy copies the file at path to a timestamp-suffixed file next
// to it and returns the copy's path. The caller is responsible for removing
// the copy when done.
func createTempCopy(t *testing.T, path string) string {
	src, err := os.Open(path)
	require.NoError(t, err)
	// Derive the destination from the path argument, not from the
	// package-level upgradeFilePath constant, so the helper works for
	// any source file (the original silently ignored its parameter here).
	tmpPath := path + time.Now().Format(time.RFC3339)
	dest, err := os.Create(tmpPath)
	require.NoError(t, err)
	_, err = io.Copy(dest, src)
	require.NoError(t, err)
	require.NoError(t, src.Close())
	require.NoError(t, dest.Close())
	return tmpPath
}
// TestGenerateMetabaseFile generates the v2 metabase fixture consumed by
// TestUpgradeV2ToV3. It is skipped by default and must be run manually
// after removing the Skip.
func TestGenerateMetabaseFile(t *testing.T) {
	t.Skip("for generating db")
	// This test generates a metabase file with 2 million objects for 10 000 containers,
	// of which
	// 500 000 are simple objects,
	// 500 000 are complex objects (total 1 million),
	// 100 000 are deleted by gcMarks,
	// 100 000 are deleted by tombstones (total 200 000),
	// 100 000 are locked (total 200 000).
	db := New(WithPath(upgradeFilePath), WithEpochState(epochState{e: 1000}), WithLogger(test.NewLogger(t)),
		WithMaxBatchDelay(100*time.Millisecond), WithMaxBatchSize(1000))
	require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
	// Fixture generation only: large alloc size and no fsync for speed.
	db.boltDB.AllocSize = 128 << 20
	db.boltDB.NoSync = true
	require.NoError(t, db.Init())
	containers := make([]cid.ID, 10_000)
	for i := range containers {
		containers[i] = cidtest.ID()
	}
	oc, err := db.ObjectCounters()
	require.NoError(t, err)
	require.True(t, oc.IsZero())
	eg, ctx := errgroup.WithContext(context.Background())
	eg.SetLimit(10000)
	// simple objects
	for i := 0; i < 500_000; i++ {
		i := i
		eg.Go(func() error {
			obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10))
			_, err := db.Put(ctx, PutPrm{
				obj: obj,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			require.NoError(t, err)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	db.log.Info("simple objects generated")
	eg, ctx = errgroup.WithContext(context.Background())
	eg.SetLimit(10000)
	// complex objects
	for i := 0; i < 500_000; i++ {
		i := i
		eg.Go(func() error {
			parent := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			child := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			child.SetParent(parent)
			idParent, _ := parent.ID()
			child.SetParentID(idParent)
			testutil.AddAttribute(child, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			testutil.AddAttribute(parent, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			testutil.AddAttribute(child, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10))
			testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10))
			_, err := db.Put(ctx, PutPrm{
				obj: child,
			})
			require.NoError(t, err)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	db.log.Info("complex objects generated")
	eg, ctx = errgroup.WithContext(context.Background())
	eg.SetLimit(10000)
	// simple objects deleted by gc marks
	for i := 0; i < 100_000; i++ {
		i := i
		eg.Go(func() error {
			obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			_, err := db.Put(ctx, PutPrm{
				obj: obj,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			require.NoError(t, err)
			_, err = db.Inhume(ctx, InhumePrm{
				target: []oid.Address{object.AddressOf(obj)},
			})
			require.NoError(t, err)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	db.log.Info("simple objects deleted by gc marks generated")
	eg, ctx = errgroup.WithContext(context.Background())
	eg.SetLimit(10000)
	// simple objects deleted by tombstones
	for i := 0; i < 100_000; i++ {
		i := i
		eg.Go(func() error {
			obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			_, err := db.Put(ctx, PutPrm{
				obj: obj,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			// Check the first Put's error instead of silently overwriting it
			// below (staticcheck: value of err never used).
			require.NoError(t, err)
			tomb := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			tomb.SetType(objectSDK.TypeTombstone)
			_, err = db.Put(ctx, PutPrm{
				obj: tomb,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			require.NoError(t, err)
			tombAddr := object.AddressOf(tomb)
			_, err = db.Inhume(ctx, InhumePrm{
				target: []oid.Address{object.AddressOf(obj)},
				tomb:   &tombAddr,
			})
			require.NoError(t, err)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	db.log.Info("simple objects deleted by tombstones generated")
	eg, ctx = errgroup.WithContext(context.Background())
	eg.SetLimit(10000)
	// simple objects locked by locks
	for i := 0; i < 100_000; i++ {
		i := i
		eg.Go(func() error {
			obj := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			testutil.AddAttribute(obj, "FileName", strconv.FormatInt(int64(i%1_000), 10))
			_, err := db.Put(ctx, PutPrm{
				obj: obj,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			// Check the first Put's error instead of silently overwriting it.
			require.NoError(t, err)
			lock := testutil.GenerateObjectWithCID(containers[i%len(containers)])
			lock.SetType(objectSDK.TypeLock)
			testutil.AddAttribute(lock, objectV2.SysAttributeExpEpoch, strconv.FormatUint(uint64(i%1000+1000), 10))
			_, err = db.Put(ctx, PutPrm{
				obj: lock,
				id:  []byte(strconv.FormatInt(int64(i%10), 10) + "/" + strconv.FormatInt(int64(i%10_000), 10)),
			})
			require.NoError(t, err)
			err = db.Lock(ctx, containers[i%len(containers)], object.AddressOf(lock).Object(), []oid.ID{object.AddressOf(obj).Object()})
			require.NoError(t, err)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	db.log.Info("simple objects locked by locks generated")
	require.NoError(t, db.boltDB.Sync())
	require.NoError(t, db.Close())
}

View file

@ -94,11 +94,13 @@ const (
// ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs. // ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs.
// Key: owner ID // Key: owner ID
// Value: bucket containing object IDs as keys // Value: bucket containing object IDs as keys
_ // removed in version 3
ownerPrefix
// userAttributePrefix was used for prefixing FKBT index buckets containing objects. // userAttributePrefix was used for prefixing FKBT index buckets containing objects.
// Key: attribute value // Key: attribute value
// Value: bucket containing object IDs as keys // Value: bucket containing object IDs as keys
_ // removed in version 3
userAttributePrefix
// ==================== // ====================
// List index buckets. // List index buckets.
@ -107,7 +109,8 @@ const (
// payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs. // payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs.
// Key: payload hash // Key: payload hash
// Value: list of object IDs // Value: list of object IDs
_ // removed in version 3
payloadHashPrefix
// parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs. // parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs.
// Key: parent ID // Key: parent ID
// Value: list of object IDs // Value: list of object IDs

View file

@ -2,6 +2,7 @@ package meta
import ( import (
"encoding/binary" "encoding/binary"
"errors"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -18,6 +19,8 @@ var versionKey = []byte("version")
// the current code version. // the current code version.
var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required") var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required")
var errVersionUndefinedNoInfoBucket = errors.New("version undefined: no info bucket")
func checkVersion(tx *bbolt.Tx, initialized bool) error { func checkVersion(tx *bbolt.Tx, initialized bool) error {
var knownVersion bool var knownVersion bool
@ -59,3 +62,15 @@ func updateVersion(tx *bbolt.Tx, version uint64) error {
} }
return b.Put(versionKey, data) return b.Put(versionKey, data)
} }
// currentVersion reads the schema version stored in the shard-info bucket.
// It returns errVersionUndefinedNoInfoBucket when the bucket is missing,
// and a descriptive error when the stored value is not exactly 8 bytes.
func currentVersion(tx *bbolt.Tx) (uint64, error) {
	infoBkt := tx.Bucket(shardInfoBucket)
	if infoBkt == nil {
		return 0, errVersionUndefinedNoInfoBucket
	}
	raw := infoBkt.Get(versionKey)
	// The version is a little-endian uint64; anything else is corrupt.
	if len(raw) != 8 {
		return 0, fmt.Errorf("version undefined: invalid version data length %d", len(raw))
	}
	return binary.LittleEndian.Uint64(raw), nil
}