package tree

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"github.com/panjf2000/ants/v2"
	"go.uber.org/zap"
)

// Service represents tree-service capable of working with multiple
// instances of CRDT trees.
type Service struct {
	cfg

	cache            clientCache
	replicateCh      chan movePair
	replicateLocalCh chan applyOp
	replicationTasks chan replicationTask
	closeCh          chan struct{}
	containerCache   containerCache

	syncChan chan struct{}
	syncPool *ants.Pool

	initialSyncDone atomic.Bool

	// cnrMap contains existing (used) container IDs.
	cnrMap map[cidSDK.ID]struct{}
	// cnrMapMtx protects cnrMap
	cnrMapMtx sync.Mutex
}

var _ TreeServiceServer = (*Service)(nil)

// New creates new tree service instance.
func New(opts ...Option) *Service {
	var s Service
	s.containerCacheSize = defaultContainerCacheSize
	s.replicatorChannelCapacity = defaultReplicatorCapacity
	s.replicatorWorkerCount = defaultReplicatorWorkerCount
	s.replicatorTimeout = defaultReplicatorSendTimeout
	s.metrics = defaultMetricsRegister{}

	for i := range opts {
		opts[i](&s.cfg)
	}

	if s.log == nil {
		s.log = &logger.Logger{Logger: zap.NewNop()}
	}

	s.cache.init()
	s.closeCh = make(chan struct{})
	s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
	s.replicateLocalCh = make(chan applyOp)
	s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
	s.containerCache.init(s.containerCacheSize)
	s.cnrMap = make(map[cidSDK.ID]struct{})
	s.syncChan = make(chan struct{})
	s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount)

	return &s
}

// Start starts the service.
func (s *Service) Start(ctx context.Context) {
	go s.replicateLoop(ctx)
	go s.syncLoop(ctx)

	select {
	case <-s.closeCh:
	case <-ctx.Done():
	default:
		// initial sync
		s.syncChan <- struct{}{}
	}
}

// Shutdown shutdowns the service.
func (s *Service) Shutdown() {
	close(s.closeCh)
	s.syncPool.Release()
}

func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID
	if err := cid.Decode(b.GetContainerId()); err != nil {
		return nil, err
	}

	err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut)
	if err != nil {
		return nil, err
	}

	ns, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return nil, err
	}
	if pos < 0 {
		var resp *AddResponse
		var outErr error
		err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
			resp, outErr = c.Add(ctx, req)
			return true
		})
		if err != nil {
			return nil, err
		}
		return resp, outErr
	}

	d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
	log, err := s.forest.TreeMove(ctx, d, b.GetTreeId(), &pilorama.Move{
		Parent: b.GetParentId(),
		Child:  pilorama.RootID,
		Meta:   pilorama.Meta{Items: protoToMeta(b.GetMeta())},
	})
	if err != nil {
		return nil, err
	}

	s.pushToQueue(cid, b.GetTreeId(), log)
	return &AddResponse{
		Body: &AddResponse_Body{
			NodeId: log.Child,
		},
	}, nil
}

func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID
	if err := cid.Decode(b.GetContainerId()); err != nil {
		return nil, err
	}

	err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut)
	if err != nil {
		return nil, err
	}

	ns, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return nil, err
	}
	if pos < 0 {
		var resp *AddByPathResponse
		var outErr error
		err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
			resp, outErr = c.AddByPath(ctx, req)
			return true
		})
		if err != nil {
			return nil, err
		}
		return resp, outErr
	}

	meta := protoToMeta(b.GetMeta())

	attr := b.GetPathAttribute()
	if len(attr) == 0 {
		attr = pilorama.AttributeFilename
	}

	d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
	logs, err := s.forest.TreeAddByPath(ctx, d, b.GetTreeId(), attr, b.GetPath(), meta)
	if err != nil {
		return nil, err
	}

	for i := range logs {
		s.pushToQueue(cid, b.GetTreeId(), &logs[i])
	}

	nodes := make([]uint64, len(logs))
	nodes[0] = logs[len(logs)-1].Child
	for i, l := range logs[:len(logs)-1] {
		nodes[i+1] = l.Child
	}

	return &AddByPathResponse{
		Body: &AddByPathResponse_Body{
			Nodes: nodes,
		},
	}, nil
}

func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID
	if err := cid.Decode(b.GetContainerId()); err != nil {
		return nil, err
	}

	err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut)
	if err != nil {
		return nil, err
	}

	ns, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return nil, err
	}
	if pos < 0 {
		var resp *RemoveResponse
		var outErr error
		err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
			resp, outErr = c.Remove(ctx, req)
			return true
		})
		if err != nil {
			return nil, err
		}
		return resp, outErr
	}

	if b.GetNodeId() == pilorama.RootID {
		return nil, fmt.Errorf("node with ID %d is root and can't be removed", b.GetNodeId())
	}

	d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
	log, err := s.forest.TreeMove(ctx, d, b.GetTreeId(), &pilorama.Move{
		Parent: pilorama.TrashID,
		Child:  b.GetNodeId(),
	})
	if err != nil {
		return nil, err
	}

	s.pushToQueue(cid, b.GetTreeId(), log)
	return new(RemoveResponse), nil
}

// Move applies client operation to the specified tree and pushes in queue
// for replication on other nodes.
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID
	if err := cid.Decode(b.GetContainerId()); err != nil {
		return nil, err
	}

	err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut)
	if err != nil {
		return nil, err
	}

	ns, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return nil, err
	}
	if pos < 0 {
		var resp *MoveResponse
		var outErr error
		err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
			resp, outErr = c.Move(ctx, req)
			return true
		})
		if err != nil {
			return nil, err
		}
		return resp, outErr
	}

	if b.GetNodeId() == pilorama.RootID {
		return nil, fmt.Errorf("node with ID %d is root and can't be moved", b.GetNodeId())
	}

	d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
	log, err := s.forest.TreeMove(ctx, d, b.GetTreeId(), &pilorama.Move{
		Parent: b.GetParentId(),
		Child:  b.GetNodeId(),
		Meta:   pilorama.Meta{Items: protoToMeta(b.GetMeta())},
	})
	if err != nil {
		return nil, err
	}

	s.pushToQueue(cid, b.GetTreeId(), log)
	return new(MoveResponse), nil
}

func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID
	if err := cid.Decode(b.GetContainerId()); err != nil {
		return nil, err
	}

	err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectGet)
	if err != nil {
		return nil, err
	}

	ns, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return nil, err
	}
	if pos < 0 {
		var resp *GetNodeByPathResponse
		var outErr error
		err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
			resp, outErr = c.GetNodeByPath(ctx, req)
			return true
		})
		if err != nil {
			return nil, err
		}
		return resp, outErr
	}

	attr := b.GetPathAttribute()
	if len(attr) == 0 {
		attr = pilorama.AttributeFilename
	}

	nodes, err := s.forest.TreeGetByPath(ctx, cid, b.GetTreeId(), attr, b.GetPath(), b.GetLatestOnly())
	if err != nil {
		return nil, err
	}

	info := make([]*GetNodeByPathResponse_Info, 0, len(nodes))
	for _, node := range nodes {
		m, parent, err := s.forest.TreeGetMeta(ctx, cid, b.GetTreeId(), node)
		if err != nil {
			return nil, err
		}

		var x GetNodeByPathResponse_Info
		x.ParentId = parent
		x.NodeId = node
		x.Timestamp = m.Time
		if b.AllAttributes {
			x.Meta = metaToProto(m.Items)
		} else {
			for _, kv := range m.Items {
				for _, attr := range b.GetAttributes() {
					if kv.Key == attr {
						x.Meta = append(x.Meta, &KeyValue{
							Key:   kv.Key,
							Value: kv.Value,
						})
						break
					}
				}
			}
		}
		info = append(info, &x)
	}

	return &GetNodeByPathResponse{
		Body: &GetNodeByPathResponse_Body{
			Nodes: info,
		},
	}, nil
}

func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
	if !s.initialSyncDone.Load() {
		return ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID
	if err := cid.Decode(b.GetContainerId()); err != nil {
		return err
	}

	err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectGet)
	if err != nil {
		return err
	}

	ns, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return err
	}
	if pos < 0 {
		var cli TreeService_GetSubTreeClient
		var outErr error
		err = s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
			cli, outErr = c.GetSubTree(srv.Context(), req)
			return true
		})
		if err != nil {
			return err
		} else if outErr != nil {
			return outErr
		}
		for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() {
			if err := srv.Send(resp); err != nil {
				return err
			}
		}
		return nil
	}

	return getSubTree(srv.Context(), srv, cid, b, s.forest)
}

func getSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
	// Traverse the tree in a DFS manner. Because we need to support arbitrary depth,
	// recursive implementation is not suitable here, so we maintain explicit stack.
	stack := [][]uint64{{b.GetRootId()}}

	for {
		if len(stack) == 0 {
			break
		} else if len(stack[len(stack)-1]) == 0 {
			stack = stack[:len(stack)-1]
			continue
		}

		nodeID := stack[len(stack)-1][0]
		stack[len(stack)-1] = stack[len(stack)-1][1:]

		m, p, err := forest.TreeGetMeta(ctx, cid, b.GetTreeId(), nodeID)
		if err != nil {
			return err
		}
		err = srv.Send(&GetSubTreeResponse{
			Body: &GetSubTreeResponse_Body{
				NodeId:    nodeID,
				ParentId:  p,
				Timestamp: m.Time,
				Meta:      metaToProto(m.Items),
			},
		})
		if err != nil {
			return err
		}

		if b.GetDepth() == 0 || uint32(len(stack)) < b.GetDepth() {
			children, err := forest.TreeGetChildren(ctx, cid, b.GetTreeId(), nodeID)
			if err != nil {
				return err
			}
			if len(children) != 0 {
				stack = append(stack, children)
			}
		}
	}
	return nil
}

// Apply locally applies operation from the remote node to the tree.
func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, error) {
	err := verifyMessage(req)
	if err != nil {
		return nil, err
	}

	var cid cidSDK.ID
	if err := cid.Decode(req.GetBody().GetContainerId()); err != nil {
		return nil, err
	}

	key := req.GetSignature().GetKey()

	_, pos, _, err := s.getContainerInfo(cid, key)
	if err != nil {
		return nil, err
	}
	if pos < 0 {
		return nil, errors.New("`Apply` request must be signed by a container node")
	}

	op := req.GetBody().GetOperation()

	var meta pilorama.Meta
	if err := meta.FromBytes(op.GetMeta()); err != nil {
		return nil, fmt.Errorf("can't parse meta-information: %w", err)
	}

	select {
	case s.replicateLocalCh <- applyOp{
		treeID: req.GetBody().GetTreeId(),
		cid:    cid,
		Move: pilorama.Move{
			Parent: op.GetParentId(),
			Child:  op.GetChildId(),
			Meta:   meta,
		},
	}:
	default:
	}
	return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
}

func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
	if !s.initialSyncDone.Load() {
		return ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID
	if err := cid.Decode(req.GetBody().GetContainerId()); err != nil {
		return err
	}

	ns, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return err
	}
	if pos < 0 {
		var cli TreeService_GetOpLogClient
		var outErr error
		err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
			cli, outErr = c.GetOpLog(srv.Context(), req)
			return true
		})
		if err != nil {
			return err
		} else if outErr != nil {
			return outErr
		}
		for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() {
			if err := srv.Send(resp); err != nil {
				return err
			}
		}
		return nil
	}

	h := b.GetHeight()
	lastHeight, err := s.forest.TreeHeight(srv.Context(), cid, b.GetTreeId())
	if err != nil {
		return err
	}
	for {
		lm, err := s.forest.TreeGetOpLog(srv.Context(), cid, b.GetTreeId(), h)
		if err != nil || lm.Time == 0 || lastHeight < lm.Time {
			return err
		}

		err = srv.Send(&GetOpLogResponse{
			Body: &GetOpLogResponse_Body{
				Operation: &LogMove{
					ParentId: lm.Parent,
					Meta:     lm.Meta.Bytes(),
					ChildId:  lm.Child,
				},
			},
		})
		if err != nil {
			return err
		}

		h = lm.Time + 1
	}
}

func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	var cid cidSDK.ID

	err := cid.Decode(req.GetBody().GetContainerId())
	if err != nil {
		return nil, err
	}

	// just verify the signature, not ACL checks
	// since tree ID list is not protected like
	// the containers list
	err = verifyMessage(req)
	if err != nil {
		return nil, err
	}

	ns, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return nil, err
	}
	if pos < 0 {
		var resp *TreeListResponse
		var outErr error
		err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
			resp, outErr = c.TreeList(ctx, req)
			return outErr == nil
		})
		if err != nil {
			return nil, err
		}
		return resp, outErr
	}

	ids, err := s.forest.TreeList(ctx, cid)
	if err != nil {
		return nil, err
	}

	return &TreeListResponse{
		Body: &TreeListResponse_Body{
			Ids: ids,
		},
	}, nil
}

func protoToMeta(arr []*KeyValue) []pilorama.KeyValue {
	meta := make([]pilorama.KeyValue, len(arr))
	for i, kv := range arr {
		if kv != nil {
			meta[i].Key = kv.Key
			meta[i].Value = kv.Value
		}
	}
	return meta
}

func metaToProto(arr []pilorama.KeyValue) []*KeyValue {
	meta := make([]*KeyValue, len(arr))
	for i, kv := range arr {
		meta[i] = &KeyValue{
			Key:   kv.Key,
			Value: kv.Value,
		}
	}
	return meta
}

// getContainerInfo returns the list of container nodes, position in the container for the node
// with pub key and total amount of nodes in all replicas.
func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeInfo, int, int, error) {
	cntNodes, _, err := s.getContainerNodes(cid)
	if err != nil {
		return nil, 0, 0, err
	}

	for i, node := range cntNodes {
		if bytes.Equal(node.PublicKey(), pub) {
			return cntNodes, i, len(cntNodes), nil
		}
	}
	return cntNodes, -1, len(cntNodes), nil
}

func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	return new(HealthcheckResponse), nil
}