package putsvc

import (
	"context"
	"crypto/ecdsa"
	"errors"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
	pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)

type Streamer struct {
	*cfg

	privateKey *ecdsa.PrivateKey

	target transformer.ChunkedObjectWriter

	relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error

	maxPayloadSz uint64 // network config
}

var errNotInit = errors.New("stream not initialized")

var errInitRecall = errors.New("init recall")

func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
	// initialize destination target
	if err := p.initTarget(prm); err != nil {
		return fmt.Errorf("(%T) could not initialize object target: %w", p, err)
	}

	if err := p.target.WriteHeader(ctx, prm.hdr); err != nil {
		return fmt.Errorf("(%T) could not write header to target: %w", p, err)
	}
	return nil
}

// MaxObjectSize returns maximum payload size for the streaming session.
//
// Must be called after the successful Init.
func (p *Streamer) MaxObjectSize() uint64 {
	return p.maxPayloadSz
}

func (p *Streamer) initTarget(prm *PutInitPrm) error {
	// prevent re-calling
	if p.target != nil {
		return errInitRecall
	}

	// prepare needed put parameters
	if err := p.preparePrm(prm); err != nil {
		return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err)
	}

	p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize()
	if p.maxPayloadSz == 0 {
		return fmt.Errorf("(%T) could not obtain max object size parameter", p)
	}

	if prm.hdr.Signature() != nil {
		return p.initUntrustedTarget(prm)
	}
	return p.initTrustedTarget(prm)
}

func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
	p.relay = prm.relay

	nodeKey, err := p.cfg.keyStorage.GetKey(nil)
	if err != nil {
		return err
	}
	p.privateKey = nodeKey

	// prepare untrusted-Put object target
	p.target = &validatingPreparedTarget{
		nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)),
		fmt:        p.fmtValidator,

		maxPayloadSz: p.maxPayloadSz,
	}

	return nil
}

func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
	sToken := prm.common.SessionToken()

	// prepare trusted-Put object target

	// get private token from local storage
	var sessionInfo *util.SessionInfo

	if sToken != nil {
		sessionInfo = &util.SessionInfo{
			ID:    sToken.ID(),
			Owner: sToken.Issuer(),
		}
	}

	key, err := p.keyStorage.GetKey(sessionInfo)
	if err != nil {
		return fmt.Errorf("(%T) could not receive session key: %w", p, err)
	}

	// In case session token is missing, the line above returns the default key.
	// If it isn't owner key, replication attempts will fail, thus this check.
	ownerObj := prm.hdr.OwnerID()
	if ownerObj.IsEmpty() {
		return errors.New("missing object owner")
	}

	if sToken == nil {
		var ownerSession user.ID
		user.IDFromKey(&ownerSession, key.PublicKey)

		if !ownerObj.Equals(ownerSession) {
			return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
		}
	} else {
		if !ownerObj.Equals(sessionInfo.Owner) {
			return fmt.Errorf("(%T) different token issuer and object owner identifiers %s/%s", p, sessionInfo.Owner, ownerObj)
		}
	}

	p.privateKey = key
	p.target = &validatingTarget{
		fmt: p.fmtValidator,
		nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
			Key:                    key,
			NextTargetInit:         func() transformer.ObjectWriter { return p.newObjectWriter(prm) },
			NetworkState:           p.networkState,
			MaxSize:                p.maxPayloadSz,
			WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
			SessionToken:           sToken,
		}),
	}

	return nil
}

func (p *Streamer) preparePrm(prm *PutInitPrm) error {
	var err error

	// get latest network map
	nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
	if err != nil {
		return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
	}

	idCnr, ok := prm.hdr.ContainerID()
	if !ok {
		return errors.New("missing container ID")
	}

	// get container to store the object
	cnrInfo, err := p.cnrSrc.Get(idCnr)
	if err != nil {
		return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
	}

	prm.cnr = cnrInfo.Value

	// add common options
	prm.traverseOpts = append(prm.traverseOpts,
		// set processing container
		placement.ForContainer(prm.cnr),
	)

	if ech := prm.hdr.ECHeader(); ech != nil {
		prm.traverseOpts = append(prm.traverseOpts,
			// set identifier of the processing object
			placement.ForObject(ech.Parent()),
		)
	} else if id, ok := prm.hdr.ID(); ok {
		prm.traverseOpts = append(prm.traverseOpts,
			// set identifier of the processing object
			placement.ForObject(id),
		)
	}

	// create placement builder from network map
	builder := placement.NewNetworkMapBuilder(nm)

	if prm.common.LocalOnly() {
		// restrict success count to 1 stored copy (to local storage)
		prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))

		// use local-only placement builder
		builder = util.NewLocalPlacement(builder, p.netmapKeys)
	}

	// set placement builder
	prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))

	return nil
}

func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
	if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
		return p.newECWriter(prm)
	}
	return p.newDefaultObjectWriter(prm)
}

func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
	var relay func(context.Context, nodeDesc) error
	if p.relay != nil {
		relay = func(ctx context.Context, node nodeDesc) error {
			var info client.NodeInfo

			client.NodeInfoFromNetmapElement(&info, node.info)

			c, err := p.clientConstructor.Get(info)
			if err != nil {
				return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
			}

			return p.relay(ctx, info, c)
		}
	}

	return &distributedTarget{
		cfg:           p.cfg,
		placementOpts: prm.traverseOpts,
		nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
			if node.local {
				return localTarget{
					storage: p.localStore,
				}
			}

			rt := &remoteTarget{
				privateKey:        p.privateKey,
				commonPrm:         prm.common,
				clientConstructor: p.clientConstructor,
			}

			client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info)

			return rt
		},
		relay: relay,
	}
}

func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
	return &objectWriterDispatcher{
		ecWriter: &ecWriter{
			cfg:           p.cfg,
			placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
			container:     prm.cnr,
			key:           p.privateKey,
			commonPrm:     prm.common,
			relay:         p.relay,
		},
		repWriter: p.newDefaultObjectWriter(prm),
	}
}

func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
	if p.target == nil {
		return errNotInit
	}

	if _, err := p.target.Write(ctx, prm.chunk); err != nil {
		return fmt.Errorf("(%T) could not write payload chunk to target: %w", p, err)
	}

	return nil
}

func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) {
	if p.target == nil {
		return nil, errNotInit
	}

	ids, err := p.target.Close(ctx)
	if err != nil {
		return nil, fmt.Errorf("(%T) could not close object target: %w", p, err)
	}

	id := ids.ParentID
	if id != nil {
		return &PutResponse{
			id: *id,
		}, nil
	}

	return &PutResponse{
		id: ids.SelfID,
	}, nil
}

func (c *cfg) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
	if c.netmapKeys.IsLocalKey(pub) {
		return c.localPool, true
	}
	return c.remotePool, false
}