package putsvc

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	"go.uber.org/zap"
)

type nodeIterator struct {
	traversal
	cfg *cfg
}

func (c *cfg) newNodeIterator(opts []placement.Option) *nodeIterator {
	return &nodeIterator{
		traversal: traversal{
			opts:     opts,
			mExclude: make(map[string]*bool),
		},
		cfg: c,
	}
}

func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context, nodeDesc) error) error {
	traverser, err := placement.NewTraverser(n.traversal.opts...)
	if err != nil {
		return fmt.Errorf("could not create object placement traverser: %w", err)
	}

	resErr := &atomic.Value{}

	// Must iterate over all replicas, regardless of whether there are identical nodes there.
	// At the same time need to exclude identical nodes from processing.
	for {
		addrs := traverser.Next()
		if len(addrs) == 0 {
			break
		}

		if n.forEachAddress(ctx, traverser, addrs, f, resErr) {
			break
		}
	}

	if !traverser.Success() {
		var err errIncompletePut
		err.singleErr, _ = resErr.Load().(error)
		return err
	}

	// perform additional container broadcast if needed
	if n.traversal.submitPrimaryPlacementFinish() {
		err := n.forEachNode(ctx, f)
		if err != nil {
			n.cfg.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
			// we don't fail primary operation because of broadcast failure
		}
	}

	return nil
}

func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, nodeDesc) error, resErr *atomic.Value) bool {
	var wg sync.WaitGroup

	for _, addr := range addrs {
		addr := addr
		if ok := n.mExclude[string(addr.PublicKey())]; ok != nil {
			if *ok {
				traverser.SubmitSuccess()
			}
			// This can happen only during additional container broadcast.
			continue
		}

		workerPool, isLocal := n.cfg.getWorkerPool(addr.PublicKey())

		item := new(bool)
		wg.Add(1)
		if err := workerPool.Submit(func() {
			defer wg.Done()

			err := f(ctx, nodeDesc{local: isLocal, info: addr})
			if err != nil {
				resErr.Store(err)
				svcutil.LogServiceError(n.cfg.log, "PUT", addr.Addresses(), err)
				return
			}

			traverser.SubmitSuccess()
			*item = true
		}); err != nil {
			wg.Done()
			svcutil.LogWorkerPoolError(n.cfg.log, "PUT", err)
			return true
		}

		// Mark the container node as processed in order to exclude it
		// in subsequent container broadcast. Note that we don't
		// process this node during broadcast if primary placement
		// on it failed.
		n.traversal.submitProcessed(addr, item)
	}

	wg.Wait()

	return false
}

func needAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool {
	return len(obj.Children()) > 0 || (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock))
}