package engine

import (
	"context"
	"errors"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/hrw"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)

// errRemoveDuplicatesInProgress is returned when another rebalancing is in progress.
// We need it because `RemoveDuplicates` removes objects, and two concurrent executions
// can remove both copies of the same object, leading to data loss. In the future this
// restriction could be relaxed.
var errRemoveDuplicatesInProgress = errors.New("redundant copies removal is already in progress")

// defaultRemoveDuplicatesConcurrency is used when RemoveDuplicatesPrm.Concurrency is not set.
const defaultRemoveDuplicatesConcurrency = 256

// RemoveDuplicatesPrm groups the parameters of the RemoveDuplicates operation.
type RemoveDuplicatesPrm struct {
	// Concurrency is the number of worker goroutines removing duplicates.
	// It also bounds the page size of a single shard listing.
	// Non-positive values fall back to defaultRemoveDuplicatesConcurrency.
	Concurrency int
}

// RemoveDuplicates iterates over all objects and removes duplicate object copies
// from shards that rank lower in the HRW sort, keeping the copy on the best shard.
// Safety:
//  1. Concurrent execution is prohibited, thus exactly 1 object copy should always be left.
//  2. If an object is deleted from another thread, this is not a problem. Currently,
//     we have 2 threads that can remove "valid" (non-expired and logically non-removed) objects:
//     the policer and rebalance. For rebalance, see (1).
//     If the policer removes something, we do not care whether both copies are removed or one is left,
//     as the remaining copy will be removed during the next policer iteration.
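//
// A minimal caller sketch (the engine value, context and error handling are assumed):
//
//	var prm RemoveDuplicatesPrm
//	prm.Concurrency = 32 // non-positive values fall back to the default of 256
//	if err := e.RemoveDuplicates(ctx, prm); err != nil {
//		// errRemoveDuplicatesInProgress means another run is still active.
//	}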
func (e *StorageEngine) RemoveDuplicates(ctx context.Context, prm RemoveDuplicatesPrm) error {
	if !e.removeDuplicatesInProgress.CompareAndSwap(false, true) {
		return errRemoveDuplicatesInProgress
	}
	defer e.removeDuplicatesInProgress.Store(false)

	if prm.Concurrency <= 0 {
		prm.Concurrency = defaultRemoveDuplicatesConcurrency
	}

	e.log.Info(logs.EngineStartingRemovalOfLocallyredundantCopies,
		zap.Int("concurrency", prm.Concurrency))

	// The mutex must be held for the whole duration to avoid the target shard being
	// removed concurrently: this can lead to data loss.
	e.mtx.RLock()
	defer e.mtx.RUnlock()

	// Iterate shard by shard to be sure that no objects from 2 different shards are removed simultaneously.
	// Currently this could not happen anyway, because the `FreeSpace` metric used by weight sorting is always 0.
	// However, we could change the weights in the future and easily forget about this function.
	for _, sh := range e.shards {
		e.log.Debug(logs.EngineStartedDuplicatesRemovalRoutine, zap.String("shard_id", sh.ID().String()))
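		// The channel is unbuffered, so the listing goroutine is naturally
		// throttled by the removal workers.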
		ch := make(chan oid.Address)

		errG, ctx := errgroup.WithContext(ctx)
		errG.SetLimit(prm.Concurrency + 1) // +1 for the listing goroutine

		errG.Go(func() error {
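			// Producer: page through the shard's metabase and feed object
			// addresses to the worker goroutines.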
			defer close(ch)

			var cursor *meta.Cursor
			for {
				var listPrm shard.ListWithCursorPrm
				listPrm.WithCount(uint32(prm.Concurrency))
				listPrm.WithCursor(cursor)
				res, err := sh.ListWithCursor(ctx, listPrm)
				if err != nil {
					if errors.Is(err, meta.ErrEndOfListing) {
						return nil
					}
					return err
				}
				for _, addr := range res.AddressList() {
					select {
					case <-ctx.Done():
						return ctx.Err()
					case ch <- addr.Address:
					}
				}
				cursor = res.Cursor()
			}
		})

		for i := 0; i < prm.Concurrency; i++ {
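			// Consumer: each worker drains ch and removes redundant copies.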
			errG.Go(func() error {
				return e.removeObjects(ctx, ch)
			})
		}
		if err := errG.Wait(); err != nil {
			e.log.Error(logs.EngineFinishedRemovalOfLocallyredundantCopies, zap.Error(err))
			return err
		}
	}

	e.log.Info(logs.EngineFinishedRemovalOfLocallyredundantCopies)
	return nil
}

// removeObjects reads addresses from ch and, for each address, keeps the copy on the
// best shard in HRW order while removing the copies found on all other shards.
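//
// For example, if HRW sorts the shards for some address as [s2, s0, s1] and copies
// exist on s0 and s2, the copy on s2 is kept and the one on s0 is removed
// (shard names are illustrative).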
func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address) error {
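	// e.mtx is read-locked by the caller for the whole duration,
	// so snapshotting e.shards here is safe.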
	shards := make([]hashedShard, 0, len(e.shards))
	for _, sh := range e.shards {
		shards = append(shards, sh)
	}

	for addr := range ch {
		h := hrw.StringHash(addr.EncodeToString())
		sorted := sortShardsByWeight(shards, h)
		found := false
		for i := range sorted {
			var existsPrm shard.ExistsPrm
			existsPrm.SetAddress(addr)

			res, err := sorted[i].Exists(ctx, existsPrm)
			if err != nil {
				return err
			} else if !res.Exists() {
				continue
			} else if !found {
				// The first existing copy in HRW order is the one to keep.
				found = true
				continue
			}

			// Any further copy is redundant, remove it.
			var deletePrm shard.DeletePrm
			deletePrm.SetAddresses(addr)
			_, err = sorted[i].Delete(ctx, deletePrm)
			if err != nil {
				return err
			}
		}
	}
	}
	return nil
}