package layer

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sort"
	"strings"
	"sync"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
	"github.com/panjf2000/ants/v2"
	"go.uber.org/zap"
)

type (
	// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
	// BktInfo, Delimiter, Prefix and MaxKeys are forwarded unchanged into the internal
	// listing parameters; Encode is not read by the listing code visible here.
	ListObjectsParamsCommon struct {
		BktInfo   *data.BucketInfo
		Delimiter string
		Encode    string
		MaxKeys   int
		Prefix    string
	}

	// ListObjectsParamsV1 contains params for ListObjectsV1.
	// Marker is used both as the lower name bound and as the list session bookmark.
	ListObjectsParamsV1 struct {
		ListObjectsParamsCommon
		Marker string
	}

	// ListObjectsParamsV2 contains params for ListObjectsV2.
	// StartAfter is used as the lower name bound and ContinuationToken as the list
	// session bookmark. FetchOwner is not read by the listing code visible here.
	ListObjectsParamsV2 struct {
		ListObjectsParamsCommon
		ContinuationToken string
		StartAfter        string
		FetchOwner        bool
	}

	// ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2.
	// Prefixes holds the directory names and Objects the remaining entries, as split by
	// triageExtendedObjects. IsTruncated is set when an entry beyond MaxKeys was found.
	ListObjectsInfo struct {
		Prefixes    []string
		Objects     []*data.ExtendedNodeVersion
		IsTruncated bool
	}

	// ListObjectsInfoV1 holds data which ListObjectsV1 returns.
	// NextMarker is the name of the last returned entry and is set only for truncated listings.
	ListObjectsInfoV1 struct {
		ListObjectsInfo
		NextMarker string
	}

	// ListObjectsInfoV2 holds data which ListObjectsV2 returns.
	// NextContinuationToken is the encoded object ID of the first entry that was not
	// returned and is set only for truncated listings.
	ListObjectsInfoV2 struct {
		ListObjectsInfo
		NextContinuationToken string
	}

	// ListObjectVersionsInfo stores info and list of objects versions.
	// KeyMarker and VersionIDMarker echo the request values, NextKeyMarker and
	// NextVersionIDMarker describe the last returned entry of a truncated listing.
	// Version holds the entries that are not delete markers, DeleteMarker holds the delete markers.
	ListObjectVersionsInfo struct {
		CommonPrefixes      []string
		IsTruncated         bool
		KeyMarker           string
		NextKeyMarker       string
		NextVersionIDMarker string
		Version             []*data.ExtendedNodeVersion
		DeleteMarker        []*data.ExtendedNodeVersion
		VersionIDMarker     string
	}

	// commonVersionsListingParams is the internal parameter set shared by all listing flavours.
	// Marker is the name bound checked by shouldSkip (entries with name <= Marker are skipped)
	// and shouldSkipVersions (entries with name < Marker are skipped).
	// Bookmark takes part in the list session cache key and is compared with the encoded
	// object ID of streamed nodes to find the position to resume from.
	commonVersionsListingParams struct {
		BktInfo   *data.BucketInfo
		Delimiter string
		Prefix    string
		MaxKeys   int
		Marker    string
		Bookmark  string
	}

	// commonLatestVersionsListingParams extends commonVersionsListingParams with the
	// listing API version, which selects how the session cache key is built when
	// a truncated latest-versions listing is stored.
	commonLatestVersionsListingParams struct {
		commonVersionsListingParams
		ListType ListType
	}
)

// ListType identifies which ListObjects API version a latest-versions listing serves.
type ListType int

const (
	// ListObjectsV1Type marks a ListObjectsV1 request (value 1).
	ListObjectsV1Type ListType = iota + 1
	// ListObjectsV2Type marks a ListObjectsV2 request (value 2).
	ListObjectsV2Type ListType = iota + 1
)

// ListObjectsV1 returns objects in a bucket for requests of Version 1.
//
// The request marker serves both as the name bound and as the session bookmark.
// When more entries are available the result is marked as truncated and
// NextMarker is set to the name of the last returned entry.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
	common := commonVersionsListingParams{
		BktInfo:   p.BktInfo,
		Delimiter: p.Delimiter,
		Prefix:    p.Prefix,
		MaxKeys:   p.MaxKeys,
		Marker:    p.Marker,
		Bookmark:  p.Marker,
	}

	listed, following, err := n.getLatestObjectsVersions(ctx, commonLatestVersionsListingParams{
		commonVersionsListingParams: common,
		ListType:                    ListObjectsV1Type,
	})
	if err != nil {
		return nil, err
	}

	res := &ListObjectsInfoV1{}
	if following != nil {
		res.IsTruncated = true
		res.NextMarker = listed[len(listed)-1].Name()
	}
	res.Prefixes, res.Objects = triageExtendedObjects(listed)

	return res, nil
}

// ListObjectsV2 returns objects in a bucket for requests of Version 2.
//
// StartAfter is used as the name bound and ContinuationToken as the session bookmark.
// When more entries are available the result is marked as truncated and
// NextContinuationToken is set to the encoded object ID of the first entry that was not returned.
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
	common := commonVersionsListingParams{
		BktInfo:   p.BktInfo,
		Delimiter: p.Delimiter,
		Prefix:    p.Prefix,
		MaxKeys:   p.MaxKeys,
		Marker:    p.StartAfter,
		Bookmark:  p.ContinuationToken,
	}

	listed, following, err := n.getLatestObjectsVersions(ctx, commonLatestVersionsListingParams{
		commonVersionsListingParams: common,
		ListType:                    ListObjectsV2Type,
	})
	if err != nil {
		return nil, err
	}

	res := &ListObjectsInfoV2{}
	if following != nil {
		res.IsTruncated = true
		res.NextContinuationToken = following.NodeVersion.OID.EncodeToString()
	}
	res.Prefixes, res.Objects = triageExtendedObjects(listed)

	return res, nil
}

// ListObjectVersions returns object versions and delete markers of a bucket.
//
// KeyMarker is used as the name bound and VersionIDMarker as the session bookmark.
// For a truncated listing the next markers are taken from the last returned entry.
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
	versions, truncated, err := n.getAllObjectsVersions(ctx, commonVersionsListingParams{
		BktInfo:   p.BktInfo,
		Delimiter: p.Delimiter,
		Prefix:    p.Prefix,
		MaxKeys:   p.MaxKeys,
		Marker:    p.KeyMarker,
		Bookmark:  p.VersionIDMarker,
	})
	if err != nil {
		return nil, err
	}

	info := &ListObjectVersionsInfo{
		KeyMarker:       p.KeyMarker,
		VersionIDMarker: p.VersionIDMarker,
		IsTruncated:     truncated,
	}

	if truncated {
		lastReturned := versions[p.MaxKeys-1]
		info.NextKeyMarker = lastReturned.NodeVersion.FilePath
		info.NextVersionIDMarker = lastReturned.NodeVersion.OID.EncodeToString()
	}

	// Directories go to the common prefixes, everything else is split
	// into regular versions and delete markers.
	info.CommonPrefixes, versions = triageExtendedObjects(versions)
	info.Version, info.DeleteMarker = triageVersions(versions)

	return info, nil
}

// getLatestObjectsVersions collects up to p.MaxKeys latest object versions sorted by file path.
//
// Nodes are read from the list session stream and passed through a pool of two workers.
// If more than p.MaxKeys entries were collected, the entry at index p.MaxKeys is returned
// as next, the session is stored for the following request and the result is cut to p.MaxKeys.
// A zero p.MaxKeys yields an empty result without touching the session.
func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVersionsListingParams) (objects []*data.ExtendedNodeVersion, next *data.ExtendedNodeVersion, err error) {
	if p.MaxKeys == 0 {
		return nil, nil, nil
	}

	session, err := n.getListLatestVersionsSession(ctx, p)
	if err != nil {
		return nil, nil, err
	}

	nodes, streamErrCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session)
	filled, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, nodes)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
	}

	// Entries kept in the session from the previous page come first.
	objects = append(make([]*data.ExtendedNodeVersion, 0, p.MaxKeys+1), session.Next...)
	for item := range filled {
		objects = append(objects, item)
	}

	if err = <-streamErrCh; err != nil {
		return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err)
	}

	byPath := func(i, j int) bool {
		return objects[i].NodeVersion.FilePath < objects[j].NodeVersion.FilePath
	}
	sort.Slice(objects, byPath)

	if len(objects) <= p.MaxKeys {
		return objects, nil, nil
	}

	next = objects[p.MaxKeys]
	n.putListLatestVersionsSession(ctx, p, session, objects)

	return objects[:p.MaxKeys], next, nil
}

// getAllObjectsVersions collects up to p.MaxKeys object versions (including delete markers)
// ordered by file path.
//
// Nodes are read from the list session stream and passed through a pool of two workers.
// The boolean result reports whether more than p.MaxKeys entries were collected; in that case
// the session is stored for the following request and the result is cut to p.MaxKeys.
// A zero p.MaxKeys yields an empty, non-truncated result.
func (n *layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListingParams) ([]*data.ExtendedNodeVersion, bool, error) {
	if p.MaxKeys == 0 {
		return nil, false, nil
	}

	session, err := n.getListAllVersionsSession(ctx, p)
	if err != nil {
		return nil, false, err
	}

	nodes, streamErrCh := nodesGeneratorVersions(ctx, p, session)
	filled, err := n.initWorkerPool(ctx, 2, p, nodes)
	if err != nil {
		return nil, false, err
	}

	versions := handleGeneratedVersions(filled, p, session)

	// Stable sort keeps the per-name order produced by handleGeneratedVersions.
	byPath := func(i, j int) bool {
		return versions[i].NodeVersion.FilePath < versions[j].NodeVersion.FilePath
	}
	sort.SliceStable(versions, byPath)

	if err = <-streamErrCh; err != nil {
		return nil, false, fmt.Errorf("failed to get next object from stream: %w", err)
	}

	if len(versions) <= p.MaxKeys {
		return versions, false, nil
	}

	n.putListAllVersionsSession(ctx, p, session, versions)

	return versions[:p.MaxKeys], true, nil
}

// handleGeneratedVersions drains objOutCh and groups consecutive entries that share a name
// (the directory name for directories, the file path otherwise) into rows.
// Every finished row is handed to formVersionsListRow. A directory is added only once per row,
// while every version of a regular object is kept.
func handleGeneratedVersions(objOutCh <-chan *data.ExtendedNodeVersion, p commonVersionsListingParams, session *data.ListSession) []*data.ExtendedNodeVersion {
	var (
		prevName string
		rowStart int
	)
	collected := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)

	for item := range objOutCh {
		current := item.NodeVersion.FilePath
		if item.DirName != "" {
			current = item.DirName
		}

		switch {
		case current != prevName:
			// A new name begins: finalize the previous row and open a new one.
			formVersionsListRow(collected, rowStart, session)
			rowStart = len(collected)
			collected = append(collected, item)
		case item.DirName == "":
			collected = append(collected, item)
		}

		prevName = current
	}

	formVersionsListRow(collected, rowStart, session)

	return collected
}

// formVersionsListRow finalizes the row objects[rowStartIndex:]: it orders the row by timestamp
// from newest to oldest and sets the IsLatest flags.
// The first entry of the row is marked as latest unless the session's first pending entry
// has the same file path; all other entries of the row are marked as not latest.
// An empty objects slice is left untouched.
func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int, session *data.ListSession) {
	if len(objects) == 0 {
		return
	}

	row := objects[rowStartIndex:]
	sort.Slice(row, func(a, b int) bool {
		// newest first
		return row[a].NodeVersion.Timestamp > row[b].NodeVersion.Timestamp
	})

	head := row[0]
	latest := true
	if len(session.Next) != 0 {
		latest = session.Next[0].NodeVersion.FilePath != head.NodeVersion.FilePath
	}
	head.IsLatest = latest

	for i := 1; i < len(row); i++ {
		row[i].IsLatest = false
	}
}

// getListLatestVersionsSession returns a list session for a latest-versions listing.
// It delegates to getListVersionsSession with latestOnly set to true.
func (n *layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) {
	return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
}

// getListAllVersionsSession returns a list session for an all-versions listing.
// It delegates to getListVersionsSession with latestOnly set to false.
func (n *layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) {
	return n.getListVersionsSession(ctx, p, false)
}

// getListVersionsSession looks up a cached list session by bucket, prefix and bookmark.
// A new session is initialized when nothing is cached or when the cached session
// has already been acquired by someone else. A successfully acquired session is
// removed from the cache before it is returned.
func (n *layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) {
	owner := n.BearerOwner(ctx)
	key := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)

	cached := n.cache.GetListSession(owner, key)
	// Swap is evaluated only for an existing session and marks it as acquired.
	if cached == nil || cached.Acquired.Swap(true) {
		return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
	}

	// Once the next object is read from the session stream, the cached value
	// no longer corresponds to the token used in the cache key, so drop the entry.
	n.cache.DeleteListSession(owner, key)

	return cached, nil
}

// initNewVersionsByPrefixSession creates a new list session with an empty names map and
// opens a versions-by-prefix stream for it.
//
// The session context is derived from context.Background rather than from ctx, so it is not
// cancelled together with ctx; box data is copied from ctx when it is available.
// If the stream cannot be opened, the session context is cancelled and the error is returned.
func (n *layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
	session = &data.ListSession{NamesMap: make(map[string]struct{})}
	session.Context, session.Cancel = context.WithCancel(context.Background())

	// Missing box data is not an error here: the session simply goes without it.
	if bd, bdErr := middleware.GetBoxData(ctx); bdErr == nil {
		session.Context = middleware.SetBoxData(session.Context, bd)
	}

	session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
	if err != nil {
		// The session is discarded, so release its context instead of leaking the cancel func.
		session.Cancel()
		return nil, err
	}

	return session, nil
}

// putListLatestVersionsSession stores the session in the cache so that the next page of a
// latest-versions listing can resume from it. Nothing is stored unless allObjects holds more
// than p.MaxKeys entries.
//
// The cache key bookmark is the name of the last returned entry for ListObjectsV1 and the
// encoded object ID of the first not returned entry for ListObjectsV2. That first not returned
// entry becomes the only pending entry of the session.
func (n *layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
	if len(allObjects) <= p.MaxKeys {
		return
	}

	var bookmark string
	switch p.ListType {
	case ListObjectsV1Type:
		bookmark = allObjects[p.MaxKeys-1].Name()
	case ListObjectsV2Type:
		bookmark = allObjects[p.MaxKeys].NodeVersion.OID.EncodeToString()
	default:
		// should never happen
		panic("invalid list type")
	}
	key := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, bookmark)

	// Release the session before publishing it so that the next request can acquire it.
	session.Acquired.Store(false)
	session.Next = []*data.ExtendedNodeVersion{allObjects[p.MaxKeys]}
	n.cache.PutListSession(n.BearerOwner(ctx), key, session)
}

// putListAllVersionsSession stores the session in the cache so that the next page of an
// all-versions listing can resume from it. Nothing is stored unless allObjects holds more
// than p.MaxKeys entries.
//
// The pending entries of the session are the last returned entry followed by all entries
// that were not returned; the encoded object ID of the last returned entry is the cache key bookmark.
func (n *layer) putListAllVersionsSession(ctx context.Context, p commonVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
	if len(allObjects) <= p.MaxKeys {
		return
	}

	session.Acquired.Store(false)

	// Copy into a fresh slice so that the session does not share storage with allObjects.
	pending := allObjects[p.MaxKeys-1:]
	session.Next = make([]*data.ExtendedNodeVersion, len(pending))
	copy(session.Next, pending)

	bookmark := session.Next[0].NodeVersion.OID.EncodeToString()
	key := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, bookmark)
	n.cache.PutListSession(n.BearerOwner(ctx), key, session)
}

// nodesGeneratorStream starts a goroutine that reads nodes from the session stream, drops the
// ones rejected by shouldSkip and sends the rest to the returned node channel.
//
// The goroutine stops on stream end, on a stream error, on ctx cancellation or after the
// limit of sent nodes is reached. The limit is p.MaxKeys when the session already carries
// pending entries and p.MaxKeys+1 otherwise. Any stream error except io.EOF is delivered
// through the error channel. Both channels are closed when the goroutine finishes.
func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) {
	nodeCh := make(chan *data.ExtendedNodeVersion, 1000)
	errCh := make(chan error, 1)
	existed := stream.NamesMap

	limit := p.MaxKeys
	if len(stream.Next) != 0 {
		// A resumed session has already reached its bookmark.
		existed[continuationToken] = struct{}{}
	} else {
		limit++
	}

	go func() {
		sent := 0

		for {
			node, err := stream.Stream.Next(ctx)
			if err != nil {
				if !errors.Is(err, io.EOF) {
					errCh <- fmt.Errorf("stream next: %w", err)
				}
				break
			}

			candidate := &data.ExtendedNodeVersion{
				NodeVersion: node,
				IsLatest:    true,
				DirName:     tryDirectoryName(node, p.Prefix, p.Delimiter),
			}

			if shouldSkip(candidate, p, existed) {
				continue
			}

			stop := false
			select {
			case <-ctx.Done():
				stop = true
			case nodeCh <- candidate:
				sent++
				// we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
				stop = sent == limit
			}

			if stop {
				break
			}
		}

		close(nodeCh)
		close(errCh)
	}()

	return nodeCh, errCh
}

// nodesGeneratorVersions starts a goroutine that emits version nodes to the returned node channel.
//
// Pending entries of the session are replayed first, after that nodes are read from the
// session stream. Nodes rejected by shouldSkipVersions are dropped. The goroutine stops on
// stream end, on a stream error, on ctx cancellation or as soon as more than p.MaxKeys nodes
// were sent and the last sent node has a file path different from the previously sent one.
// Any stream error except io.EOF is delivered through the error channel.
// Both channels are closed when the goroutine finishes.
func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) {
	nodeCh := make(chan *data.ExtendedNodeVersion, 1000)
	errCh := make(chan error, 1)
	existed := stream.NamesMap

	// The bookmark has to be located again for every page.
	delete(existed, continuationToken)

	go func() {
		var (
			sent     int
			cursor   int
			prevPath string
		)

		for {
			var current *data.ExtendedNodeVersion

			if cursor < len(stream.Next) {
				current = stream.Next[cursor]
				cursor++
			} else {
				node, err := stream.Stream.Next(ctx)
				if err != nil {
					if !errors.Is(err, io.EOF) {
						errCh <- fmt.Errorf("stream next: %w", err)
					}
					break
				}

				current = &data.ExtendedNodeVersion{
					NodeVersion: node,
					DirName:     tryDirectoryName(node, p.Prefix, p.Delimiter),
				}
			}

			if shouldSkipVersions(current, p, existed) {
				continue
			}

			stop := false
			select {
			case <-ctx.Done():
				stop = true
			case nodeCh <- current:
				sent++
				path := current.NodeVersion.FilePath
				if sent > p.MaxKeys && path != prevPath {
					stop = true
				} else {
					prevPath = path
				}
			}

			if stop {
				break
			}
		}

		close(nodeCh)
		close(errCh)
	}()

	return nodeCh, errCh
}

// initWorkerPool starts a goroutine that consumes nodes from input and forwards them to the
// returned channel, filling in extra object data where it is missing.
//
// Directories and nodes with already filled extra data are forwarded as is. For every other
// node a task is submitted to a pool of the given size; the task loads the object info
// (retrying once), fills the node extra fields and forwards the node. Nodes whose object info
// cannot be obtained are dropped. The output channel is closed after input is drained (or ctx
// is done) and all submitted tasks have finished. Because tasks run concurrently, the order of
// the output is not guaranteed to match the order of input.
//
// An error is returned only if the pool cannot be created.
func (n *layer) initWorkerPool(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
	reqLog := n.reqLogger(ctx)
	pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
	if err != nil {
		return nil, fmt.Errorf("couldn't init go pool for listing: %w", err)
	}
	objCh := make(chan *data.ExtendedNodeVersion, size)

	go func() {
		var wg sync.WaitGroup

	LOOP:
		for node := range input {
			select {
			case <-ctx.Done():
				break LOOP
			default:
			}

			if node.DirName != "" || node.NodeVersion.IsFilledExtra() {
				select {
				case <-ctx.Done():
				case objCh <- node:
				}
				continue
			}

			// Make a per-iteration copy of the pointer so that the submitted
			// task sees the right node regardless of loop variable semantics.
			node := node

			wg.Add(1)
			// The submit error is kept local: the enclosing function has already
			// returned, so its err variable must not be written from this goroutine.
			submitErr := pool.Submit(func() {
				defer wg.Done()

				oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion)
				if oi == nil {
					// try to get object again
					if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion); oi == nil {
						// do not process object which are definitely missing in object service
						return
					}
				}

				realSize, sizeErr := GetObjectSize(oi)
				if sizeErr != nil {
					reqLog.Debug(logs.FailedToGetRealObjectSize, zap.Error(sizeErr))
					realSize = oi.Size
				}

				node.NodeVersion.FillExtra(&oi.Owner, &oi.Created, realSize)

				select {
				case <-ctx.Done():
				case objCh <- node:
				}
			})
			if submitErr != nil {
				// The task will never run, so balance the wait group here.
				wg.Done()
				reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(submitErr))
			}
		}
		wg.Wait()
		close(objCh)
		pool.Release()
	}()

	return objCh, nil
}

// shouldSkip reports whether the node must be left out of a latest-versions listing.
//
// A node is skipped when it is a delete marker, when its name (the directory name for
// directories, the file path otherwise) has already been seen, when the name is not greater
// than p.Marker, or when a bookmark is set and has not been reached yet. The bookmark is
// reached by the node whose encoded object ID equals p.Bookmark; that node itself is not skipped.
// The names of accepted nodes and the fact of reaching the bookmark are recorded in existed.
func shouldSkip(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool {
	if node.NodeVersion.IsDeleteMarker {
		return true
	}

	name := node.DirName
	if name == "" {
		name = node.NodeVersion.FilePath
	}

	if _, seen := existed[name]; seen || name <= p.Marker {
		return true
	}

	if p.Bookmark != "" {
		if _, reached := existed[continuationToken]; !reached {
			if node.NodeVersion.OID.EncodeToString() != p.Bookmark {
				return true
			}
			existed[continuationToken] = struct{}{}
		}
	}

	existed[name] = struct{}{}

	return false
}

// shouldSkipVersions reports whether the node must be left out of an all-versions listing.
//
// A node is skipped when it is a directory whose name has already been seen, when its name
// (the directory name for directories, the file path otherwise) is less than p.Marker, or when
// a bookmark is set and has not been passed yet. The node whose encoded object ID equals
// p.Bookmark marks the bookmark as reached and is skipped as well.
// The names of accepted nodes and the fact of reaching the bookmark are recorded in existed.
func shouldSkipVersions(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool {
	name := node.NodeVersion.FilePath
	if dir := node.DirName; dir != "" {
		if _, seen := existed[dir]; seen {
			return true
		}
		name = dir
	}

	if name < p.Marker {
		return true
	}

	if p.Bookmark != "" {
		if _, reached := existed[continuationToken]; !reached {
			if node.NodeVersion.OID.EncodeToString() == p.Bookmark {
				existed[continuationToken] = struct{}{}
			}
			return true
		}
	}

	existed[name] = struct{}{}

	return false
}

// triageExtendedObjects splits entries into directory names (prefixes) and regular objects,
// keeping the input order within each group. Entries with a non-empty DirName are directories.
func triageExtendedObjects(allObjects []*data.ExtendedNodeVersion) (prefixes []string, objects []*data.ExtendedNodeVersion) {
	for i := range allObjects {
		entry := allObjects[i]
		if entry.DirName == "" {
			objects = append(objects, entry)
			continue
		}
		prefixes = append(prefixes, entry.DirName)
	}

	return prefixes, objects
}

// objectInfoFromObjectsCacheOrFrostFS returns object info for the node, taking it from the
// objects cache when possible. On a cache miss the object header is requested, the info built
// from it (with MD5 taken from the node) is put into the cache and returned.
// If the header cannot be fetched, a warning is logged and nil is returned.
func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion) (oi *data.ObjectInfo) {
	owner := n.BearerOwner(ctx)

	cached := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID))
	if cached != nil {
		return cached.ObjectInfo
	}

	meta, err := n.objectHead(ctx, bktInfo, node.OID)
	if err != nil {
		n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err))
		return nil
	}

	oi = objectInfoFromMeta(bktInfo, meta)
	oi.MD5Sum = node.MD5

	extended := &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node}
	n.cache.PutObject(owner, extended)

	return oi
}

// tryDirectoryName forms directory name by prefix and delimiter.
// If node isn't a directory, an empty string is returned.
//
// The directory name is the prefix followed by the part of the remaining path up to and
// including the first occurrence of the whole delimiter, so multi-character delimiters
// are preserved completely. An empty delimiter disables directory detection.
//
// This function doesn't check whether the node path has the prefix; the caller must do it.
func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string {
	if delimiter == "" {
		return ""
	}

	tail := strings.TrimPrefix(node.FilePath, prefix)
	index := strings.Index(tail, delimiter)
	if index < 0 {
		return ""
	}

	// Cut after the full delimiter, not just after its first byte.
	return prefix + tail[:index+len(delimiter)]
}

// filterVersionsByMarker returns the part of objects that follows the position described by
// p.KeyMarker and p.VersionIDMarker.
//
// With an empty key marker the input is returned as is. If an entry with the key marker path
// exists, the result starts right after the entry of that path whose encoded object ID equals
// the version marker, or, when the version marker is empty, at the first entry with another path.
// If no entry has the key marker path, the result starts at the first entry whose path is
// greater than the key marker; a non-empty version marker is an error in that case.
// ErrInvalidVersion is returned when the version marker cannot be matched.
//
// NOTE(review): the early return on the first greater path presumably relies on objects
// being sorted by file path — confirm against callers.
func filterVersionsByMarker(objects []*data.ExtendedNodeVersion, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, error) {
	if p.KeyMarker == "" {
		return objects, nil
	}

	for i, obj := range objects {
		if obj.NodeVersion.FilePath == p.KeyMarker {
			// Scan the versions of the marker key starting from its first entry.
			for j := i; j < len(objects); j++ {
				if objects[j].NodeVersion.FilePath != obj.NodeVersion.FilePath {
					// The versions of the marker key are over.
					if p.VersionIDMarker == "" {
						return objects[j:], nil
					}
					break
				}
				if objects[j].NodeVersion.OID.EncodeToString() == p.VersionIDMarker {
					return objects[j+1:], nil
				}
			}
			// NOTE(review): this is also reached when the version marker is empty and the
			// marker key is the last key in objects — confirm that an error is intended there.
			return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
		} else if obj.NodeVersion.FilePath > p.KeyMarker {
			// The marker key itself is absent, so a version marker cannot be matched.
			if p.VersionIDMarker != "" {
				return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
			}
			return objects[i:], nil
		}
	}

	// don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above
	// that can be empty
	return []*data.ExtendedNodeVersion{}, nil
}

// triageVersions splits versions into regular versions (first result) and delete markers
// (second result), keeping the input order within each group.
// Both results are nil for an empty input.
func triageVersions(objVersions []*data.ExtendedNodeVersion) ([]*data.ExtendedNodeVersion, []*data.ExtendedNodeVersion) {
	var regular, deleteMarkers []*data.ExtendedNodeVersion

	for i := range objVersions {
		item := objVersions[i]
		switch {
		case item.NodeVersion.IsDeleteMarker:
			deleteMarkers = append(deleteMarkers, item)
		default:
			regular = append(regular, item)
		}
	}

	return regular, deleteMarkers
}