Move shard to degraded mode if metabase open failed #918
4 changed files with 53 additions and 47 deletions
|
@ -571,4 +571,5 @@ const (
|
|||
GetSvcV2FailedToParseNodeExternalAddresses = "failed to parse node external addresses"
|
||||
GetSvcV2FailedToGetRangeHashFromNode = "failed to get range hash from node"
|
||||
GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes = "failed to get range hash from all of container nodes"
|
||||
FailedToUpdateShardID = "failed to update shard id"
|
||||
)
|
||||
|
|
|
@ -118,7 +118,7 @@ func TestInitializationFailure(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Option, beforeReload func()) {
|
||||
func testEngineFailInitAndReload(t *testing.T, degradedMode bool, opts []shard.Option, beforeReload func()) {
|
||||
var configID string
|
||||
|
||||
e := New()
|
||||
|
@ -126,33 +126,30 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
|
|||
require.NoError(t, e.Close(context.Background()))
|
||||
}()
|
||||
_, err := e.AddShard(context.Background(), opts...)
|
||||
if errOnAdd {
|
||||
require.Error(t, err)
|
||||
// This branch is only taken when we cannot update shard ID in the metabase.
|
||||
// The id cannot be encountered during normal operation, but it is ok for tests:
|
||||
// it is only compared for equality with other ids and we have 0 shards here.
|
||||
configID = "id"
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
|
||||
e.mtx.RLock()
|
||||
var id string
|
||||
for id = range e.shards {
|
||||
break
|
||||
}
|
||||
configID = calculateShardID(e.shards[id].Shard.DumpInfo())
|
||||
e.mtx.RUnlock()
|
||||
|
||||
err = e.Open(context.Background())
|
||||
if err == nil {
|
||||
require.Error(t, e.Init(context.Background()))
|
||||
}
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
e.mtx.RLock()
|
||||
shardCount := len(e.shards)
|
||||
var id string
|
||||
for id = range e.shards {
|
||||
break
|
||||
}
|
||||
configID = calculateShardID(e.shards[id].Shard.DumpInfo())
|
||||
e.mtx.RUnlock()
|
||||
require.Equal(t, 0, shardCount)
|
||||
|
||||
err = e.Open(context.Background())
|
||||
require.NoError(t, err)
|
||||
if degradedMode {
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
require.Equal(t, mode.DegradedReadOnly, e.DumpInfo().Shards[0].Mode)
|
||||
return
|
||||
} else {
|
||||
require.Error(t, e.Init(context.Background()))
|
||||
|
||||
e.mtx.RLock()
|
||||
shardCount := len(e.shards)
|
||||
e.mtx.RUnlock()
|
||||
require.Equal(t, 0, shardCount)
|
||||
}
|
||||
|
||||
beforeReload()
|
||||
|
||||
|
@ -161,7 +158,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
|
|||
}))
|
||||
|
||||
e.mtx.RLock()
|
||||
shardCount = len(e.shards)
|
||||
shardCount := len(e.shards)
|
||||
e.mtx.RUnlock()
|
||||
require.Equal(t, 1, shardCount)
|
||||
}
|
||||
|
|
|
@ -124,10 +124,10 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
|
|||
)...)
|
||||
|
||||
if err := sh.UpdateID(ctx); err != nil {
|
||||
return nil, fmt.Errorf("could not update shard ID: %w", err)
|
||||
e.log.Warn(logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
|
||||
}
|
||||
|
||||
return sh, err
|
||||
return sh, nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) appendMetrics(id *shard.ID, opts []shard.Option) []shard.Option {
|
||||
|
|
|
@ -2,6 +2,7 @@ package shard
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"github.com/mr-tron/base58"
|
||||
|
@ -30,27 +31,32 @@ func (s *Shard) ID() *ID {
|
|||
|
||||
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
||||
func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
||||
var metabaseOpened bool
|
||||
var idFromMetabase []byte
|
||||
if err = s.metaBase.Open(ctx, false); err != nil {
|
||||
return err
|
||||
err = fmt.Errorf("failed to open metabase: %w", err)
|
||||
} else {
|
||||
metabaseOpened = true
|
||||
}
|
||||
defer func() {
|
||||
cErr := s.metaBase.Close()
|
||||
if err == nil {
|
||||
err = cErr
|
||||
}
|
||||
}()
|
||||
id, err := s.metaBase.ReadShardID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(id) != 0 {
|
||||
s.info.ID = NewIDFromBytes(id)
|
||||
|
||||
if s.cfg.metricsWriter != nil {
|
||||
s.cfg.metricsWriter.SetShardID(s.info.ID.String())
|
||||
if metabaseOpened {
|
||||
defer func() {
|
||||
cErr := s.metaBase.Close()
|
||||
if cErr != nil {
|
||||
err = fmt.Errorf("failed to close metabase: %w", cErr)
|
||||
}
|
||||
}()
|
||||
if idFromMetabase, err = s.metaBase.ReadShardID(); err != nil {
|
||||
err = fmt.Errorf("failed to read shard id from metabase: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(idFromMetabase) != 0 {
|
||||
s.info.ID = NewIDFromBytes(idFromMetabase)
|
||||
}
|
||||
|
||||
if s.cfg.metricsWriter != nil {
|
||||
s.cfg.metricsWriter.SetShardID(s.info.ID.String())
|
||||
}
|
||||
s.log = &logger.Logger{Logger: s.log.With(zap.Stringer("shard_id", s.info.ID))}
|
||||
s.metaBase.SetLogger(s.log)
|
||||
s.blobStor.SetLogger(s.log)
|
||||
|
@ -63,8 +69,10 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
|||
s.pilorama.SetParentID(s.info.ID.String())
|
||||
}
|
||||
|
||||
if len(id) != 0 {
|
||||
return nil
|
||||
if len(idFromMetabase) == 0 && metabaseOpened {
|
||||
if err = s.metaBase.WriteShardID(*s.info.ID); err != nil {
|
||||
err = fmt.Errorf("failed to write shard id to metabase: %w", err)
|
||||
}
|
||||
}
|
||||
return s.metaBase.WriteShardID(*s.info.ID)
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue