package tree

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

type movePair struct {
	cid    cidSDK.ID
	treeID string
	op     *pilorama.Move
}

type replicationTask struct {
	n   netmapSDK.NodeInfo
	req *ApplyRequest
}

type applyOp struct {
	treeID string
	cid    cidSDK.ID
	pilorama.Move
}

const (
	defaultReplicatorCapacity    = 64
	defaultReplicatorWorkerCount = 64
	defaultReplicatorSendTimeout = time.Second * 5
)

func (s *Service) localReplicationWorker(ctx context.Context) {
	for {
		select {
		case <-s.closeCh:
			return
		case op := <-s.replicateLocalCh:
			ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationOperation",
				trace.WithAttributes(
					attribute.String("tree_id", op.treeID),
					attribute.String("container_id", op.cid.EncodeToString()),
				),
			)

			err := s.forest.TreeApply(ctx, op.cid, op.treeID, &op.Move, false)
			if err != nil {
				s.log.Error(logs.TreeFailedToApplyReplicatedOperation,
					zap.String("err", err.Error()))
			}
			span.End()
		}
	}
}

func (s *Service) replicationWorker(ctx context.Context) {
	for {
		select {
		case <-s.closeCh:
			return
		case task := <-s.replicationTasks:
			_ = s.ReplicateTreeOp(ctx, task.n, task.req)
		}
	}
}

func (s *Service) ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req *ApplyRequest) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTask",
		trace.WithAttributes(
			attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
		),
	)
	defer span.End()

	start := time.Now()

	var lastErr error
	var lastAddr string

	n.IterateNetworkEndpoints(func(addr string) bool {
		ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint",
			trace.WithAttributes(
				attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
				attribute.String("address", addr),
			),
		)
		defer span.End()

		lastAddr = addr

		c, err := s.cache.get(ctx, addr)
		if err != nil {
			lastErr = fmt.Errorf("can't create client: %w", err)
			return false
		}

		ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
		_, lastErr = c.Apply(ctx, req)
		cancel()

		return lastErr == nil
	})

	if lastErr != nil {
		if errors.Is(lastErr, errRecentlyFailed) {
			s.log.Debug(logs.TreeDoNotSendUpdateToTheNode,
				zap.String("last_error", lastErr.Error()),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		} else {
			s.log.Warn(logs.TreeFailedToSentUpdateToTheNode,
				zap.String("last_error", lastErr.Error()),
				zap.String("address", lastAddr),
				zap.String("key", hex.EncodeToString(n.PublicKey())),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
		s.metrics.AddReplicateTaskDuration(time.Since(start), false)
		return lastErr
	}
	s.metrics.AddReplicateTaskDuration(time.Since(start), true)
	return nil
}

func (s *Service) replicateLoop(ctx context.Context) {
	for i := 0; i < s.replicatorWorkerCount; i++ {
		go s.replicationWorker(ctx)
		go s.localReplicationWorker(ctx)
	}
	defer func() {
		for len(s.replicationTasks) != 0 {
			<-s.replicationTasks
		}
	}()

	for {
		select {
		case <-s.closeCh:
			return
		case <-ctx.Done():
			return
		case op := <-s.replicateCh:
			start := time.Now()
			err := s.replicate(op)
			if err != nil {
				s.log.Error(logs.TreeErrorDuringReplication,
					zap.String("err", err.Error()),
					zap.Stringer("cid", op.cid),
					zap.String("treeID", op.treeID))
			}
			s.metrics.AddReplicateWaitDuration(time.Since(start), err == nil)
		}
	}
}

func (s *Service) replicate(op movePair) error {
	req := newApplyRequest(&op)
	err := SignMessage(req, s.key)
	if err != nil {
		return fmt.Errorf("can't sign data: %w", err)
	}

	nodes, localIndex, err := s.getContainerNodes(op.cid)
	if err != nil {
		return fmt.Errorf("can't get container nodes: %w", err)
	}

	for i := range nodes {
		if i != localIndex {
			s.replicationTasks <- replicationTask{nodes[i], req}
		}
	}
	return nil
}

func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
	select {
	case s.replicateCh <- movePair{
		cid:    cid,
		treeID: treeID,
		op:     op,
	}:
	default:
	}
}

func newApplyRequest(op *movePair) *ApplyRequest {
	rawCID := make([]byte, sha256.Size)
	op.cid.Encode(rawCID)

	return &ApplyRequest{
		Body: &ApplyRequest_Body{
			ContainerId: rawCID,
			TreeId:      op.treeID,
			Operation: &LogMove{
				ParentId: op.op.Parent,
				Meta:     op.op.Meta.Bytes(),
				ChildId:  op.op.Child,
			},
		},
	}
}