package engine

import (
	"context"
	"math/rand"
	"sort"

	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
)

// ErrEndOfListing is returned from an object listing with cursor
// when the storage can't return any more objects after the provided
// cursor. Use nil cursor object to start listing again.
var ErrEndOfListing = shard.ErrEndOfListing

// Cursor is a type for continuous object listing. Cursor contains shard IDs to read
// and shard cursors that contain state from previous read.
type Cursor struct {
	current         string
	shardIDs        map[string]bool
	shardIDToCursor map[string]*shard.Cursor
}

func (c *Cursor) getCurrentShardCursor() *shard.Cursor {
	return c.shardIDToCursor[c.current]
}

func (c *Cursor) setCurrentShardCursor(sc *shard.Cursor) {
	c.shardIDToCursor[c.current] = sc
}

func (c *Cursor) nextShard() bool {
	var shardsToRead []string
	for shardID, read := range c.shardIDs {
		if !read {
			shardsToRead = append(shardsToRead, shardID)
		}
	}
	if len(shardsToRead) == 0 {
		return false
	}
	c.current = shardsToRead[rand.Intn(len(shardsToRead))]
	return true
}

func (c *Cursor) setShardRead(shardID string) {
	c.shardIDs[shardID] = true
}

// ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct {
	count  uint32
	cursor *Cursor
}

// WithCount sets the maximum amount of addresses that ListWithCursor should return.
func (p *ListWithCursorPrm) WithCount(count uint32) {
	p.count = count
}

// WithCursor sets a cursor for ListWithCursor operation. For initial request
// ignore this param or use nil value. For consecutive requests, use value
// from ListWithCursorRes.
func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
	p.cursor = cursor
}

// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct {
	addrList []objectcore.AddressWithType
	cursor   *Cursor
}

// AddressList returns addresses selected by ListWithCursor operation.
func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType {
	return l.addrList
}

// Cursor returns cursor for consecutive listing requests.
func (l ListWithCursorRes) Cursor() *Cursor {
	return l.cursor
}

// ListWithCursor lists physical objects available in the engine starting
// from the cursor. It includes regular, tombstone and storage group objects.
// Does not include inhumed objects. Use cursor value from the response
// for consecutive requests.
//
// If count param is big enough, then the method reads objects from different shards
// by portions. In this case shards are chosen randomly, if they're not read out yet.
//
// Adding a shard between ListWithCursor does not invalidate the cursor but new shard
// won't be listed.
// Removing a shard between ListWithCursor leads to the undefined behavior
// (e.g. usage of the objects from the removed shard).
//
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
	result := make([]objectcore.AddressWithType, 0, prm.count)

	// Set initial cursors
	cursor := prm.cursor
	if cursor == nil {
		shardIDs := getSortedShardIDs(e)
		if len(shardIDs) == 0 {
			return ListWithCursorRes{}, ErrEndOfListing
		}
		cursor = newCursor(shardIDs)
	}

	const (
		splitShardCountLimit = 100
		shardsNum            = 4
	)

	batchSize := prm.count
	if batchSize >= splitShardCountLimit {
		batchSize /= shardsNum
	}

	for cursor.nextShard() {
		if len(result) >= int(prm.count) {
			break
		}
		curr := cursor.current

		e.mtx.RLock()
		shardInstance, ok := e.shards[curr]
		e.mtx.RUnlock()
		if !ok {
			cursor.setShardRead(curr)
			continue
		}

		count := prm.count - uint32(len(result))
		if count > batchSize {
			count = batchSize
		}

		var shardPrm shard.ListWithCursorPrm
		shardPrm.WithCount(count)
		shardPrm.WithCursor(cursor.getCurrentShardCursor())

		res, err := shardInstance.ListWithCursor(ctx, shardPrm)
		if err != nil {
			cursor.setShardRead(curr)
			continue
		}
		result = append(result, res.AddressList()...)
		cursor.setCurrentShardCursor(res.Cursor())
	}

	if len(result) == 0 {
		return ListWithCursorRes{}, ErrEndOfListing
	}

	return ListWithCursorRes{
		addrList: result,
		cursor:   cursor,
	}, nil
}

func getSortedShardIDs(e *StorageEngine) []string {
	e.mtx.RLock()
	shardIDs := make([]string, 0, len(e.shards))
	for id := range e.shards {
		shardIDs = append(shardIDs, id)
	}
	e.mtx.RUnlock()
	sort.Strings(shardIDs)
	return shardIDs
}

func newCursor(shardIDs []string) *Cursor {
	shardIDsMap := make(map[string]bool)
	shardIDToCursor := make(map[string]*shard.Cursor)
	for _, shardID := range shardIDs {
		shardIDsMap[shardID] = false
	}
	return &Cursor{shardIDs: shardIDsMap, shardIDToCursor: shardIDToCursor}
}