package bqueue

import (
	"sync"

	"github.com/nspcc-dev/neo-go/pkg/core/block"
	"go.uber.org/atomic"
	"go.uber.org/zap"
)

// Blockqueuer is an interface for a block queue.
type Blockqueuer interface {
	AddBlock(block *block.Block) error
	AddHeaders(...*block.Header) error
	BlockHeight() uint32
}

// Queue is the block queue.
type Queue struct {
	log         *zap.Logger
	queueLock   sync.RWMutex
	queue       []*block.Block
	lastQ       uint32
	checkBlocks chan struct{}
	chain       Blockqueuer
	relayF      func(*block.Block)
	discarded   *atomic.Bool
	len         int
	lenUpdateF  func(int)
}

// CacheSize is the amount of blocks above the current height
// which are stored in the queue.
const CacheSize = 2000

func indexToPosition(i uint32) int {
	return int(i) % CacheSize
}

// New creates an instance of BlockQueue.
func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), lenMetricsUpdater func(l int)) *Queue {
	if log == nil {
		return nil
	}

	return &Queue{
		log:         log,
		queue:       make([]*block.Block, CacheSize),
		checkBlocks: make(chan struct{}, 1),
		chain:       bc,
		relayF:      relayer,
		discarded:   atomic.NewBool(false),
		lenUpdateF:  lenMetricsUpdater,
	}
}

// Run runs the BlockQueue queueing loop. It must be called in a separate routine.
func (bq *Queue) Run() {
	var lastHeight = bq.chain.BlockHeight()
	for {
		_, ok := <-bq.checkBlocks
		if !ok {
			break
		}
		for {
			h := bq.chain.BlockHeight()
			pos := indexToPosition(h + 1)
			bq.queueLock.Lock()
			b := bq.queue[pos]
			// The chain moved forward using blocks from other sources (consensus).
			for i := lastHeight; i < h; i++ {
				old := indexToPosition(i + 1)
				if bq.queue[old] != nil && bq.queue[old].Index == i {
					bq.len--
					bq.queue[old] = nil
				}
			}
			bq.queueLock.Unlock()
			lastHeight = h
			if b == nil {
				break
			}

			err := bq.chain.AddBlock(b)
			if err != nil {
				// The block might already be added by the consensus.
				if bq.chain.BlockHeight() < b.Index {
					bq.log.Warn("blockQueue: failed adding block into the blockchain",
						zap.String("error", err.Error()),
						zap.Uint32("blockHeight", bq.chain.BlockHeight()),
						zap.Uint32("nextIndex", b.Index))
				}
			} else if bq.relayF != nil {
				bq.relayF(b)
			}
			bq.queueLock.Lock()
			bq.len--
			l := bq.len
			if bq.queue[pos] == b {
				bq.queue[pos] = nil
			}
			bq.queueLock.Unlock()
			if bq.lenUpdateF != nil {
				bq.lenUpdateF(l)
			}
		}
	}
}

// PutBlock enqueues block to be added to the chain.
func (bq *Queue) PutBlock(block *block.Block) error {
	h := bq.chain.BlockHeight()
	bq.queueLock.Lock()
	defer bq.queueLock.Unlock()
	if bq.discarded.Load() {
		return nil
	}
	if block.Index <= h || h+CacheSize < block.Index {
		// can easily happen when fetching the same blocks from
		// different peers, thus not considered as error
		return nil
	}
	pos := indexToPosition(block.Index)
	// If we already have it, keep the old block, throw away the new one.
	if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {
		bq.len++
		bq.queue[pos] = block
		for pos < CacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
			bq.lastQ = bq.queue[pos].Index
			pos++
		}
	}
	// update metrics
	if bq.lenUpdateF != nil {
		bq.lenUpdateF(bq.len)
	}
	select {
	case bq.checkBlocks <- struct{}{}:
		// ok, signalled to goroutine processing queue
	default:
		// it's already busy processing blocks
	}
	return nil
}

// LastQueued returns the index of the last queued block and the queue's capacity
// left.
func (bq *Queue) LastQueued() (uint32, int) {
	bq.queueLock.RLock()
	defer bq.queueLock.RUnlock()
	return bq.lastQ, CacheSize - bq.len
}

// Discard stops the queue and prevents it from accepting more blocks to enqueue.
func (bq *Queue) Discard() {
	if bq.discarded.CompareAndSwap(false, true) {
		bq.queueLock.Lock()
		close(bq.checkBlocks)
		// Technically we could bq.queue = nil, but this would cost
		// another if in Run().
		for i := 0; i < len(bq.queue); i++ {
			bq.queue[i] = nil
		}
		bq.len = 0
		bq.queueLock.Unlock()
	}
}