[#2238] engine: Make Open
and Init
similar
1. Both could initialize shards in parallel. 2. Both should close shards after an error. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
e0309e398c
commit
c53903ccd0
2 changed files with 36 additions and 10 deletions
|
@ -16,6 +16,7 @@ Changelog for FrostFS Node
|
||||||
- Env prefix in configuration changed to `FROSTFS_*` (#43)
|
- Env prefix in configuration changed to `FROSTFS_*` (#43)
|
||||||
- Link object is broadcast throughout the whole container now (#57)
|
- Link object is broadcast throughout the whole container now (#57)
|
||||||
- Pilorama now can merge multiple batches into one (#2231)
|
- Pilorama now can merge multiple batches into one (#2231)
|
||||||
|
- Storage engine now can start even when some shard components are unavailable (#2238)
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
- Increase payload size metric on shards' `put` operation (#1794)
|
- Increase payload size metric on shards' `put` operation (#1794)
|
||||||
|
|
|
@ -23,27 +23,44 @@ func (e *StorageEngine) Open() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) open() error {
|
func (e *StorageEngine) open() error {
|
||||||
e.mtx.RLock()
|
e.mtx.Lock()
|
||||||
defer e.mtx.RUnlock()
|
defer e.mtx.Unlock()
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var errCh = make(chan error, len(e.shards))
|
var errCh = make(chan shardInitError, len(e.shards))
|
||||||
|
|
||||||
for id, sh := range e.shards {
|
for id, sh := range e.shards {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(id string, sh *shard.Shard) {
|
go func(id string, sh *shard.Shard) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if err := sh.Open(); err != nil {
|
if err := sh.Open(); err != nil {
|
||||||
errCh <- fmt.Errorf("could not open shard %s: %w", id, err)
|
errCh <- shardInitError{
|
||||||
|
err: err,
|
||||||
|
id: id,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}(id, sh.Shard)
|
}(id, sh.Shard)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
close(errCh)
|
close(errCh)
|
||||||
|
|
||||||
for err := range errCh {
|
for res := range errCh {
|
||||||
|
if res.err != nil {
|
||||||
|
e.log.Error("could not open shard, closing and skipping",
|
||||||
|
zap.String("id", res.id),
|
||||||
|
zap.Error(res.err))
|
||||||
|
|
||||||
|
sh := e.shards[res.id]
|
||||||
|
delete(e.shards, res.id)
|
||||||
|
|
||||||
|
err := sh.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
e.log.Error("could not close partially initialized shard",
|
||||||
|
zap.String("id", res.id),
|
||||||
|
zap.Error(res.err))
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,12 +93,20 @@ func (e *StorageEngine) Init() error {
|
||||||
for res := range errCh {
|
for res := range errCh {
|
||||||
if res.err != nil {
|
if res.err != nil {
|
||||||
if errors.Is(res.err, blobstor.ErrInitBlobovniczas) {
|
if errors.Is(res.err, blobstor.ErrInitBlobovniczas) {
|
||||||
delete(e.shards, res.id)
|
e.log.Error("could not initialize shard, closing and skipping",
|
||||||
|
|
||||||
e.log.Error("shard initialization failure, skipping",
|
|
||||||
zap.String("id", res.id),
|
zap.String("id", res.id),
|
||||||
zap.Error(res.err))
|
zap.Error(res.err))
|
||||||
|
|
||||||
|
sh := e.shards[res.id]
|
||||||
|
delete(e.shards, res.id)
|
||||||
|
|
||||||
|
err := sh.Close()
|
||||||
|
if err != nil {
|
||||||
|
e.log.Error("could not close partially initialized shard",
|
||||||
|
zap.String("id", res.id),
|
||||||
|
zap.Error(res.err))
|
||||||
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
|
return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
|
||||||
|
|
Loading…
Reference in a new issue