From c53903ccd03e1d1f8ab132234abc3804884d6e58 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 6 Feb 2023 13:30:33 +0300 Subject: [PATCH] [#2238] engine: Make `Open` and `Init` similar 1. Both could initialize shards in parallel. 2. Both should close shards after an error. Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 1 + pkg/local_object_storage/engine/control.go | 45 +++++++++++++++++----- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 06c8ebbb47..6be07ddaf2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Changelog for FrostFS Node - Env prefix in configuration changed to `FROSTFS_*` (#43) - Link object is broadcast throughout the whole container now (#57) - Pilorama now can merge multiple batches into one (#2231) +- Storage engine now can start even when some shard components are unavailable (#2238) ### Fixed - Increase payload size metric on shards' `put` operation (#1794) diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index a56d7bbb23..b9a8313595 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -23,27 +23,44 @@ func (e *StorageEngine) Open() error { } func (e *StorageEngine) open() error { - e.mtx.RLock() - defer e.mtx.RUnlock() + e.mtx.Lock() + defer e.mtx.Unlock() var wg sync.WaitGroup - var errCh = make(chan error, len(e.shards)) + var errCh = make(chan shardInitError, len(e.shards)) for id, sh := range e.shards { wg.Add(1) go func(id string, sh *shard.Shard) { defer wg.Done() if err := sh.Open(); err != nil { - errCh <- fmt.Errorf("could not open shard %s: %w", id, err) + errCh <- shardInitError{ + err: err, + id: id, + } } }(id, sh.Shard) } wg.Wait() close(errCh) - for err := range errCh { - if err != nil { - return err + for res := range errCh { + if res.err != nil { + e.log.Error("could not open shard, closing and skipping", + zap.String("id", res.id), + zap.Error(res.err)) + + sh := e.shards[res.id] + delete(e.shards, res.id) + + err := sh.Close() + if err != nil { + e.log.Error("could not close partially initialized shard", + zap.String("id", res.id), + zap.Error(res.err)) + } + + continue } } @@ -76,12 +93,20 @@ func (e *StorageEngine) Init() error { for res := range errCh { if res.err != nil { if errors.Is(res.err, blobstor.ErrInitBlobovniczas) { - delete(e.shards, res.id) - - e.log.Error("shard initialization failure, skipping", + e.log.Error("could not initialize shard, closing and skipping", zap.String("id", res.id), zap.Error(res.err)) + sh := e.shards[res.id] + delete(e.shards, res.id) + + err := sh.Close() + if err != nil { + e.log.Error("could not close partially initialized shard", + zap.String("id", res.id), + zap.Error(res.err)) + } + continue } return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)