frostfs-node/pkg/local_object_storage/engine/control.go

package engine

import (
	"context"
	"errors"
	"fmt"
	"path/filepath"
	"strings"
	"sync"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)
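
// shardInitError pairs a shard open/initialization error with the ID of the
// shard on which it occurred.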
type shardInitError struct {
	err error
	id  string
}

// Open opens all StorageEngine's components.
func (e *StorageEngine) Open(ctx context.Context) error {
	return e.open(ctx)
}
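
// open opens all shards in parallel. A shard that fails to open is closed,
// the failure is logged, and the shard is removed from the engine.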
func (e *StorageEngine) open(ctx context.Context) error {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	var wg sync.WaitGroup
	errCh := make(chan shardInitError, len(e.shards))

	for id, sh := range e.shards {
		wg.Add(1)
		go func(id string, sh *shard.Shard) {
			defer wg.Done()
			if err := sh.Open(ctx); err != nil {
				errCh <- shardInitError{
					err: err,
					id:  id,
				}
			}
		}(id, sh.Shard)
	}
	wg.Wait()
	close(errCh)

	for res := range errCh {
		if res.err != nil {
			e.log.Error(logs.EngineCouldNotOpenShardClosingAndSkipping,
				zap.String("id", res.id),
				zap.Error(res.err))

			sh := e.shards[res.id]
			delete(e.shards, res.id)

			if err := sh.Close(); err != nil {
				// Report the close error itself, not the open error that preceded it.
				e.log.Error(logs.EngineCouldNotClosePartiallyInitializedShard,
					zap.String("id", res.id),
					zap.Error(err))
			}

			continue
		}
	}

	return nil
}

// Init initializes all StorageEngine's components.
func (e *StorageEngine) Init(ctx context.Context) error {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	errCh := make(chan shardInitError, len(e.shards))
	var eg errgroup.Group
	if e.cfg.lowMem && e.anyShardRequiresRefill() {
		eg.SetLimit(1)
	}

	for id, sh := range e.shards {
		// Pin loop variables for the goroutine (pre-Go 1.22 semantics).
		id := id
		sh := sh
		eg.Go(func() error {
			if err := sh.Init(ctx); err != nil {
				errCh <- shardInitError{
					err: err,
					id:  id,
				}
			}
			return nil
		})
	}
	err := eg.Wait()
	close(errCh)
	if err != nil {
		return fmt.Errorf("failed to initialize shards: %w", err)
	}

	for res := range errCh {
		if res.err != nil {
			if errors.Is(res.err, blobstor.ErrInitBlobovniczas) {
				e.log.Error(logs.EngineCouldNotInitializeShardClosingAndSkipping,
					zap.String("id", res.id),
					zap.Error(res.err))

				sh := e.shards[res.id]
				delete(e.shards, res.id)

				if err := sh.Close(); err != nil {
					// Report the close error itself, not the init error that preceded it.
					e.log.Error(logs.EngineCouldNotClosePartiallyInitializedShard,
						zap.String("id", res.id),
						zap.Error(err))
				}

				continue
			}

			return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
		}
	}

	if len(e.shards) == 0 {
		return errors.New("failed initialization on all shards")
	}

	e.wg.Add(1)
	go e.setModeLoop()

	return nil
}
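
// anyShardRequiresRefill reports whether at least one shard needs its
// metabase to be refilled on initialization.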
func (e *StorageEngine) anyShardRequiresRefill() bool {
	for _, sh := range e.shards {
		if sh.NeedRefillMetabase() {
			return true
		}
	}
	return false
}
var errClosed = errors.New("storage engine is closed")

// Close releases all StorageEngine's components. It waits for all in-flight
// data-related operations to complete. After this call, all subsequent
// operations fail.
//
// The method MUST only be called when the application exits.
func (e *StorageEngine) Close(ctx context.Context) error {
	close(e.closeCh)
	defer e.wg.Wait()
	return e.setBlockExecErr(ctx, errClosed)
}

// close closes all shards. It never returns an error; shard close errors are
// only logged.
func (e *StorageEngine) close(releasePools bool) error {
	e.mtx.RLock()
	defer e.mtx.RUnlock()

	if releasePools {
		for _, p := range e.shardPools {
			p.Release()
		}
	}

	for id, sh := range e.shards {
		if err := sh.Close(); err != nil {
			e.log.Debug(logs.EngineCouldNotCloseShard,
				zap.String("id", id),
				zap.String("error", err.Error()),
			)
		}
	}

	return nil
}

// executes op if execution is not blocked, otherwise returns blocking error.
//
// Can be called concurrently with setBlockExecErr.
func (e *StorageEngine) execIfNotBlocked(op func() error) error {
	e.blockExec.mtx.RLock()
	defer e.blockExec.mtx.RUnlock()

	if e.blockExec.err != nil {
		return e.blockExec.err
	}

	return op()
}

// setBlockExecErr sets the error that blocks the execution of all data
// operations according to err:
//   - if err != nil, it blocks execution. If execution wasn't blocked before,
//     it calls the close method (and if err == errClosed, additionally
//     releases pools and does not allow execution to be resumed);
//   - otherwise, it resumes execution. If execution was blocked before, it
//     calls the open method.
//
// Can be called concurrently with exec. In this case it waits for all
// executions to complete.
func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
	e.blockExec.mtx.Lock()
	defer e.blockExec.mtx.Unlock()

	prevErr := e.blockExec.err

	wasClosed := errors.Is(prevErr, errClosed)
	if wasClosed {
		return errClosed
	}

	e.blockExec.err = err

	if err == nil {
		if prevErr != nil { // block -> ok
			return e.open(ctx)
		}
	} else if prevErr == nil { // ok -> block
		return e.close(errors.Is(err, errClosed))
	}

	// otherwise do nothing

	return nil
}

// BlockExecution blocks the execution of any data-related operation. All
// blocked ops will return err. To resume the execution, use the
// ResumeExecution method.
//
// Can be called regardless of whether execution was previously blocked. If
// execution wasn't blocked, releases all resources similarly to Close. Can be
// called concurrently with Close and any data-related method (waits for all
// executions to complete). Returns an error if Close has been called before.
//
// Must not be called concurrently with either Open or Init.
//
// Note: technically, passing a nil error will resume the execution, but it is
// recommended to call ResumeExecution for that.
func (e *StorageEngine) BlockExecution(err error) error {
	return e.setBlockExecErr(context.Background(), err)
}

// ResumeExecution resumes the execution of any data-related operation. To
// block the execution, use the BlockExecution method.
//
// Can be called regardless of whether execution was previously blocked. If
// execution was blocked, prepares all resources similarly to Open. Can be
// called concurrently with Close and any data-related method (waits for all
// executions to complete). Returns an error if Close has been called before.
//
// Must not be called concurrently with either Open or Init.
func (e *StorageEngine) ResumeExecution() error {
	return e.setBlockExecErr(context.Background(), nil)
}
type ReConfiguration struct {
	errorsThreshold uint32
	shardPoolSize   uint32

	shards map[string][]shard.Option // meta path -> shard opts
}

// SetErrorsThreshold sets the number of errors after which a shard is moved
// to read-only mode.
func (rCfg *ReConfiguration) SetErrorsThreshold(errorsThreshold uint32) {
	rCfg.errorsThreshold = errorsThreshold
}

// SetShardPoolSize sets the size of the worker pool for each shard.
func (rCfg *ReConfiguration) SetShardPoolSize(shardPoolSize uint32) {
	rCfg.shardPoolSize = shardPoolSize
}

// AddShard adds a shard for the reconfiguration.
// The shard identifier is calculated from the paths used in blobstor.
func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) {
	if rCfg.shards == nil {
		rCfg.shards = make(map[string][]shard.Option)
	}

	if _, found := rCfg.shards[id]; found {
		return
	}

	rCfg.shards[id] = opts
}

// Reload reloads StorageEngine's configuration at runtime: shards missing
// from rcfg are removed, shards present in both are reloaded in place, and
// new shards are created, opened, initialized and added to the engine.
func (e *StorageEngine) Reload(ctx context.Context, rcfg ReConfiguration) error {
	type reloadInfo struct {
		sh   *shard.Shard
		opts []shard.Option
	}

	e.mtx.RLock()

	var shardsToRemove []string // shard IDs
	var shardsToAdd []string    // shard config identifiers (blobstor paths concatenation)
	var shardsToReload []reloadInfo

	// mark shards missing from the new configuration for removal
	for id, sh := range e.shards {
		_, ok := rcfg.shards[calculateShardID(sh.DumpInfo())]
		if !ok {
			shardsToRemove = append(shardsToRemove, id)
		}
	}

loop:
	for newID := range rcfg.shards {
		for _, sh := range e.shards {
			// This calculation should be kept in sync with node
			// configuration parsing during SIGHUP.
			if newID == calculateShardID(sh.DumpInfo()) {
				shardsToReload = append(shardsToReload, reloadInfo{
					sh:   sh.Shard,
					opts: rcfg.shards[newID],
				})
				continue loop
			}
		}

		shardsToAdd = append(shardsToAdd, newID)
	}

	e.mtx.RUnlock()

	e.removeShards(shardsToRemove...)

	for _, p := range shardsToReload {
		err := p.sh.Reload(ctx, p.opts...)
		if err != nil {
			e.log.Error(logs.EngineCouldNotReloadAShard,
				zap.Stringer("shard id", p.sh.ID()),
				zap.Error(err))
		}
	}

	for _, newID := range shardsToAdd {
		sh, err := e.createShard(ctx, rcfg.shards[newID])
		if err != nil {
			return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
		}

		idStr := sh.ID().String()

		err = sh.Open(ctx)
		if err == nil {
			err = sh.Init(ctx)
		}
		if err != nil {
			_ = sh.Close()
			return fmt.Errorf("could not init %s shard: %w", idStr, err)
		}

		err = e.addShard(sh)
		if err != nil {
			_ = sh.Close()
			return fmt.Errorf("could not add %s shard: %w", idStr, err)
		}

		e.log.Info(logs.EngineAddedNewShard, zap.String("id", idStr))
	}

	return nil
}
func calculateShardID(info shard.Info) string {
	// This calculation should be kept in sync with node
	// configuration parsing during SIGHUP.
	var sb strings.Builder

	for _, sub := range info.BlobStorInfo.SubStorages {
		sb.WriteString(filepath.Clean(sub.Path))
	}

	return sb.String()
}