[#9999] quota: Fix linter warnings

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-01-17 10:53:48 +03:00
parent c3aaa19cd1
commit 0cccf4751b
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
4 changed files with 43 additions and 431 deletions

View file

@ -1,290 +0,0 @@
package quota
import (
"container/heap"
"context"
"fmt"
"math/rand/v2"
"sync"
)
type Quota struct {
Read int64
Write int64
}
type Priority struct {
Class string
Value byte
}
type Release func()
type limit struct {
read, write int64
maxRead, maxWrite int64
}
type queueItem struct {
priority byte
ts uint64
index int
}
type queue struct {
l *limit
w int32
items []*queueItem
}
type QuotaLimiter struct {
l *limit
classes map[string]*queue
queues []*queue
nextQueue *queue
cond *sync.Cond
ts uint64
}
func (ql *QuotaLimiter) Acquire(ctx context.Context, p Priority, q Quota) (Release, error) {
queue, ok := ql.classes[p.Class]
if !ok {
return nil, fmt.Errorf("unknown class '%s'", p.Class)
}
if ql.l.maxRead > 0 && q.Read > ql.l.maxRead {
return nil, fmt.Errorf("read quota %d exceeds total limit %d", q.Read, ql.l.maxRead)
}
if ql.l.maxWrite > 0 && q.Write > ql.l.maxWrite {
return nil, fmt.Errorf("write quota %d exceeds total limit %d", q.Write, ql.l.maxWrite)
}
if queue.l.maxRead > 0 && q.Read > queue.l.maxRead {
return nil, fmt.Errorf("read quota %d exceeds queue limit %d", q.Read, queue.l.maxRead)
}
if queue.l.maxWrite > 0 && q.Write > queue.l.maxWrite {
return nil, fmt.Errorf("write quota %d exceeds queue limit %d", q.Write, queue.l.maxWrite)
}
ql.cond.L.Lock()
defer ql.cond.L.Unlock()
allow := ql.nextQueue == nil && // no scheduled queue
hasQuota(q, queue.l) && // queue limit
hasQuota(q, ql.l) // global limit
if allow {
applyQuota(q, queue.l)
applyQuota(q, ql.l)
return func() { ql.release(p, q) }, nil
}
stop := context.AfterFunc(ctx, func() {
ql.cond.Broadcast()
})
defer stop()
ql.ts++
qi := &queueItem{
priority: p.Value,
ts: ql.ts,
}
queue.push(qi)
if queue.count() == 1 {
ql.resetNextQueue()
}
var hasGlobalQuota, hasQueueQuota, isNextItem bool
for !allow {
ql.cond.Wait()
if err := ctx.Err(); err != nil {
queue.drop(qi)
if queue.count() == 0 {
ql.resetNextQueue()
}
return nil, ctx.Err()
}
hasGlobalQuota = hasQuota(q, ql.l)
hasQueueQuota = hasQuota(q, queue.l)
isNextItem = ql.nextQueue == queue && queue.top() == qi
if hasGlobalQuota && !hasQueueQuota && isNextItem {
ql.changeNextQueue()
}
allow = hasGlobalQuota && hasQueueQuota && isNextItem
}
applyQuota(q, queue.l)
applyQuota(q, ql.l)
queue.pop()
ql.resetNextQueue()
return func() { ql.release(p, q) }, nil
}
func (ql *QuotaLimiter) release(p Priority, q Quota) {
queue, ok := ql.classes[p.Class]
if !ok {
panic("unknown class " + p.Class)
}
ql.cond.L.Lock()
defer ql.cond.L.Unlock()
releaseQuota(q, queue.l)
releaseQuota(q, ql.l)
ql.cond.Broadcast()
}
func (ql *QuotaLimiter) resetNextQueue() {
var nonEmptyQueues []*queue
var totalWeight int64
for _, q := range ql.queues {
if q.count() > 0 {
nonEmptyQueues = append(nonEmptyQueues, q)
totalWeight += int64(q.weight())
}
}
if len(nonEmptyQueues) == 0 {
ql.nextQueue = nil
return
}
ql.selectNextQueue(nonEmptyQueues, totalWeight)
}
func (ql *QuotaLimiter) changeNextQueue() {
var nonEmptyQueues []*queue
var totalWeight int64
for _, q := range ql.queues {
if q.count() > 0 && q != ql.nextQueue {
nonEmptyQueues = append(nonEmptyQueues, q)
totalWeight += int64(q.weight())
}
}
if len(nonEmptyQueues) == 0 {
return
}
ql.selectNextQueue(nonEmptyQueues, totalWeight)
}
func (ql *QuotaLimiter) selectNextQueue(queues []*queue, totalWeight int64) {
if totalWeight == 0 {
ql.nextQueue = queues[rand.IntN(len(queues))]
return
}
weight := rand.Int64N(totalWeight)
var low, up int64
for _, q := range queues {
low = up
up += int64(q.weight())
if weight >= low && weight < up {
ql.nextQueue = q
return
}
}
panic("undefined next queue")
}
func hasQuota(q Quota, l *limit) bool {
if q.Read > 0 && l.maxRead > 0 && q.Read+l.read > l.maxRead {
return false
}
if q.Write > 0 && l.maxWrite > 0 && q.Write+l.write > l.write {
return false
}
return true
}
func applyQuota(q Quota, l *limit) {
if q.Read > 0 && l.maxRead > 0 {
l.read += q.Read
}
if q.Write > 0 && l.maxWrite > 0 {
l.write += q.Write
}
}
func releaseQuota(q Quota, l *limit) {
if q.Read > 0 && l.maxRead > 0 {
l.read -= q.Read
if l.read < 0 {
panic("invalid read limit after release")
}
}
if q.Write > 0 && l.maxWrite > 0 {
l.write -= q.Write
if l.write < 0 {
panic("invalid write limit after release")
}
}
}
func (q *queue) push(qi *queueItem) {
heap.Push(q, qi)
}
func (q *queue) pop() {
heap.Pop(q)
}
func (q *queue) drop(qi *queueItem) {
heap.Remove(q, qi.index)
}
func (q *queue) top() *queueItem {
if len(q.items) > 0 {
return q.items[0]
}
return nil
}
func (q *queue) count() int {
return len(q.items)
}
func (q *queue) weight() int32 {
return q.w
}
// Len implements heap.Interface.
func (q *queue) Len() int {
return q.count()
}
// Less implements heap.Interface.
func (q *queue) Less(i int, j int) bool {
if q.items[i].priority == q.items[j].priority {
return q.items[i].ts < q.items[j].ts
}
return q.items[i].priority > q.items[j].priority
}
// Pop implements heap.Interface.
func (q *queue) Pop() any {
n := len(q.items)
item := q.items[n-1]
q.items[n-1] = nil
q.items = q.items[0 : n-1]
item.index = -1
return item
}
// Push implements heap.Interface.
func (q *queue) Push(x any) {
it := x.(*queueItem)
it.index = q.Len()
q.items = append(q.items, it)
}
// Swap implements heap.Interface.
func (q *queue) Swap(i int, j int) {
q.items[i], q.items[j] = q.items[j], q.items[i]
q.items[i].index = i
q.items[j].index = j
}

View file

@ -1,100 +0,0 @@
package quota
import (
"math/rand/v2"
"testing"
"github.com/stretchr/testify/require"
)
func TestQueue(t *testing.T) {
t.Run("different priority", func(t *testing.T) {
q := &queue{}
const count = 12345
for i := range count {
priority := i % 256
q.push(&queueItem{
priority: byte(priority),
})
}
testQueueInvariant(t, q)
})
t.Run("same priority, different ts, inc", func(t *testing.T) {
q := &queue{}
var ts uint64
const count = 10000
for range count {
q.push(&queueItem{
priority: 100,
ts: ts,
})
ts++
}
testQueueInvariant(t, q)
})
t.Run("same priority, different ts, dec", func(t *testing.T) {
q := &queue{}
const count = 10000
ts := uint64(count)
for range count {
q.push(&queueItem{
priority: 100,
ts: ts,
})
ts--
}
testQueueInvariant(t, q)
})
t.Run("drop, inc", func(t *testing.T) {
q := &queue{}
var ts uint64
const count = 12345
for i := range count {
q.push(&queueItem{
priority: byte(i % 256),
ts: ts,
})
ts++
}
for q.Len() > 0 {
idx := rand.IntN(q.count())
it := q.items[idx]
q.drop(it)
testQueueInvariant(t, q)
}
})
t.Run("drop, dec", func(t *testing.T) {
q := &queue{}
const count = 12345
ts := uint64(count)
for i := range count {
q.push(&queueItem{
priority: byte(i % 256),
ts: ts,
})
ts--
}
for q.Len() > 0 {
idx := rand.IntN(q.count())
it := q.items[idx]
q.drop(it)
testQueueInvariant(t, q)
}
})
}
func testQueueInvariant(t *testing.T, q *queue) {
var previous *queueItem
for q.count() > 0 {
current := q.top()
if previous != nil {
require.True(t, previous.priority > current.priority ||
(previous.priority == current.priority &&
(previous.ts == current.ts || previous.ts < current.ts)))
}
previous = current
q.pop()
}
require.Equal(t, 0, q.count())
}

View file

@ -6,7 +6,6 @@ import (
"errors"
"math"
"sync"
"sync/atomic"
"time"
)
@ -17,6 +16,8 @@ const (
var ErrSchedulerClosed = errors.New("mClock scheduler is closed")
type Release func()
type mQueueItem interface {
ts() float64
setIndex(idx int)
@ -27,9 +28,8 @@ type mQueue struct {
}
type request struct {
tag string
seqNumber uint64
ts float64
tag string
ts float64
reservation float64
limit float64
@ -63,7 +63,6 @@ type MClockQueue struct {
tagInfo map[string]TagInfo
mtx sync.Mutex
index atomic.Uint64
previous map[string]*request
inProgress uint64
lastSchedule float64
@ -105,7 +104,6 @@ func NewMClockQueue(limit uint64, tagInfo map[string]TagInfo, idleTimeout float6
}
}
result.previous = previous
result.index.Store(1) // 0 for dummy previous
return result
}
@ -181,7 +179,6 @@ func (q *MClockQueue) pushRequest(tag string) (*request, Release, error) {
tag: tag,
ts: now,
shares: max(prev.shares+1.0/tagInfo.shares, now),
seqNumber: q.index.Add(1),
reservationIdx: invalidIndex,
limitIdx: invalidIndex,
sharesIdx: invalidIndex,
@ -239,29 +236,38 @@ func (q *MClockQueue) scheduleRequest(lockTaken bool) {
}
if q.inProgress >= q.limit {
// next request will be scheduled by requestCompleted()
return
}
now := q.clock.now()
for q.inProgress < q.limit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
q.removeFromQueues(next.r)
select {
case <-next.r.canceled:
continue
default:
}
assertIndexInvalid(next.r)
q.inProgress++
close(next.r.scheduled)
}
q.scheduleByReservation(now)
if q.inProgress >= q.limit {
return
}
q.scheduleByLimitAndWeight(now)
if q.inProgress >= q.limit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
return
}
q.setNextScheduleTimer(now)
}
func (q *MClockQueue) setNextScheduleTimer(now float64) {
nextTs := math.MaxFloat64
if q.reservationQueue.Len() > 0 {
nextTs = q.reservationQueue.items[0].ts()
}
if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
nextTs = q.limitQueue.items[0].ts()
}
if q.lastSchedule < now && q.lastSchedule > nextTs {
q.clock.runAt(nextTs, func() {
q.scheduleRequest(false)
})
q.lastSchedule = nextTs
}
}
func (q *MClockQueue) scheduleByLimitAndWeight(now float64) {
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
@ -304,24 +310,22 @@ func (q *MClockQueue) scheduleRequest(lockTaken bool) {
q.inProgress++
close(next.r.scheduled)
}
}
if q.inProgress >= q.limit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
return
}
func (q *MClockQueue) scheduleByReservation(now float64) {
for q.inProgress < q.limit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
q.removeFromQueues(next.r)
var nextTs float64 = math.MaxFloat64
if q.reservationQueue.Len() > 0 {
nextTs = q.reservationQueue.items[0].ts()
}
if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
nextTs = q.limitQueue.items[0].ts()
}
select {
case <-next.r.canceled:
continue
default:
}
if q.lastSchedule < now && q.lastSchedule > nextTs {
q.clock.runAt(nextTs, func() {
q.scheduleRequest(false)
})
q.lastSchedule = nextTs
assertIndexInvalid(next.r)
q.inProgress++
close(next.r.scheduled)
}
}

View file

@ -84,9 +84,7 @@ func (n *noopClock) now() float64 {
return n.v
}
func (n *noopClock) runAt(ts float64, f func()) {
return
}
func (n *noopClock) runAt(ts float64, f func()) {}
func (n *noopClock) close() {}