[#918] engine: Move shard to degraded mode if metabase open failed

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-01-18 13:39:21 +03:00
parent f2f3294fc3
commit 931a5e9aaf
4 changed files with 53 additions and 47 deletions

View file

@ -571,4 +571,5 @@ const (
GetSvcV2FailedToParseNodeExternalAddresses = "failed to parse node external addresses" GetSvcV2FailedToParseNodeExternalAddresses = "failed to parse node external addresses"
GetSvcV2FailedToGetRangeHashFromNode = "failed to get range hash from node" GetSvcV2FailedToGetRangeHashFromNode = "failed to get range hash from node"
GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes = "failed to get range hash from all of container nodes" GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes = "failed to get range hash from all of container nodes"
FailedToUpdateShardID = "failed to update shard id"
) )

View file

@ -118,7 +118,7 @@ func TestInitializationFailure(t *testing.T) {
}) })
} }
func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Option, beforeReload func()) { func testEngineFailInitAndReload(t *testing.T, degradedMode bool, opts []shard.Option, beforeReload func()) {
var configID string var configID string
e := New() e := New()
@ -126,13 +126,6 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
require.NoError(t, e.Close(context.Background())) require.NoError(t, e.Close(context.Background()))
}() }()
_, err := e.AddShard(context.Background(), opts...) _, err := e.AddShard(context.Background(), opts...)
if errOnAdd {
require.Error(t, err)
// This branch is only taken when we cannot update shard ID in the metabase.
// The id cannot be encountered during normal operation, but it is ok for tests:
// it is only compared for equality with other ids and we have 0 shards here.
configID = "id"
} else {
require.NoError(t, err) require.NoError(t, err)
e.mtx.RLock() e.mtx.RLock()
@ -144,15 +137,19 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
e.mtx.RUnlock() e.mtx.RUnlock()
err = e.Open(context.Background()) err = e.Open(context.Background())
if err == nil { require.NoError(t, err)
if degradedMode {
require.NoError(t, e.Init(context.Background()))
require.Equal(t, mode.DegradedReadOnly, e.DumpInfo().Shards[0].Mode)
return
} else {
require.Error(t, e.Init(context.Background())) require.Error(t, e.Init(context.Background()))
}
}
e.mtx.RLock() e.mtx.RLock()
shardCount := len(e.shards) shardCount := len(e.shards)
e.mtx.RUnlock() e.mtx.RUnlock()
require.Equal(t, 0, shardCount) require.Equal(t, 0, shardCount)
}
beforeReload() beforeReload()
@ -161,7 +158,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
})) }))
e.mtx.RLock() e.mtx.RLock()
shardCount = len(e.shards) shardCount := len(e.shards)
e.mtx.RUnlock() e.mtx.RUnlock()
require.Equal(t, 1, shardCount) require.Equal(t, 1, shardCount)
} }

View file

@ -124,10 +124,10 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
)...) )...)
if err := sh.UpdateID(ctx); err != nil { if err := sh.UpdateID(ctx); err != nil {
return nil, fmt.Errorf("could not update shard ID: %w", err) e.log.Warn(logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
} }
return sh, err return sh, nil
} }
func (e *StorageEngine) appendMetrics(id *shard.ID, opts []shard.Option) []shard.Option { func (e *StorageEngine) appendMetrics(id *shard.ID, opts []shard.Option) []shard.Option {

View file

@ -2,6 +2,7 @@ package shard
import ( import (
"context" "context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/mr-tron/base58" "github.com/mr-tron/base58"
@ -30,27 +31,32 @@ func (s *Shard) ID() *ID {
// UpdateID reads shard ID saved in the metabase and updates it if it is missing. // UpdateID reads shard ID saved in the metabase and updates it if it is missing.
func (s *Shard) UpdateID(ctx context.Context) (err error) { func (s *Shard) UpdateID(ctx context.Context) (err error) {
var metabaseOpened bool
var idFromMetabase []byte
if err = s.metaBase.Open(ctx, false); err != nil { if err = s.metaBase.Open(ctx, false); err != nil {
return err err = fmt.Errorf("failed to open metabase: %w", err)
} else {
metabaseOpened = true
} }
if metabaseOpened {
defer func() { defer func() {
cErr := s.metaBase.Close() cErr := s.metaBase.Close()
if err == nil { if cErr != nil {
err = cErr err = fmt.Errorf("failed to close metabase: %w", cErr)
} }
}() }()
id, err := s.metaBase.ReadShardID() if idFromMetabase, err = s.metaBase.ReadShardID(); err != nil {
if err != nil { err = fmt.Errorf("failed to read shard id from metabase: %w", err)
return err }
}
if len(idFromMetabase) != 0 {
s.info.ID = NewIDFromBytes(idFromMetabase)
} }
if len(id) != 0 {
s.info.ID = NewIDFromBytes(id)
if s.cfg.metricsWriter != nil { if s.cfg.metricsWriter != nil {
s.cfg.metricsWriter.SetShardID(s.info.ID.String()) s.cfg.metricsWriter.SetShardID(s.info.ID.String())
} }
}
s.log = &logger.Logger{Logger: s.log.With(zap.Stringer("shard_id", s.info.ID))} s.log = &logger.Logger{Logger: s.log.With(zap.Stringer("shard_id", s.info.ID))}
s.metaBase.SetLogger(s.log) s.metaBase.SetLogger(s.log)
s.blobStor.SetLogger(s.log) s.blobStor.SetLogger(s.log)
@ -63,8 +69,10 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
s.pilorama.SetParentID(s.info.ID.String()) s.pilorama.SetParentID(s.info.ID.String())
} }
if len(id) != 0 { if len(idFromMetabase) == 0 && metabaseOpened {
return nil if err = s.metaBase.WriteShardID(*s.info.ID); err != nil {
err = fmt.Errorf("failed to write shard id to metabase: %w", err)
} }
return s.metaBase.WriteShardID(*s.info.ID) }
return
} }