package engine import ( "context" "errors" "fmt" "path/filepath" "strings" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) type shardInitError struct { err error id string } // Open opens all StorageEngine's components. func (e *StorageEngine) Open(ctx context.Context) error { e.mtx.Lock() defer e.mtx.Unlock() var wg sync.WaitGroup errCh := make(chan shardInitError, len(e.shards)) for id, sh := range e.shards { wg.Add(1) go func(id string, sh *shard.Shard) { defer wg.Done() if err := sh.Open(ctx); err != nil { errCh <- shardInitError{ err: err, id: id, } } }(id, sh.Shard) } wg.Wait() close(errCh) for res := range errCh { if res.err != nil { e.log.Error(ctx, logs.EngineCouldNotOpenShardClosingAndSkipping, zap.String("id", res.id), zap.Error(res.err)) sh := e.shards[res.id] delete(e.shards, res.id) err := sh.Close(ctx) if err != nil { e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard, zap.String("id", res.id), zap.Error(res.err)) } continue } } return nil } // Init initializes all StorageEngine's components. func (e *StorageEngine) Init(ctx context.Context) error { e.mtx.Lock() defer e.mtx.Unlock() errCh := make(chan shardInitError, len(e.shards)) var eg errgroup.Group if e.lowMem && e.anyShardRequiresRefill() { eg.SetLimit(1) } for id, sh := range e.shards { eg.Go(func() error { if err := sh.Init(ctx); err != nil { errCh <- shardInitError{ err: err, id: id, } } return nil }) } err := eg.Wait() close(errCh) if err != nil { return fmt.Errorf("initialize shards: %w", err) } for res := range errCh { if res.err != nil { if errors.Is(res.err, blobstor.ErrInitBlobovniczas) { e.log.Error(ctx, logs.EngineCouldNotInitializeShardClosingAndSkipping, zap.String("id", res.id), zap.Error(res.err)) sh := e.shards[res.id] delete(e.shards, res.id) err := sh.Close(ctx) if err != nil { e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard, zap.String("id", res.id), zap.Error(res.err)) } continue } return fmt.Errorf("initialize shard %s: %w", res.id, res.err) } } if len(e.shards) == 0 { return errors.New("failed initialization on all shards") } e.wg.Add(1) go e.setModeLoop(ctx) return nil } func (e *StorageEngine) anyShardRequiresRefill() bool { for _, sh := range e.shards { if sh.NeedRefillMetabase() { return true } } return false } var errClosed = errors.New("storage engine is closed") // Close releases all StorageEngine's components. Waits for all data-related operations to complete. // After the call, all the next ones will fail. // // The method MUST only be called when the application exits. func (e *StorageEngine) Close(ctx context.Context) error { close(e.closeCh) defer e.wg.Wait() return e.closeEngine(ctx) } // closes all shards. Never returns an error, shard errors are logged. func (e *StorageEngine) closeAllShards(ctx context.Context) error { e.mtx.RLock() defer e.mtx.RUnlock() for id, sh := range e.shards { if err := sh.Close(ctx); err != nil { e.log.Debug(ctx, logs.EngineCouldNotCloseShard, zap.String("id", id), zap.Error(err), ) } } return nil } // executes op if execution is not blocked, otherwise returns blocking error. // // Can be called concurrently with setBlockExecErr. func (e *StorageEngine) execIfNotBlocked(op func() error) error { e.blockExec.mtx.RLock() defer e.blockExec.mtx.RUnlock() if e.blockExec.closed { return errClosed } return op() } func (e *StorageEngine) closeEngine(ctx context.Context) error { e.blockExec.mtx.Lock() defer e.blockExec.mtx.Unlock() if e.blockExec.closed { return errClosed } e.blockExec.closed = true return e.closeAllShards(ctx) } type ReConfiguration struct { shards map[string][]shard.Option // meta path -> shard opts } // AddShard adds a shard for the reconfiguration. // Shard identifier is calculated from paths used in blobstor. func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) { if rCfg.shards == nil { rCfg.shards = make(map[string][]shard.Option) } if _, found := rCfg.shards[id]; found { return } rCfg.shards[id] = opts } // Reload reloads StorageEngine's configuration in runtime. func (e *StorageEngine) Reload(ctx context.Context, rcfg ReConfiguration) error { type reloadInfo struct { sh *shard.Shard opts []shard.Option } e.mtx.RLock() var shardsToRemove []string // shards IDs var shardsToAdd []string // shard config identifiers (blobstor paths concatenation) var shardsToReload []reloadInfo // mark removed shards for removal for id, sh := range e.shards { _, ok := rcfg.shards[calculateShardID(sh.DumpInfo())] if !ok { shardsToRemove = append(shardsToRemove, id) } } loop: for newID := range rcfg.shards { for _, sh := range e.shards { // This calculation should be kept in sync with node // configuration parsing during SIGHUP. if newID == calculateShardID(sh.DumpInfo()) { shardsToReload = append(shardsToReload, reloadInfo{ sh: sh.Shard, opts: rcfg.shards[newID], }) continue loop } } shardsToAdd = append(shardsToAdd, newID) } e.mtx.RUnlock() e.removeShards(ctx, shardsToRemove...) for _, p := range shardsToReload { err := p.sh.Reload(ctx, p.opts...) if err != nil { e.log.Error(ctx, logs.EngineCouldNotReloadAShard, zap.Stringer("shard id", p.sh.ID()), zap.Error(err)) } } for _, newID := range shardsToAdd { sh, err := e.createShard(ctx, rcfg.shards[newID]) if err != nil { return fmt.Errorf("add new shard with '%s' metabase path: %w", newID, err) } idStr := sh.ID().String() err = sh.Open(ctx) if err == nil { err = sh.Init(ctx) } if err != nil { _ = sh.Close(ctx) return fmt.Errorf("init %s shard: %w", idStr, err) } err = e.addShard(sh) if err != nil { _ = sh.Close(ctx) return fmt.Errorf("add %s shard: %w", idStr, err) } e.log.Info(ctx, logs.EngineAddedNewShard, zap.String("id", idStr)) } return nil } func calculateShardID(info shard.Info) string { // This calculation should be kept in sync with node // configuration parsing during SIGHUP. var sb strings.Builder for _, sub := range info.BlobStorInfo.SubStorages { sb.WriteString(filepath.Clean(sub.Path)) } return sb.String() }