package getsvc

import (
	"context"
	"crypto/ecdsa"
	"errors"
	"io"
	"sync"

	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
	rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
	internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// getRangeRequestForwarder forwards a GetRange request to remote nodes and
// relays the payload chunks it receives into Stream.
//
// Fields:
//   - OnceResign guards the one-time re-composition and re-signing of Request
//     performed by forwardRequestToNode.
//   - GlobalProgress is the number of payload bytes already written to Stream;
//     readStream increases it after every successful chunk write.
//   - Key is the private key used to sign the forwarded request.
//   - Request is the request being forwarded; forwardRequestToNode replaces its
//     meta header in place.
//   - Stream is the destination the received range chunks are written to.
//
// The struct embeds a sync.Once by value, so it must not be copied after first
// use; all methods take a pointer receiver.
type getRangeRequestForwarder struct {
	OnceResign     sync.Once
	GlobalProgress int
	Key            *ecdsa.PrivateKey
	Request        *objectV2.GetRangeRequest
	Stream         *streamObjectRangeWriter
}

// forwardRequestToNode sends the GetRange request to the node reachable at
// addr through c and relays the received payload into f.Stream.
//
// On the first invocation the request gets a fresh meta header (TTL decreased
// by one, the previous header kept as origin) and is re-signed with f.Key;
// later invocations reuse the already prepared request.
//
// The returned object is always nil: the payload is delivered via f.Stream.
// The error is the signing, stream-opening or stream-reading failure, if any.
func (f *getRangeRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*objectSDK.Object, error) {
	ctx, span := tracing.StartSpanFromContext(
		ctx,
		"getRangeRequestForwarder.forwardRequestToNode",
		trace.WithAttributes(attribute.String("address", addr.String())),
	)
	defer span.End()

	// NOTE(review): signErr is local to this call, so a signing failure is
	// only reported by the invocation that actually ran the Once body.
	var signErr error

	f.OnceResign.Do(func() {
		origin := f.Request.GetMetaHeader()

		// Meta header of the local server wrapping the original one.
		// NOTE(review): no guard against a zero TTL is visible here —
		// presumably checked by the caller; confirm.
		hdr := new(session.RequestMetaHeader)
		hdr.SetTTL(origin.GetTTL() - 1)
		// TODO: #1165 think how to set the other fields
		hdr.SetOrigin(origin)
		writeCurrentVersion(hdr)

		f.Request.SetMetaHeader(hdr)

		signErr = signature.SignServiceMessage(f.Key, f.Request)
	})
	if signErr != nil {
		return nil, signErr
	}

	stream, openErr := f.openStream(ctx, addr, c)
	if openErr != nil {
		return nil, openErr
	}

	return nil, f.readStream(ctx, stream, c, pubkey)
}

// openStream opens a GetRange response stream for f.Request on the raw RPC
// client that c provides for addr.
//
// Any failure is wrapped with errCouldNotCreateGetRangeStream; in that case the
// returned reader is nil.
func (f *getRangeRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.ObjectRangeResponseReader, error) {
	var reader *rpc.ObjectRangeResponseReader

	// The callback captures reader so that the stream created inside
	// RawForAddress is visible to the caller.
	open := func(cli *rpcclient.Client) (err error) {
		reader, err = rpc.GetObjectRange(cli, f.Request, rpcclient.WithContext(ctx))
		return err
	}

	if err := c.RawForAddress(ctx, addr, open); err != nil {
		return nil, errCouldNotCreateGetRangeStream(err)
	}

	return reader, nil
}

// readStream consumes rangeStream until io.EOF, verifying every response with
// verifyResponse and writing payload chunks to f.Stream.
//
// It returns nil once the stream ends with io.EOF. Other read errors are
// reported through internalclient.ReportError and returned wrapped. A response
// carrying split info or EC info terminates reading with the corresponding
// SDK error, and a response without a range part yields errUnexpectedRangePart.
func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *rpc.ObjectRangeResponseReader, c client.MultiAddressClient, pubkey []byte) error {
	var (
		msg      = new(objectV2.GetRangeResponse)
		received int // payload bytes received from this stream so far
	)

	for {
		// Receive the next message from the server stream.
		readErr := rangeStream.Read(msg)
		if errors.Is(readErr, io.EOF) {
			return nil
		}
		if readErr != nil {
			internalclient.ReportError(c, readErr)
			return errReadingResponseFailed(readErr)
		}

		if verifyErr := verifyResponse(msg, pubkey); verifyErr != nil {
			return verifyErr
		}

		// NOTE(review): a non-nil part of any other type is skipped and the
		// loop goes on to the next message.
		switch part := msg.GetBody().GetRangePart().(type) {
		case nil:
			return errUnexpectedRangePart(part)
		case *objectV2.SplitInfo:
			return objectSDK.NewSplitInfoError(objectSDK.NewSplitInfoFromV2(part))
		case *objectV2.ECInfo:
			return objectSDK.NewECInfoError(objectSDK.NewECInfoFromV2(part))
		case *objectV2.GetRangePartChunk:
			payload := part.GetChunk()

			// Remember where this chunk starts, then account for it as
			// received regardless of whether anything gets written.
			offset := received
			received += len(payload)

			// chunkToSend picks what to write based on the global and local
			// progress — presumably it drops bytes already delivered by an
			// earlier attempt; an empty result means nothing to write.
			pending := chunkToSend(f.GlobalProgress, offset, payload)
			if len(pending) == 0 {
				continue
			}

			if writeErr := f.Stream.WriteChunk(ctx, pending); writeErr != nil {
				return errCouldNotWriteObjChunk("GetRange", writeErr)
			}

			f.GlobalProgress += len(pending)
		}
	}
}