package main

import (
	"bytes"
	"context"
	"net"

	morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
	containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
	frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
	cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
	containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
	containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc"
	containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
	containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
	containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container/grpc"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

func initContainerService(_ context.Context, c *cfg) {
	// container wrapper that tries to invoke notary
	// requests if chain is configured so
	wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
	fatalOnErr(err)

	c.shared.cnrClient = wrap

	cnrSrc := cntClient.AsContainerSource(wrap)

	cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc)

	var frostfsIDSubjectProvider frostfsidcore.SubjectProvider
	frostfsIDSubjectProvider, err = frostfsid.NewFromMorph(c.cfgMorph.client, c.cfgFrostfsID.scriptHash, 0)
	fatalOnErr(err)

	cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
	if cacheSize > 0 {
		frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id"))
	}

	c.shared.frostfsidClient = frostfsIDSubjectProvider

	defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides(
		c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
		c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage(),
	)
	service := containerService.NewSignService(
		&c.key.PrivateKey,
		containerService.NewAPEServer(defaultChainRouter, cnrRdr,
			newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
			containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
		),
	)
	service = containerService.NewAuditService(service, c.log, c.audit)
	server := containerTransportGRPC.New(service)

	c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
		containerGRPC.RegisterContainerServiceServer(s, server)

		// TODO(@aarifullin): #1487 remove the dual service support.
		s.RegisterService(frostFSServiceDesc(containerGRPC.ContainerService_ServiceDesc), server)
	})

	c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(cnrRdr)
}

func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
	cnrRdr := new(morphContainerReader)

	cnrWrt := &morphContainerWriter{
		neoClient: client,
	}

	if c.cfgMorph.cacheTTL <= 0 {
		c.cfgObject.cnrSource = cnrSrc
		cnrRdr.src = cnrSrc
		cnrRdr.lister = client
	} else {
		// use RPC node as source of Container contract items (with caching)
		c.cfgObject.cnrSource = cnrSrc
		if c.cfgMorph.containerCacheSize > 0 {
			containerCache := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL, c.cfgMorph.containerCacheSize)

			subscribeToContainerCreation(c, func(ctx context.Context, e event.Event) {
				ev := e.(containerEvent.PutSuccess)

				// read owner of the created container in order to update the reading cache.
				// TODO: use owner directly from the event after neofs-contract#256 will become resolved
				//  but don't forget about the profit of reading the new container and caching it:
				//  creation success are most commonly tracked by polling GET op.
				cnr, err := cnrSrc.Get(ev.ID)
				if err == nil {
					containerCache.containerCache.set(ev.ID, cnr, nil)
				} else {
					// unlike removal, we expect successful receive of the container
					// after successful creation, so logging can be useful
					c.log.Error(ctx, logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
						zap.Stringer("id", ev.ID),
						zap.Error(err),
					)
				}

				c.log.Debug(ctx, logs.FrostFSNodeContainerCreationEventsReceipt,
					zap.Stringer("id", ev.ID),
				)
			})

			subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) {
				ev := e.(containerEvent.DeleteSuccess)
				containerCache.handleRemoval(ev.ID)
				c.log.Debug(ctx, logs.FrostFSNodeContainerRemovalEventsReceipt,
					zap.Stringer("id", ev.ID),
				)
			})
			c.cfgObject.cnrSource = containerCache
		}

		cnrRdr.lister = client
		cnrRdr.src = c.cfgObject.cnrSource
	}

	return cnrRdr, cnrWrt
}

// addContainerNotificationHandler adds handler that will be executed synchronously.
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
	typ := event.TypeFromString(sTyp)

	if c.cfgContainer.subscribers == nil {
		c.cfgContainer.subscribers = make(map[event.Type][]event.Handler, 1)
	}

	c.cfgContainer.subscribers[typ] = append(c.cfgContainer.subscribers[typ], h)
}

// addContainerAsyncNotificationHandler adds handler that will be executed asynchronously via container workerPool.
func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) {
	addContainerNotificationHandler(
		c,
		sTyp,
		event.WorkerPoolHandler(
			c.cfgContainer.workerPool,
			h,
			c.log,
		),
	)
}

// stores already registered parsers of the notification events thrown by Container contract.
// MUST NOT be used concurrently.
var mRegisteredParsersContainer = make(map[string]struct{})

// registers event parser by name once. MUST NOT be called concurrently.
func registerEventParserOnceContainer(c *cfg, name string, p event.NotificationParser) {
	if _, ok := mRegisteredParsersContainer[name]; !ok {
		setContainerNotificationParser(c, name, p)
		mRegisteredParsersContainer[name] = struct{}{}
	}
}

// subscribes to successful container creation. Provided handler is called asynchronously
// on corresponding routine pool. MUST NOT be called concurrently with itself and other
// similar functions.
func subscribeToContainerCreation(c *cfg, h event.Handler) {
	const eventNameContainerCreated = "PutSuccess"
	registerEventParserOnceContainer(c, eventNameContainerCreated, containerEvent.ParsePutSuccess)
	addContainerAsyncNotificationHandler(c, eventNameContainerCreated, h)
}

// like subscribeToContainerCreation but for removal.
func subscribeToContainerRemoval(c *cfg, h event.Handler) {
	const eventNameContainerRemoved = "DeleteSuccess"
	registerEventParserOnceContainer(c, eventNameContainerRemoved, containerEvent.ParseDeleteSuccess)
	addContainerAsyncNotificationHandler(c, eventNameContainerRemoved, h)
}

func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
	typ := event.TypeFromString(sTyp)

	if c.cfgContainer.parsers == nil {
		c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser, 1)
	}

	c.cfgContainer.parsers[typ] = p
}

func (c *cfg) PublicKey() []byte {
	return nodeKeyFromNetmap(c)
}

func (c *cfg) IsLocalKey(key []byte) bool {
	return bytes.Equal(key, c.PublicKey())
}

func (c *cfg) IterateAddresses(f func(string) bool) {
	c.iterateNetworkAddresses(f)
}

func (c *cfg) NumberOfAddresses() int {
	return c.addressNum()
}

func (c *cfg) ExternalAddresses() []string {
	return c.cfgNodeInfo.localInfo.ExternalAddresses()
}

// implements interface required by container service provided by morph executor.
type morphContainerReader struct {
	src containerCore.Source

	lister interface {
		ContainersOf(*user.ID) ([]cid.ID, error)
	}
}

func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
	return x.src.Get(id)
}

func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) {
	return x.src.DeletionInfo(id)
}

func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
	return x.lister.ContainersOf(id)
}

type morphContainerWriter struct {
	neoClient *cntClient.Client
}

func (m morphContainerWriter) Put(ctx context.Context, cnr containerCore.Container) (*cid.ID, error) {
	return cntClient.Put(ctx, m.neoClient, cnr)
}

func (m morphContainerWriter) Delete(ctx context.Context, witness containerCore.RemovalWitness) error {
	return cntClient.Delete(ctx, m.neoClient, witness)
}