package patchsvc

import (
	"context"
	"crypto/ecdsa"
	"errors"
	"fmt"
	"io"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
	objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
	getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
	refsV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
)

// Streamer for the patch handler is a pipeline that merges two incoming streams of patches
// and original object payload chunks. The merged result is fed to Put stream target.
type Streamer struct {
	*objectwriter.Config

	// Patcher must be initialized at first Streamer.Send call.
	patcher patcher.PatchApplier

	nonFirstSend bool

	getSvc *getsvc.Service

	localNodeKey *ecdsa.PrivateKey
}

type pipeChunkWriter struct {
	wr *io.PipeWriter
}

type headResponseWriter struct {
	body *objectV2.HeadResponseBody
}

func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error {
	w.body.SetHeaderPart(toFullObjectHeader(hdr))
	return nil
}

func toFullObjectHeader(hdr *objectSDK.Object) objectV2.GetHeaderPart {
	obj := hdr.ToV2()

	hs := new(objectV2.HeaderWithSignature)
	hs.SetHeader(obj.GetHeader())
	hs.SetSignature(obj.GetSignature())

	return hs
}

func isLinkObject(hdr *objectV2.HeaderWithSignature) bool {
	split := hdr.GetHeader().GetSplit()
	return len(split.GetChildren()) > 0 && split.GetParent() != nil
}

func isComplexObjectPart(hdr *objectV2.HeaderWithSignature) bool {
	return hdr.GetHeader().GetEC() != nil || hdr.GetHeader().GetSplit() != nil
}

func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
	hdrWithSig, addr, err := s.readHeader(ctx, req)
	if err != nil {
		return err
	}

	if hdrWithSig.GetHeader().GetObjectType() != objectV2.TypeRegular {
		return errors.New("non-regular object can't be patched")
	}
	if isLinkObject(hdrWithSig) {
		return errors.New("linking object can't be patched")
	}
	if isComplexObjectPart(hdrWithSig) {
		return errors.New("complex object parts can't be patched")
	}

	commonPrm, err := util.CommonPrmFromV2(req)
	if err != nil {
		return err
	}
	commonPrm.WithLocalOnly(false)

	rangeProvider := &rangeProvider{
		getSvc: s.getSvc,

		addr: addr,

		commonPrm: commonPrm,

		localNodeKey: s.localNodeKey,
	}

	hdr := hdrWithSig.GetHeader()
	oV2 := new(objectV2.Object)
	hV2 := new(objectV2.Header)
	oV2.SetHeader(hV2)
	oV2.GetHeader().SetContainerID(hdr.GetContainerID())
	oV2.GetHeader().SetPayloadLength(hdr.GetPayloadLength())
	oV2.GetHeader().SetAttributes(hdr.GetAttributes())

	ownerID, err := newOwnerID(req.GetVerificationHeader())
	if err != nil {
		return err
	}
	oV2.GetHeader().SetOwnerID(ownerID)

	target, err := target.New(ctx, objectwriter.Params{
		Config: s.Config,
		Common: commonPrm,
		Header: objectSDK.NewFromV2(oV2),
	})
	if err != nil {
		return fmt.Errorf("target creation: %w", err)
	}

	patcherPrm := patcher.Params{
		Header: objectSDK.NewFromV2(oV2),

		RangeProvider: rangeProvider,

		ObjectWriter: target,
	}

	s.patcher = patcher.New(patcherPrm)
	return nil
}

func (s *Streamer) readHeader(ctx context.Context, req *objectV2.PatchRequest) (hdrWithSig *objectV2.HeaderWithSignature, addr oid.Address, err error) {
	addrV2 := req.GetBody().GetAddress()
	if addrV2 == nil {
		err = errors.New("patch request has nil-address")
		return
	}

	if err = addr.ReadFromV2(*addrV2); err != nil {
		err = fmt.Errorf("read address error: %w", err)
		return
	}

	commonPrm, err := util.CommonPrmFromV2(req)
	if err != nil {
		return
	}
	commonPrm.WithLocalOnly(false)

	var p getsvc.HeadPrm
	p.SetSignerKey(s.localNodeKey)
	p.SetCommonParameters(commonPrm)

	resp := new(objectV2.HeadResponse)
	resp.SetBody(new(objectV2.HeadResponseBody))

	p.WithAddress(addr)
	p.SetHeaderWriter(&headResponseWriter{
		body: resp.GetBody(),
	})

	err = s.getSvc.Head(ctx, p)
	if err != nil {
		err = fmt.Errorf("get header error: %w", err)
		return
	}

	var ok bool
	hdrPart := resp.GetBody().GetHeaderPart()
	if hdrWithSig, ok = hdrPart.(*objectV2.HeaderWithSignature); !ok {
		err = fmt.Errorf("unexpected header type: %T", hdrPart)
	}
	return
}

func (s *Streamer) Send(ctx context.Context, req *objectV2.PatchRequest) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send")
	defer span.End()

	defer func() {
		s.nonFirstSend = true
	}()

	if !s.nonFirstSend {
		if err := s.init(ctx, req); err != nil {
			return fmt.Errorf("streamer init error: %w", err)
		}
	}

	patch := new(objectSDK.Patch)
	patch.FromV2(req.GetBody())

	if !s.nonFirstSend {
		err := s.patcher.ApplyAttributesPatch(ctx, patch.NewAttributes, patch.ReplaceAttributes)
		if err != nil {
			return fmt.Errorf("patch attributes: %w", err)
		}
	}

	if patch.PayloadPatch != nil {
		err := s.patcher.ApplyPayloadPatch(ctx, patch.PayloadPatch)
		if err != nil {
			return fmt.Errorf("patch payload: %w", err)
		}
	} else if s.nonFirstSend {
		return errors.New("invalid non-first patch: empty payload")
	}

	return nil
}

func (s *Streamer) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
	if s.patcher == nil {
		return nil, errors.New("uninitialized patch streamer")
	}
	patcherResp, err := s.patcher.Close(ctx)
	if err != nil {
		return nil, err
	}

	oidV2 := new(refsV2.ObjectID)

	if patcherResp.AccessIdentifiers.ParentID != nil {
		patcherResp.AccessIdentifiers.ParentID.WriteToV2(oidV2)
	} else {
		patcherResp.AccessIdentifiers.SelfID.WriteToV2(oidV2)
	}

	return &objectV2.PatchResponse{
		Body: &objectV2.PatchResponseBody{
			ObjectID: oidV2,
		},
	}, nil
}