metabase: fix mode metric #1049
5 changed files with 69 additions and 38 deletions
|
@ -107,7 +107,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh
|
||||||
return sh.ID(), nil
|
return sh.ID(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
|
func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*shard.Shard, error) {
|
||||||
id, err := generateShardID()
|
id, err := generateShardID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
||||||
|
@ -126,7 +126,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
|
||||||
shard.WithZeroCountCallback(e.processZeroCountContainers),
|
shard.WithZeroCountCallback(e.processZeroCountContainers),
|
||||||
)...)
|
)...)
|
||||||
|
|
||||||
if err := sh.UpdateID(ctx); err != nil {
|
if err := sh.UpdateID(); err != nil {
|
||||||
e.log.Warn(logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
|
e.log.Warn(logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -202,11 +202,12 @@ func (db *DB) SyncCounters() error {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes boltDB instance.
|
// Close closes boltDB instance
|
||||||
|
// and reports metabase metric.
|
||||||
func (db *DB) Close() error {
|
func (db *DB) Close() error {
|
||||||
var err error
|
var err error
|
||||||
if db.boltDB != nil {
|
if db.boltDB != nil {
|
||||||
err = metaerr.Wrap(db.boltDB.Close())
|
err = db.close()
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
db.metrics.Close()
|
db.metrics.Close()
|
||||||
|
@ -214,6 +215,10 @@ func (db *DB) Close() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) close() error {
|
||||||
|
return metaerr.Wrap(db.boltDB.Close())
|
||||||
|
}
|
||||||
|
|
||||||
// Reload reloads part of the configuration.
|
// Reload reloads part of the configuration.
|
||||||
// It returns true iff database was reopened.
|
// It returns true iff database was reopened.
|
||||||
// If a config option is invalid, it logs an error and returns nil.
|
// If a config option is invalid, it logs an error and returns nil.
|
||||||
|
|
|
@ -2,8 +2,11 @@ package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
|
metamode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -12,16 +15,32 @@ var (
|
||||||
shardIDKey = []byte("id")
|
shardIDKey = []byte("id")
|
||||||
)
|
)
|
||||||
|
|
||||||
// ReadShardID reads shard id from db.
|
// GetShardID sets metabase operation mode
|
||||||
|
// and reads shard id from db.
|
||||||
// If id is missing, returns nil, nil.
|
// If id is missing, returns nil, nil.
|
||||||
func (db *DB) ReadShardID() ([]byte, error) {
|
//
|
||||||
db.modeMtx.RLock()
|
// GetShardID does not report any metrics.
|
||||||
defer db.modeMtx.RUnlock()
|
func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) {
|
||||||
|
db.modeMtx.Lock()
|
||||||
|
defer db.modeMtx.Unlock()
|
||||||
|
db.mode = mode
|
||||||
|
|
||||||
if db.mode.NoMetabase() {
|
if err := db.openDB(mode); err != nil {
|
||||||
return nil, ErrDegradedMode
|
return nil, fmt.Errorf("failed to open metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
id, err := db.readShardID()
|
||||||
|
|
||||||
|
if cErr := db.close(); cErr != nil {
|
||||||
|
err = errors.Join(err, fmt.Errorf("failed to close metabase: %w", cErr))
|
||||||
|
}
|
||||||
|
|
||||||
|
return id, metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadShardID reads shard id from db.
|
||||||
|
// If id is missing, returns nil, nil.
|
||||||
|
func (db *DB) readShardID() ([]byte, error) {
|
||||||
var id []byte
|
var id []byte
|
||||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(shardInfoBucket)
|
b := tx.Bucket(shardInfoBucket)
|
||||||
|
@ -33,17 +52,35 @@ func (db *DB) ReadShardID() ([]byte, error) {
|
||||||
return id, metaerr.Wrap(err)
|
return id, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteShardID writes shard it to db.
|
// SetShardID sets metabase operation mode
|
||||||
func (db *DB) WriteShardID(id []byte) error {
|
// and writes shard id to db.
|
||||||
db.modeMtx.RLock()
|
func (db *DB) SetShardID(id []byte, mode metamode.Mode) error {
|
||||||
defer db.modeMtx.RUnlock()
|
db.modeMtx.Lock()
|
||||||
|
defer db.modeMtx.Unlock()
|
||||||
|
db.mode = mode
|
||||||
|
|
||||||
if db.mode.NoMetabase() {
|
if mode.ReadOnly() {
|
||||||
return ErrDegradedMode
|
|
||||||
} else if db.mode.ReadOnly() {
|
|
||||||
return ErrReadOnlyMode
|
return ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := db.openDB(mode); err != nil {
|
||||||
|
return fmt.Errorf("failed to open metabase: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := db.writeShardID(id)
|
||||||
|
if err == nil {
|
||||||
|
db.metrics.SetMode(mode)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cErr := db.close(); cErr != nil {
|
||||||
|
err = errors.Join(err, fmt.Errorf("failed to close metabase: %w", cErr))
|
||||||
|
}
|
||||||
|
|
||||||
|
return metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeShardID writes shard id to db.
|
||||||
|
func (db *DB) writeShardID(id []byte) error {
|
||||||
return metaerr.Wrap(db.boltDB.Update(func(tx *bbolt.Tx) error {
|
return metaerr.Wrap(db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -58,9 +58,7 @@ func TestVersion(t *testing.T) {
|
||||||
})
|
})
|
||||||
t.Run("old data", func(t *testing.T) {
|
t.Run("old data", func(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, db.SetShardID([]byte{1, 2, 3, 4}, mode.ReadWrite))
|
||||||
require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4}))
|
|
||||||
require.NoError(t, db.Close())
|
|
||||||
|
|
||||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, db.Init())
|
require.NoError(t, db.Init())
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
package shard
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"github.com/mr-tron/base58"
|
"github.com/mr-tron/base58"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -30,21 +31,11 @@ func (s *Shard) ID() *ID {
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
||||||
func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
func (s *Shard) UpdateID() (err error) {
|
||||||
var idFromMetabase []byte
|
var idFromMetabase []byte
|
||||||
metabaseOpened := !s.GetMode().NoMetabase()
|
modeDegraded := s.GetMode().NoMetabase()
|
||||||
if err = s.metaBase.Open(ctx, s.GetMode()); err != nil {
|
if !modeDegraded {
|
||||||
err = fmt.Errorf("failed to open metabase: %w", err)
|
if idFromMetabase, err = s.metaBase.GetShardID(mode.ReadOnly); err != nil {
|
||||||
metabaseOpened = false
|
|
||||||
}
|
|
||||||
if metabaseOpened {
|
|
||||||
defer func() {
|
|
||||||
cErr := s.metaBase.Close()
|
|
||||||
if cErr != nil {
|
|
||||||
err = fmt.Errorf("failed to close metabase: %w", cErr)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
if idFromMetabase, err = s.metaBase.ReadShardID(); err != nil {
|
|
||||||
err = fmt.Errorf("failed to read shard id from metabase: %w", err)
|
err = fmt.Errorf("failed to read shard id from metabase: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -73,9 +64,9 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
||||||
s.pilorama.SetParentID(s.info.ID.String())
|
s.pilorama.SetParentID(s.info.ID.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(idFromMetabase) == 0 && metabaseOpened {
|
if len(idFromMetabase) == 0 && !modeDegraded {
|
||||||
if err = s.metaBase.WriteShardID(*s.info.ID); err != nil {
|
if setErr := s.metaBase.SetShardID(*s.info.ID, s.GetMode()); setErr != nil {
|
||||||
err = fmt.Errorf("failed to write shard id to metabase: %w", err)
|
err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in a new issue