package writecache

import (
	"errors"
	"sync"
)

var errLimiterClosed = errors.New("acquire failed: limiter closed")

// flushLimiter is used to limit the total size of objects
// being flushed to blobstore at the same time. This is a necessary
// limitation so that the flushing process does not have
// a strong impact on user requests.
type flushLimiter struct {
	count, size uint64
	maxSize     uint64
	cond        *sync.Cond
	closed      bool
}

func newFlushLimiter(maxSize uint64) *flushLimiter {
	return &flushLimiter{
		maxSize: maxSize,
		cond:    sync.NewCond(&sync.Mutex{}),
	}
}

func (l *flushLimiter) acquire(size uint64) error {
	l.cond.L.Lock()
	defer l.cond.L.Unlock()

	// it is allowed to overflow maxSize to allow flushing objects with size > maxSize
	for l.count > 0 && l.size+size > l.maxSize && !l.closed {
		l.cond.Wait()
		if l.closed {
			return errLimiterClosed
		}
	}
	l.count++
	l.size += size
	return nil
}

func (l *flushLimiter) release(size uint64) {
	l.cond.L.Lock()
	defer l.cond.L.Unlock()

	if l.size >= size {
		l.size -= size
	} else {
		panic("flushLimiter: invalid size")
	}

	if l.count > 0 {
		l.count--
	} else {
		panic("flushLimiter: invalid count")
	}

	l.cond.Broadcast()
}

func (l *flushLimiter) close() {
	l.cond.L.Lock()
	defer l.cond.L.Unlock()

	l.closed = true

	l.cond.Broadcast()
}