[#1549] engine: Disable shard on blobovnicza init failure
There is a need to support working w/o shard if it has problems with blobovnicza tree. Make `BlobStor.Init` to return new `ErrInitBlobovniczas` error. Remove shard from storage engine's shard set if it returned this error from `Init` call. So if some of the shards (but not all) return this error, the node will be able to continue working without them. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru> Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
40a56c6b42
commit
e38b0aa4ba
2 changed files with 45 additions and 8 deletions
|
@ -1,5 +1,10 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Open opens BlobStor.
|
||||
func (b *BlobStor) Open() error {
|
||||
b.log.Debug("opening...")
|
||||
|
@ -7,13 +12,23 @@ func (b *BlobStor) Open() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// ErrInitBlobovniczas is returned when blobovnicza initialization fails.
|
||||
var ErrInitBlobovniczas = errors.New("failure on blobovnicza initialization stage")
|
||||
|
||||
// Init initializes internal data structures and system resources.
|
||||
//
|
||||
// If BlobStor is already initialized, no action is taken.
|
||||
//
|
||||
// Returns wrapped ErrInitBlobovniczas on blobovnicza tree's initializaiton failure.
|
||||
func (b *BlobStor) Init() error {
|
||||
b.log.Debug("initializing...")
|
||||
|
||||
return b.blobovniczas.init()
|
||||
err := b.blobovniczas.init()
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrInitBlobovniczas, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close releases all internal resources of BlobStor.
|
||||
|
|
|
@ -5,10 +5,16 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type shardInitError struct {
|
||||
err error
|
||||
id string
|
||||
}
|
||||
|
||||
// Open opens all StorageEngine's components.
|
||||
func (e *StorageEngine) Open() error {
|
||||
return e.open()
|
||||
|
@ -44,30 +50,46 @@ func (e *StorageEngine) open() error {
|
|||
|
||||
// Init initializes all StorageEngine's components.
|
||||
func (e *StorageEngine) Init() error {
|
||||
e.mtx.RLock()
|
||||
defer e.mtx.RUnlock()
|
||||
e.mtx.Lock()
|
||||
defer e.mtx.Unlock()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var errCh = make(chan error, len(e.shards))
|
||||
var errCh = make(chan shardInitError, len(e.shards))
|
||||
|
||||
for id, sh := range e.shards {
|
||||
wg.Add(1)
|
||||
go func(id string, sh *shard.Shard) {
|
||||
defer wg.Done()
|
||||
if err := sh.Init(); err != nil {
|
||||
errCh <- fmt.Errorf("could not initialize shard %s: %w", id, err)
|
||||
errCh <- shardInitError{
|
||||
err: err,
|
||||
id: id,
|
||||
}
|
||||
}
|
||||
}(id, sh.Shard)
|
||||
}
|
||||
wg.Wait()
|
||||
close(errCh)
|
||||
|
||||
for err := range errCh {
|
||||
if err != nil {
|
||||
return err
|
||||
for res := range errCh {
|
||||
if res.err != nil {
|
||||
if errors.Is(res.err, blobstor.ErrInitBlobovniczas) {
|
||||
delete(e.shards, res.id)
|
||||
|
||||
e.log.Error("shard initialization failure, skipping",
|
||||
zap.String("id", res.id),
|
||||
zap.Error(res.err))
|
||||
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(e.shards) == 0 {
|
||||
return errors.New("failed initialization on all shards")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue