package engine

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	"go.uber.org/zap"
)

// StorageEngine represents FrostFS local storage engine.
type StorageEngine struct {
	*cfg

	removeDuplicatesInProgress atomic.Bool

	mtx sync.RWMutex

	shards map[string]hashedShard

	shardPools map[string]util.WorkerPool

	closeCh   chan struct{}
	setModeCh chan setModeRequest
	wg        sync.WaitGroup

	blockExec struct {
		mtx sync.RWMutex

		err error
	}
	evacuateLimiter *evacuationLimiter
}

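// shardWrapper pairs a shard with the error counter used by the engine's
// error threshold logic.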
type shardWrapper struct {
	errorCount *atomic.Uint32
	*shard.Shard
}

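// setModeRequest is an asynchronous request to move a shard to degraded mode,
// processed by setModeLoop.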
type setModeRequest struct {
	sh         *shard.Shard
	isMeta     bool
	errorCount uint32
}

// setModeLoop listens to setModeCh and performs the degraded mode transition of a single shard.
// Instead of creating a worker per shard, a single goroutine is used.
func (e *StorageEngine) setModeLoop() {
	defer e.wg.Done()

	var (
		mtx        sync.RWMutex // protects inProgress map
		inProgress = make(map[string]struct{})
	)

	for {
		select {
		case <-e.closeCh:
			return
		case r := <-e.setModeCh:
			sid := r.sh.ID().String()

			mtx.Lock()
			_, ok := inProgress[sid]
			if !ok {
				inProgress[sid] = struct{}{}
				go func() {
					e.moveToDegraded(r.sh, r.errorCount, r.isMeta)

					mtx.Lock()
					delete(inProgress, sid)
					mtx.Unlock()
				}()
			}
			mtx.Unlock()
		}
	}
}

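// moveToDegraded switches the shard to degraded-read-only mode if the errors
// originate from the metabase; otherwise, or if that transition fails, the shard
// is switched to read-only mode.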
func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta bool) {
	sid := sh.ID()
	log := e.log.With(
		zap.Stringer("shard_id", sid),
		zap.Uint32("error count", errCount))

	e.mtx.RLock()
	defer e.mtx.RUnlock()

	if isMeta {
		err := sh.SetMode(mode.DegradedReadOnly)
		if err == nil {
			log.Info(logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold)
			return
		}
		log.Error(logs.EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly,
			zap.Error(err))
	}

	err := sh.SetMode(mode.ReadOnly)
	if err != nil {
		log.Error(logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err))
		return
	}

	log.Info(logs.EngineShardIsMovedInReadonlyModeDueToErrorThreshold)
}

// reportShardErrorBackground increases the shard error counter and logs the error.
// It is intended to be used from background workers and doesn't change the shard
// mode synchronously to avoid possible deadlocks: the mode change is requested
// asynchronously via setModeCh instead.
func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err error) {
	e.mtx.RLock()
	sh, ok := e.shards[id]
	e.mtx.RUnlock()

	if !ok {
		return
	}

	if isLogical(err) {
		e.log.Warn(msg,
			zap.Stringer("shard_id", sh.ID()),
			zap.String("error", err.Error()))
		return
	}

	errCount := sh.errorCount.Add(1)
	sh.Shard.IncErrorCounter()
	e.reportShardErrorWithFlags(sh.Shard, errCount, false, msg, err)
}

// reportShardError checks that the number of errors doesn't exceed the configured threshold.
// If it does, the shard is moved to degraded-read-only mode (for metabase errors)
// or to read-only mode.
func (e *StorageEngine) reportShardError(
	sh hashedShard,
	msg string,
	err error,
	fields ...zap.Field,
) {
	if isLogical(err) {
		e.log.Warn(msg,
			zap.Stringer("shard_id", sh.ID()),
			zap.String("error", err.Error()))
		return
	}

	errCount := sh.errorCount.Add(1)
	sh.Shard.IncErrorCounter()
	e.reportShardErrorWithFlags(sh.Shard, errCount, true, msg, err, fields...)
}

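// reportShardErrorWithFlags logs the shard error and, once the configured error
// threshold is reached, triggers the transition to degraded mode: synchronously
// if block is true, otherwise asynchronously via setModeCh.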
func (e *StorageEngine) reportShardErrorWithFlags(
	sh *shard.Shard,
	errCount uint32,
	block bool,
	msg string,
	err error,
	fields ...zap.Field,
) {
	sid := sh.ID()
	e.log.Warn(msg, append([]zap.Field{
		zap.Stringer("shard_id", sid),
		zap.Uint32("error count", errCount),
		zap.String("error", err.Error()),
	}, fields...)...)

	if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
		return
	}

	isMeta := errors.As(err, new(metaerr.Error))
	if block {
		e.moveToDegraded(sh, errCount, isMeta)
	} else {
		req := setModeRequest{
			errorCount: errCount,
			isMeta:     isMeta,
			sh:         sh,
		}

		select {
		case e.setModeCh <- req:
		default:
			// For background workers we can have a lot of such errors,
			// thus logging is done with DEBUG level.
			e.log.Debug(logs.EngineModeChangeIsInProgressIgnoringSetmodeRequest,
				zap.Stringer("shard_id", sid),
				zap.Uint32("error_count", errCount))
		}
	}
}

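// isLogical reports whether err is a logical error or a context cancellation,
// i.e. an error that must not affect the shard error counter.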
func isLogical(err error) bool {
	return errors.As(err, &logicerr.Logical{}) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

// Option represents StorageEngine's constructor option.
type Option func(*cfg)

type cfg struct {
	log *logger.Logger

	errorsThreshold uint32

	metrics MetricRegister

	shardPoolSize uint32

	lowMem bool
}

func defaultCfg() *cfg {
	return &cfg{
		log: &logger.Logger{Logger: zap.L()},

		shardPoolSize: 20,
	}
}

// New creates, initializes and returns a new StorageEngine instance.
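//
// A minimal construction sketch, assuming l is an already configured
// *logger.Logger; the option values below are illustrative, not required
// defaults:
//
//	e := engine.New(
//		engine.WithLogger(l),
//		engine.WithShardPoolSize(20),
//		engine.WithErrorThreshold(100),
//	)
//
// Shards are attached and opened separately (not shown here).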
func New(opts ...Option) *StorageEngine {
	c := defaultCfg()

	for i := range opts {
		opts[i](c)
	}

	return &StorageEngine{
		cfg:             c,
		shards:          make(map[string]hashedShard),
		shardPools:      make(map[string]util.WorkerPool),
		closeCh:         make(chan struct{}),
		setModeCh:       make(chan setModeRequest),
		evacuateLimiter: &evacuationLimiter{},
	}
}

// WithLogger returns option to set StorageEngine's logger.
func WithLogger(l *logger.Logger) Option {
	return func(c *cfg) {
		c.log = l
	}
}

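// WithMetrics returns option to set the metrics register.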
func WithMetrics(v MetricRegister) Option {
	return func(c *cfg) {
		c.metrics = v
	}
}

// WithShardPoolSize returns option to specify the size of the worker pool for each shard.
func WithShardPoolSize(sz uint32) Option {
	return func(c *cfg) {
		c.shardPoolSize = sz
	}
}

// WithErrorThreshold returns an option to specify the number of errors after which
// the shard is moved to read-only (or degraded-read-only) mode.
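// A zero threshold (the default) disables this mechanism.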
func WithErrorThreshold(sz uint32) Option {
	return func(c *cfg) {
		c.errorsThreshold = sz
	}
}

// WithLowMemoryConsumption returns an option to reduce memory consumption at the cost of performance.
func WithLowMemoryConsumption(lowMemCons bool) Option {
	return func(c *cfg) {
		c.lowMem = lowMemCons
	}
}