package engine

import (
	"errors"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/hrw"
	"go.uber.org/zap"
)

// EvacuateShardPrm represents parameters for the EvacuateShard operation.
//
// The zero value is usable; fields are filled in through the With* setters.
type EvacuateShardPrm struct {
	// shardID lists the shards whose objects must be evacuated.
	shardID      []*shard.ID
	// handler, when non-nil, is invoked for an object that could not be
	// put on any other shard.
	handler      func(oid.Address, *objectSDK.Object) error
	// ignoreErrors makes evacuation skip objects that fail to be read
	// from the source shard instead of aborting.
	ignoreErrors bool
}

// EvacuateShardRes represents result of the EvacuateShard operation.
type EvacuateShardRes struct {
	// count is incremented for every object put to another shard and for
	// every object the fault handler accepted without error.
	count int
}

// WithShardIDList sets the list of IDs of the shards to evacuate.
// The slice is stored as is, without copying.
func (p *EvacuateShardPrm) WithShardIDList(id []*shard.ID) {
	p.shardID = id
}

// WithIgnoreErrors sets the flag that makes evacuation skip objects
// which cannot be read from the source shard instead of failing.
func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) {
	p.ignoreErrors = ignore
}

// WithFaultHandler sets handler to call for objects which cannot be saved on other shards.
// An error returned by the handler aborts the evacuation; a nil error makes
// the object count as evacuated.
func (p *EvacuateShardPrm) WithFaultHandler(f func(oid.Address, *objectSDK.Object) error) {
	p.handler = f
}

// Count reports how many objects were evacuated.
// An object passed to the fault handler is counted as evacuated
// when the handler returned no error.
func (r EvacuateShardRes) Count() int {
	return r.count
}

// defaultEvacuateBatchSize is the number of addresses requested from the
// source shard per listing call during evacuation.
const defaultEvacuateBatchSize = 100

// pooledShard couples a shard with the worker pool that is handed to
// putToShard when an object is written to that shard.
type pooledShard struct {
	hashedShard
	pool util.WorkerPool
}

// errMustHaveTwoShards is returned when no shard would remain besides the
// ones being evacuated and no fault handler is set.
var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")

// Evacuate relocates objects from the shards listed in prm to the other
// shards of the engine.
//
// Every listed shard must be known to the engine and be in read-only mode.
// Shards are processed one after another; on the first failure the result
// accumulated so far is returned together with the error.
func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) {
	ids := make([]string, 0, len(prm.shardID))
	for _, id := range prm.shardID {
		ids = append(ids, id.String())
	}

	shards, weights, err := e.getActualShards(ids, prm.handler != nil)
	if err != nil {
		return EvacuateShardRes{}, err
	}

	// Index the requested IDs first so that the source shards can be
	// picked out of the full shard list in a single pass.
	requested := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		requested[id] = struct{}{}
	}

	shardsToEvacuate := make(map[string]*shard.Shard)
	for i := range shards {
		key := shards[i].ID().String()
		if _, wanted := requested[key]; wanted {
			shardsToEvacuate[key] = shards[i].Shard
		}
	}

	idsField := zap.Strings("shard_ids", ids)
	e.log.Info("started shards evacuation", idsField)

	var res EvacuateShardRes
	for _, id := range ids {
		if err := e.evacuateShard(id, prm, &res, shards, weights, shardsToEvacuate); err != nil {
			return res, err
		}
	}

	e.log.Info("finished shards evacuation", idsField)
	return res, nil
}

// evacuateShard walks through the objects of the shard identified by shardID
// in batches of defaultEvacuateBatchSize and evacuates every batch.
//
// Listing stops without an error once the end of the listing is reached or
// the shard reports degraded mode; any other error is returned as is.
func (e *StorageEngine) evacuateShard(shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
	source := shardsToEvacuate[shardID]

	var listPrm shard.ListWithCursorPrm
	listPrm.WithCount(defaultEvacuateBatchSize)

	var cursor *meta.Cursor
	for {
		listPrm.WithCursor(cursor)

		// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
		//  because ListWithCursor works only with the metabase.
		batch, err := source.ListWithCursor(listPrm)
		switch {
		case err == nil:
			// Got a batch, process it below.
		case errors.Is(err, meta.ErrEndOfListing), errors.Is(err, shard.ErrDegradedMode):
			return nil
		default:
			return err
		}

		err = e.evacuateObjects(source, batch.AddressList(), prm, res, shards, weights, shardsToEvacuate)
		if err != nil {
			return err
		}

		cursor = batch.Cursor()
	}
}

// getActualShards validates the shards requested for evacuation and returns
// all shards of the engine together with their weights (weights[i] belongs
// to shards[i]).
//
// It fails with errShardNotFound if a requested ID is unknown, with
// shard.ErrMustBeReadOnly if a requested shard is not read-only, and with
// errMustHaveTwoShards if no other shard would remain and no fault handler
// is defined. The engine's shard map is read under the read lock.
func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) ([]pooledShard, []float64, error) {
	e.mtx.RLock()
	defer e.mtx.RUnlock()

	for _, id := range shardIDs {
		target, found := e.shards[id]
		switch {
		case !found:
			return nil, nil, errShardNotFound
		case !target.GetMode().ReadOnly():
			return nil, nil, shard.ErrMustBeReadOnly
		}
	}

	spare := len(e.shards) - len(shardIDs)
	if !handlerDefined && spare < 1 {
		return nil, nil, errMustHaveTwoShards
	}

	// We must have all shards, to have correct information about their
	// indexes in a sorted slice and set appropriate marks in the metabase.
	// Evacuated shard is skipped during put.
	total := len(e.shards)
	shards := make([]pooledShard, 0, total)
	weights := make([]float64, 0, total)
	for id, wrapped := range e.shards {
		entry := pooledShard{
			hashedShard: hashedShard(wrapped),
			pool:        e.shardPools[id],
		}
		shards = append(shards, entry)
		weights = append(weights, e.shardWeight(entry.Shard))
	}

	return shards, weights, nil
}

// evacuateObjects reads every listed object from sh and tries to move it to
// another shard.
//
// A read failure aborts the operation unless prm.ignoreErrors is set, in
// which case the object is skipped. If no shard accepts an object, the fault
// handler from prm is called; without a handler an error wrapping
// errPutShard is returned. Objects accepted by the handler are counted in res.
func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
	for _, entry := range toEvacuate {
		addr := entry.Address

		var getPrm shard.GetPrm
		getPrm.SetAddress(addr)

		getRes, err := sh.Get(getPrm)
		switch {
		case err == nil:
			// Object is read, proceed to moving it.
		case prm.ignoreErrors:
			continue
		default:
			return err
		}

		obj := getRes.Object()
		if moved := e.tryEvacuateObject(addr, obj, sh, res, shards, weights, shardsToEvacuate); moved {
			continue
		}

		if prm.handler == nil {
			// Do not check ignoreErrors flag here because
			// ignoring errors on put make this command kinda useless.
			return fmt.Errorf("%w: %s", errPutShard, entry)
		}

		if err := prm.handler(addr, obj); err != nil {
			return err
		}
		res.count++
	}
	return nil
}

// tryEvacuateObject attempts to store object on one of the shards that are
// not being evacuated, trying candidates in the HRW order computed for addr.
//
// It returns true as soon as putToShard reports that the object was put or
// already exists on a candidate shard; res.count is incremented only when the
// object was actually put. It returns false if no candidate accepted the
// object. sh is the source shard and is used for logging only.
func (e *StorageEngine) tryEvacuateObject(addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) bool {
	// The caller reuses shards and weights for every evacuated object and
	// relies on weights[i] describing shards[i]. The HRW sort reorders its
	// slice argument in place, so sorting the shared slices would leave the
	// pairing broken for all subsequent objects. Sort private copies instead.
	// NOTE(review): hrw appears to leave weights untouched while permuting
	// shards; copying both keeps the caller's slices aligned either way.
	sortedShards := make([]pooledShard, len(shards))
	copy(sortedShards, shards)
	sortedWeights := make([]float64, len(weights))
	copy(sortedWeights, weights)

	hrw.SortHasherSliceByWeightValue(sortedShards, sortedWeights, hrw.Hash([]byte(addr.EncodeToString())))

	for j := range sortedShards {
		// Never put the object back to a shard that is itself being evacuated.
		if _, ok := shardsToEvacuate[sortedShards[j].ID().String()]; ok {
			continue
		}

		// j is the position of the shard in the sorted order.
		putDone, exists := e.putToShard(sortedShards[j].hashedShard, j, sortedShards[j].pool, addr, object)
		if !putDone && !exists {
			continue
		}

		if putDone {
			e.log.Debug("object is moved to another shard",
				zap.Stringer("from", sh.ID()),
				zap.Stringer("to", sortedShards[j].ID()),
				zap.Stringer("addr", addr))
			res.count++
		}
		return true
	}

	return false
}