package engine

import (
	"context"
	"errors"
	"fmt"
	"path/filepath"
	"strings"
	"sync"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)

type shardInitError struct {
	err error
	id  string
}

// Open opens all StorageEngine's components.
func (e *StorageEngine) Open() error {
	return e.open()
}

func (e *StorageEngine) open() error {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	var wg sync.WaitGroup
	var errCh = make(chan shardInitError, len(e.shards))

	for id, sh := range e.shards {
		wg.Add(1)
		go func(id string, sh *shard.Shard) {
			defer wg.Done()
			if err := sh.Open(); err != nil {
				errCh <- shardInitError{
					err: err,
					id:  id,
				}
			}
		}(id, sh.Shard)
	}
	wg.Wait()
	close(errCh)

	for res := range errCh {
		if res.err != nil {
			e.log.Error(logs.EngineCouldNotOpenShardClosingAndSkipping,
				zap.String("id", res.id),
				zap.Error(res.err))

			sh := e.shards[res.id]
			delete(e.shards, res.id)

			err := sh.Close()
			if err != nil {
				e.log.Error(logs.EngineCouldNotClosePartiallyInitializedShard,
					zap.String("id", res.id),
					zap.Error(res.err))
			}

			continue
		}
	}

	return nil
}

// Init initializes all StorageEngine's components.
func (e *StorageEngine) Init(ctx context.Context) error {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	var errCh = make(chan shardInitError, len(e.shards))
	var eg errgroup.Group
	if e.cfg.lowMem && e.anyShardRequiresRefill() {
		eg.SetLimit(1)
	}

	for id, sh := range e.shards {
		id := id
		sh := sh
		eg.Go(func() error {
			if err := sh.Init(ctx); err != nil {
				errCh <- shardInitError{
					err: err,
					id:  id,
				}
			}
			return nil
		})
	}
	err := eg.Wait()
	close(errCh)
	if err != nil {
		return fmt.Errorf("failed to initialize shards: %w", err)
	}

	for res := range errCh {
		if res.err != nil {
			if errors.Is(res.err, blobstor.ErrInitBlobovniczas) {
				e.log.Error(logs.EngineCouldNotInitializeShardClosingAndSkipping,
					zap.String("id", res.id),
					zap.Error(res.err))

				sh := e.shards[res.id]
				delete(e.shards, res.id)

				err := sh.Close()
				if err != nil {
					e.log.Error(logs.EngineCouldNotClosePartiallyInitializedShard,
						zap.String("id", res.id),
						zap.Error(res.err))
				}

				continue
			}
			return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
		}
	}

	if len(e.shards) == 0 {
		return errors.New("failed initialization on all shards")
	}

	e.wg.Add(1)
	go e.setModeLoop()

	return nil
}

func (e *StorageEngine) anyShardRequiresRefill() bool {
	for _, sh := range e.shards {
		if sh.NeedRefillMetabase() {
			return true
		}
	}
	return false
}

var errClosed = errors.New("storage engine is closed")

// Close releases all StorageEngine's components. Waits for all data-related operations to complete.
// After the call, all the next ones will fail.
//
// The method MUST only be called when the application exits.
func (e *StorageEngine) Close() error {
	close(e.closeCh)
	defer e.wg.Wait()
	return e.setBlockExecErr(errClosed)
}

// closes all shards. Never returns an error, shard errors are logged.
func (e *StorageEngine) close(releasePools bool) error {
	e.mtx.RLock()
	defer e.mtx.RUnlock()

	if releasePools {
		for _, p := range e.shardPools {
			p.Release()
		}
	}

	for id, sh := range e.shards {
		if err := sh.Close(); err != nil {
			e.log.Debug(logs.EngineCouldNotCloseShard,
				zap.String("id", id),
				zap.String("error", err.Error()),
			)
		}
	}

	return nil
}

// executes op if execution is not blocked, otherwise returns blocking error.
//
// Can be called concurrently with setBlockExecErr.
func (e *StorageEngine) execIfNotBlocked(op func() error) error {
	e.blockExec.mtx.RLock()
	defer e.blockExec.mtx.RUnlock()

	if e.blockExec.err != nil {
		return e.blockExec.err
	}

	return op()
}

// sets the flag of blocking execution of all data operations according to err:
//   - err != nil, then blocks the execution. If exec wasn't blocked, calls close method
//     (if err == errClosed => additionally releases pools and does not allow to resume executions).
//   - otherwise, resumes execution. If exec was blocked, calls open method.
//
// Can be called concurrently with exec. In this case it waits for all executions to complete.
func (e *StorageEngine) setBlockExecErr(err error) error {
	e.blockExec.mtx.Lock()
	defer e.blockExec.mtx.Unlock()

	prevErr := e.blockExec.err

	wasClosed := errors.Is(prevErr, errClosed)
	if wasClosed {
		return errClosed
	}

	e.blockExec.err = err

	if err == nil {
		if prevErr != nil { // block -> ok
			return e.open()
		}
	} else if prevErr == nil { // ok -> block
		return e.close(errors.Is(err, errClosed))
	}

	// otherwise do nothing

	return nil
}

// BlockExecution blocks the execution of any data-related operation. All blocked ops will return err.
// To resume the execution, use ResumeExecution method.
//
// Сan be called regardless of the fact of the previous blocking. If execution wasn't blocked, releases all resources
// similar to Close. Can be called concurrently with Close and any data related method (waits for all executions
// to complete). Returns error if any Close has been called before.
//
// Must not be called concurrently with either Open or Init.
//
// Note: technically passing nil error will resume the execution, otherwise, it is recommended to call ResumeExecution
// for this.
func (e *StorageEngine) BlockExecution(err error) error {
	return e.setBlockExecErr(err)
}

// ResumeExecution resumes the execution of any data-related operation.
// To block the execution, use BlockExecution method.
//
// Сan be called regardless of the fact of the previous blocking. If execution was blocked, prepares all resources
// similar to Open. Can be called concurrently with Close and any data related method (waits for all executions
// to complete). Returns error if any Close has been called before.
//
// Must not be called concurrently with either Open or Init.
func (e *StorageEngine) ResumeExecution() error {
	return e.setBlockExecErr(nil)
}

type ReConfiguration struct {
	errorsThreshold uint32
	shardPoolSize   uint32

	shards map[string][]shard.Option // meta path -> shard opts
}

// SetErrorsThreshold sets a size amount of errors after which
// shard is moved to read-only mode.
func (rCfg *ReConfiguration) SetErrorsThreshold(errorsThreshold uint32) {
	rCfg.errorsThreshold = errorsThreshold
}

// SetShardPoolSize sets a size of worker pool for each shard.
func (rCfg *ReConfiguration) SetShardPoolSize(shardPoolSize uint32) {
	rCfg.shardPoolSize = shardPoolSize
}

// AddShard adds a shard for the reconfiguration.
// Shard identifier is calculated from paths used in blobstor.
func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) {
	if rCfg.shards == nil {
		rCfg.shards = make(map[string][]shard.Option)
	}

	if _, found := rCfg.shards[id]; found {
		return
	}

	rCfg.shards[id] = opts
}

// Reload reloads StorageEngine's configuration in runtime.
func (e *StorageEngine) Reload(ctx context.Context, rcfg ReConfiguration) error {
	type reloadInfo struct {
		sh   *shard.Shard
		opts []shard.Option
	}

	e.mtx.RLock()

	var shardsToRemove []string // shards IDs
	var shardsToAdd []string    // shard config identifiers (blobstor paths concatenation)
	var shardsToReload []reloadInfo

	// mark removed shards for removal
	for id, sh := range e.shards {
		_, ok := rcfg.shards[calculateShardID(sh.DumpInfo())]
		if !ok {
			shardsToRemove = append(shardsToRemove, id)
		}
	}

loop:
	for newID := range rcfg.shards {
		for _, sh := range e.shards {
			// This calculation should be kept in sync with node
			// configuration parsing during SIGHUP.
			if newID == calculateShardID(sh.DumpInfo()) {
				shardsToReload = append(shardsToReload, reloadInfo{
					sh:   sh.Shard,
					opts: rcfg.shards[newID],
				})
				continue loop
			}
		}

		shardsToAdd = append(shardsToAdd, newID)
	}

	e.mtx.RUnlock()

	e.removeShards(shardsToRemove...)

	for _, p := range shardsToReload {
		err := p.sh.Reload(ctx, p.opts...)
		if err != nil {
			e.log.Error(logs.EngineCouldNotReloadAShard,
				zap.Stringer("shard id", p.sh.ID()),
				zap.Error(err))
		}
	}

	for _, newID := range shardsToAdd {
		sh, err := e.createShard(rcfg.shards[newID])
		if err != nil {
			return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
		}

		idStr := sh.ID().String()

		err = sh.Open()
		if err == nil {
			err = sh.Init(ctx)
		}
		if err != nil {
			_ = sh.Close()
			return fmt.Errorf("could not init %s shard: %w", idStr, err)
		}

		err = e.addShard(sh)
		if err != nil {
			_ = sh.Close()
			return fmt.Errorf("could not add %s shard: %w", idStr, err)
		}

		e.log.Info(logs.EngineAddedNewShard, zap.String("id", idStr))
	}

	return nil
}

func calculateShardID(info shard.Info) string {
	// This calculation should be kept in sync with node
	// configuration parsing during SIGHUP.
	var sb strings.Builder
	for _, sub := range info.BlobStorInfo.SubStorages {
		sb.WriteString(filepath.Clean(sub.Path))
	}
	return sb.String()
}