package engine

import (

	meta ""
	oid ""

// errRemoveDuplicatesInProgress is returned when another rebalancing is in progress.
// We need it because `Rebalance` removes objects and executing it concurrently
// on 2 shards can lead to data loss. In future this restriction could be relaxed.
var errRemoveDuplicatesInProgress = errors.New("redundant copies removal is already in progress")

const defaultRemoveDuplicatesConcurrency = 256

type RemoveDuplicatesPrm struct {
	Concurrency int

// RemoveDuplicates iterates over all objects and removes duplicate object copies
// from shards which are worse as defined by HRW sort.
// Safety:
//  1. Concurrent execution is prohibited, thus 1 object copy should always be left.
//  2. If we delete an object from another thread, this is not a problem. Currently,
//     we have 2 thread that can remove "valid" (non-expired and logically non-removed) objects:
//     policer and rebalance. For rebalance see (1).
//     If policer removes something, we do not care if both copies are removed or one of them is left,
//     as the remaining copy will be removed during the next policer iteration.
func (e *StorageEngine) RemoveDuplicates(ctx context.Context, prm RemoveDuplicatesPrm) error {
	if !e.removeDuplicatesInProgress.CompareAndSwap(false, true) {
		return errRemoveDuplicatesInProgress
	defer e.removeDuplicatesInProgress.Store(false)

	if prm.Concurrency <= 0 {
		prm.Concurrency = defaultRemoveDuplicatesConcurrency

		zap.Int("concurrency", prm.Concurrency))

	// The mutext must be taken for the whole duration to avoid target shard being removed
	// concurrently: this can lead to data loss.
	defer e.mtx.RUnlock()

	// Iterate by shards to be sure that no objects from 2 different shards are removed simultaneously.
	// This is not currently the case, because `FreeSpace` metric used by weight sorting is always 0.
	// However we could change weights in future and easily forget this function.
	for _, sh := range e.shards {
		e.log.Debug(logs.EngineStartedDuplicatesRemovalRoutine, zap.Stringer("shard_id", sh.ID()))
		ch := make(chan oid.Address)

		errG, ctx := errgroup.WithContext(ctx)
		errG.SetLimit(prm.Concurrency + 1) // +1 for the listing thread

		errG.Go(func() error {
			defer close(ch)

			var cursor *meta.Cursor
			for {
				var listPrm shard.ListWithCursorPrm
				res, err := sh.ListWithCursor(ctx, listPrm)
				if err != nil {
					if errors.Is(err, meta.ErrEndOfListing) {
						return nil
					return err
				for _, addr := range res.AddressList() {
					select {
					case <-ctx.Done():
						return ctx.Err()
					case ch <- addr.Address:
				cursor = res.Cursor()

		for i := 0; i < prm.Concurrency; i++ {
			errG.Go(func() error {
				return e.removeObjects(ctx, ch)
		if err := errG.Wait(); err != nil {
			e.log.Error(logs.EngineFinishedRemovalOfLocallyredundantCopies, zap.Error(err))
			return err

	return nil

// removeObjects reads addresses from ch and removes all objects from other shards, excluding excludeID.
func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address) error {
	shards := make([]hashedShard, 0, len(e.shards))
	for _, sh := range e.shards {
		shards = append(shards, sh)

	for addr := range ch {
		h := hrw.StringHash(addr.EncodeToString())
		hrw.SortHasherSliceByValue(shards, h)
		found := false
		for i := range shards {
			var existsPrm shard.ExistsPrm

			res, err := shards[i].Exists(ctx, existsPrm)
			if err != nil {
				return err
			} else if !res.Exists() {
			} else if !found {
				found = true

			var deletePrm shard.DeletePrm
			_, err = shards[i].Delete(ctx, deletePrm)
			if err != nil {
				return err
	return nil