forked from TrueCloudLab/frostfs-node
Evgenii Stratonikov
03976c6ed5
exportloopref is deprecated. gopatch: ``` @@ var index, value identifier var slice expression @@ for index, value := range slice { ... -value := value ... } @@ var index, value identifier var slice expression @@ for index, value := range slice { ... -index := index ... } @@ var value identifier var channel expression @@ for value := range channel { ... -value := value ... } ``` Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
371 lines
9.1 KiB
Go
371 lines
9.1 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
type shardInitError struct {
|
|
err error
|
|
id string
|
|
}
|
|
|
|
// Open opens all StorageEngine's components.
|
|
func (e *StorageEngine) Open(ctx context.Context) error {
|
|
return e.open(ctx)
|
|
}
|
|
|
|
func (e *StorageEngine) open(ctx context.Context) error {
|
|
e.mtx.Lock()
|
|
defer e.mtx.Unlock()
|
|
|
|
var wg sync.WaitGroup
|
|
errCh := make(chan shardInitError, len(e.shards))
|
|
|
|
for id, sh := range e.shards {
|
|
wg.Add(1)
|
|
go func(id string, sh *shard.Shard) {
|
|
defer wg.Done()
|
|
if err := sh.Open(ctx); err != nil {
|
|
errCh <- shardInitError{
|
|
err: err,
|
|
id: id,
|
|
}
|
|
}
|
|
}(id, sh.Shard)
|
|
}
|
|
wg.Wait()
|
|
close(errCh)
|
|
|
|
for res := range errCh {
|
|
if res.err != nil {
|
|
e.log.Error(logs.EngineCouldNotOpenShardClosingAndSkipping,
|
|
zap.String("id", res.id),
|
|
zap.Error(res.err))
|
|
|
|
sh := e.shards[res.id]
|
|
delete(e.shards, res.id)
|
|
|
|
err := sh.Close()
|
|
if err != nil {
|
|
e.log.Error(logs.EngineCouldNotClosePartiallyInitializedShard,
|
|
zap.String("id", res.id),
|
|
zap.Error(res.err))
|
|
}
|
|
|
|
continue
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Init initializes all StorageEngine's components.
|
|
func (e *StorageEngine) Init(ctx context.Context) error {
|
|
e.mtx.Lock()
|
|
defer e.mtx.Unlock()
|
|
|
|
errCh := make(chan shardInitError, len(e.shards))
|
|
var eg errgroup.Group
|
|
if e.cfg.lowMem && e.anyShardRequiresRefill() {
|
|
eg.SetLimit(1)
|
|
}
|
|
|
|
for id, sh := range e.shards {
|
|
eg.Go(func() error {
|
|
if err := sh.Init(ctx); err != nil {
|
|
errCh <- shardInitError{
|
|
err: err,
|
|
id: id,
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
err := eg.Wait()
|
|
close(errCh)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize shards: %w", err)
|
|
}
|
|
|
|
for res := range errCh {
|
|
if res.err != nil {
|
|
if errors.Is(res.err, blobstor.ErrInitBlobovniczas) {
|
|
e.log.Error(logs.EngineCouldNotInitializeShardClosingAndSkipping,
|
|
zap.String("id", res.id),
|
|
zap.Error(res.err))
|
|
|
|
sh := e.shards[res.id]
|
|
delete(e.shards, res.id)
|
|
|
|
err := sh.Close()
|
|
if err != nil {
|
|
e.log.Error(logs.EngineCouldNotClosePartiallyInitializedShard,
|
|
zap.String("id", res.id),
|
|
zap.Error(res.err))
|
|
}
|
|
|
|
continue
|
|
}
|
|
return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
|
|
}
|
|
}
|
|
|
|
if len(e.shards) == 0 {
|
|
return errors.New("failed initialization on all shards")
|
|
}
|
|
|
|
e.wg.Add(1)
|
|
go e.setModeLoop()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *StorageEngine) anyShardRequiresRefill() bool {
|
|
for _, sh := range e.shards {
|
|
if sh.NeedRefillMetabase() {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
var errClosed = errors.New("storage engine is closed")
|
|
|
|
// Close releases all StorageEngine's components. Waits for all data-related operations to complete.
|
|
// After the call, all the next ones will fail.
|
|
//
|
|
// The method MUST only be called when the application exits.
|
|
func (e *StorageEngine) Close(ctx context.Context) error {
|
|
close(e.closeCh)
|
|
defer e.wg.Wait()
|
|
return e.setBlockExecErr(ctx, errClosed)
|
|
}
|
|
|
|
// closes all shards. Never returns an error, shard errors are logged.
|
|
func (e *StorageEngine) close(releasePools bool) error {
|
|
e.mtx.RLock()
|
|
defer e.mtx.RUnlock()
|
|
|
|
if releasePools {
|
|
for _, p := range e.shardPools {
|
|
p.Release()
|
|
}
|
|
}
|
|
|
|
for id, sh := range e.shards {
|
|
if err := sh.Close(); err != nil {
|
|
e.log.Debug(logs.EngineCouldNotCloseShard,
|
|
zap.String("id", id),
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// executes op if execution is not blocked, otherwise returns blocking error.
|
|
//
|
|
// Can be called concurrently with setBlockExecErr.
|
|
func (e *StorageEngine) execIfNotBlocked(op func() error) error {
|
|
e.blockExec.mtx.RLock()
|
|
defer e.blockExec.mtx.RUnlock()
|
|
|
|
if e.blockExec.err != nil {
|
|
return e.blockExec.err
|
|
}
|
|
|
|
return op()
|
|
}
|
|
|
|
// sets the flag of blocking execution of all data operations according to err:
|
|
// - err != nil, then blocks the execution. If exec wasn't blocked, calls close method
|
|
// (if err == errClosed => additionally releases pools and does not allow to resume executions).
|
|
// - otherwise, resumes execution. If exec was blocked, calls open method.
|
|
//
|
|
// Can be called concurrently with exec. In this case it waits for all executions to complete.
|
|
func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
|
|
e.blockExec.mtx.Lock()
|
|
defer e.blockExec.mtx.Unlock()
|
|
|
|
prevErr := e.blockExec.err
|
|
|
|
wasClosed := errors.Is(prevErr, errClosed)
|
|
if wasClosed {
|
|
return errClosed
|
|
}
|
|
|
|
e.blockExec.err = err
|
|
|
|
if err == nil {
|
|
if prevErr != nil { // block -> ok
|
|
return e.open(ctx)
|
|
}
|
|
} else if prevErr == nil { // ok -> block
|
|
return e.close(errors.Is(err, errClosed))
|
|
}
|
|
|
|
// otherwise do nothing
|
|
|
|
return nil
|
|
}
|
|
|
|
// BlockExecution blocks the execution of any data-related operation. All blocked ops will return err.
|
|
// To resume the execution, use ResumeExecution method.
|
|
//
|
|
// Сan be called regardless of the fact of the previous blocking. If execution wasn't blocked, releases all resources
|
|
// similar to Close. Can be called concurrently with Close and any data related method (waits for all executions
|
|
// to complete). Returns error if any Close has been called before.
|
|
//
|
|
// Must not be called concurrently with either Open or Init.
|
|
//
|
|
// Note: technically passing nil error will resume the execution, otherwise, it is recommended to call ResumeExecution
|
|
// for this.
|
|
func (e *StorageEngine) BlockExecution(err error) error {
|
|
return e.setBlockExecErr(context.Background(), err)
|
|
}
|
|
|
|
// ResumeExecution resumes the execution of any data-related operation.
|
|
// To block the execution, use BlockExecution method.
|
|
//
|
|
// Сan be called regardless of the fact of the previous blocking. If execution was blocked, prepares all resources
|
|
// similar to Open. Can be called concurrently with Close and any data related method (waits for all executions
|
|
// to complete). Returns error if any Close has been called before.
|
|
//
|
|
// Must not be called concurrently with either Open or Init.
|
|
func (e *StorageEngine) ResumeExecution() error {
|
|
return e.setBlockExecErr(context.Background(), nil)
|
|
}
|
|
|
|
type ReConfiguration struct {
|
|
errorsThreshold uint32
|
|
shardPoolSize uint32
|
|
|
|
shards map[string][]shard.Option // meta path -> shard opts
|
|
}
|
|
|
|
// SetErrorsThreshold sets a size amount of errors after which
|
|
// shard is moved to read-only mode.
|
|
func (rCfg *ReConfiguration) SetErrorsThreshold(errorsThreshold uint32) {
|
|
rCfg.errorsThreshold = errorsThreshold
|
|
}
|
|
|
|
// SetShardPoolSize sets a size of worker pool for each shard.
|
|
func (rCfg *ReConfiguration) SetShardPoolSize(shardPoolSize uint32) {
|
|
rCfg.shardPoolSize = shardPoolSize
|
|
}
|
|
|
|
// AddShard adds a shard for the reconfiguration.
|
|
// Shard identifier is calculated from paths used in blobstor.
|
|
func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) {
|
|
if rCfg.shards == nil {
|
|
rCfg.shards = make(map[string][]shard.Option)
|
|
}
|
|
|
|
if _, found := rCfg.shards[id]; found {
|
|
return
|
|
}
|
|
|
|
rCfg.shards[id] = opts
|
|
}
|
|
|
|
// Reload reloads StorageEngine's configuration in runtime.
|
|
func (e *StorageEngine) Reload(ctx context.Context, rcfg ReConfiguration) error {
|
|
type reloadInfo struct {
|
|
sh *shard.Shard
|
|
opts []shard.Option
|
|
}
|
|
|
|
e.mtx.RLock()
|
|
|
|
var shardsToRemove []string // shards IDs
|
|
var shardsToAdd []string // shard config identifiers (blobstor paths concatenation)
|
|
var shardsToReload []reloadInfo
|
|
|
|
// mark removed shards for removal
|
|
for id, sh := range e.shards {
|
|
_, ok := rcfg.shards[calculateShardID(sh.DumpInfo())]
|
|
if !ok {
|
|
shardsToRemove = append(shardsToRemove, id)
|
|
}
|
|
}
|
|
|
|
loop:
|
|
for newID := range rcfg.shards {
|
|
for _, sh := range e.shards {
|
|
// This calculation should be kept in sync with node
|
|
// configuration parsing during SIGHUP.
|
|
if newID == calculateShardID(sh.DumpInfo()) {
|
|
shardsToReload = append(shardsToReload, reloadInfo{
|
|
sh: sh.Shard,
|
|
opts: rcfg.shards[newID],
|
|
})
|
|
continue loop
|
|
}
|
|
}
|
|
|
|
shardsToAdd = append(shardsToAdd, newID)
|
|
}
|
|
|
|
e.mtx.RUnlock()
|
|
|
|
e.removeShards(shardsToRemove...)
|
|
|
|
for _, p := range shardsToReload {
|
|
err := p.sh.Reload(ctx, p.opts...)
|
|
if err != nil {
|
|
e.log.Error(logs.EngineCouldNotReloadAShard,
|
|
zap.Stringer("shard id", p.sh.ID()),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
|
|
for _, newID := range shardsToAdd {
|
|
sh, err := e.createShard(ctx, rcfg.shards[newID])
|
|
if err != nil {
|
|
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
|
|
}
|
|
|
|
idStr := sh.ID().String()
|
|
|
|
err = sh.Open(ctx)
|
|
if err == nil {
|
|
err = sh.Init(ctx)
|
|
}
|
|
if err != nil {
|
|
_ = sh.Close()
|
|
return fmt.Errorf("could not init %s shard: %w", idStr, err)
|
|
}
|
|
|
|
err = e.addShard(sh)
|
|
if err != nil {
|
|
_ = sh.Close()
|
|
return fmt.Errorf("could not add %s shard: %w", idStr, err)
|
|
}
|
|
|
|
e.log.Info(logs.EngineAddedNewShard, zap.String("id", idStr))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func calculateShardID(info shard.Info) string {
|
|
// This calculation should be kept in sync with node
|
|
// configuration parsing during SIGHUP.
|
|
var sb strings.Builder
|
|
for _, sub := range info.BlobStorInfo.SubStorages {
|
|
sb.WriteString(filepath.Clean(sub.Path))
|
|
}
|
|
return sb.String()
|
|
}
|