forked from TrueCloudLab/frostfs-node
[#9999] quota: Add limiter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
d77a218f7c
commit
17719116fa
2 changed files with 390 additions and 0 deletions
290
pkg/core/quota/limiter.go
Normal file
290
pkg/core/quota/limiter.go
Normal file
|
@ -0,0 +1,290 @@
|
|||
package quota
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Quota struct {
|
||||
Read int64
|
||||
Write int64
|
||||
}
|
||||
|
||||
type Priority struct {
|
||||
Class string
|
||||
Value byte
|
||||
}
|
||||
|
||||
type Release func()
|
||||
|
||||
type limit struct {
|
||||
read, write int64
|
||||
maxRead, maxWrite int64
|
||||
}
|
||||
|
||||
type queueItem struct {
|
||||
priority byte
|
||||
ts uint64
|
||||
index int
|
||||
}
|
||||
|
||||
type queue struct {
|
||||
l *limit
|
||||
w int32
|
||||
items []*queueItem
|
||||
}
|
||||
|
||||
type QuotaLimiter struct {
|
||||
l *limit
|
||||
classes map[string]*queue
|
||||
queues []*queue
|
||||
nextQueue *queue
|
||||
cond *sync.Cond
|
||||
ts uint64
|
||||
}
|
||||
|
||||
func (ql *QuotaLimiter) Acquire(ctx context.Context, p Priority, q Quota) (Release, error) {
|
||||
queue, ok := ql.classes[p.Class]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown class '%s'", p.Class)
|
||||
}
|
||||
|
||||
if ql.l.maxRead > 0 && q.Read > ql.l.maxRead {
|
||||
return nil, fmt.Errorf("read quota %d exceeds total limit %d", q.Read, ql.l.maxRead)
|
||||
}
|
||||
|
||||
if ql.l.maxWrite > 0 && q.Write > ql.l.maxWrite {
|
||||
return nil, fmt.Errorf("write quota %d exceeds total limit %d", q.Write, ql.l.maxWrite)
|
||||
}
|
||||
|
||||
if queue.l.maxRead > 0 && q.Read > queue.l.maxRead {
|
||||
return nil, fmt.Errorf("read quota %d exceeds queue limit %d", q.Read, queue.l.maxRead)
|
||||
}
|
||||
|
||||
if queue.l.maxWrite > 0 && q.Write > queue.l.maxWrite {
|
||||
return nil, fmt.Errorf("write quota %d exceeds queue limit %d", q.Write, queue.l.maxWrite)
|
||||
}
|
||||
|
||||
ql.cond.L.Lock()
|
||||
defer ql.cond.L.Unlock()
|
||||
|
||||
allow := ql.nextQueue == nil && // no scheduled queue
|
||||
hasQuota(q, queue.l) && // queue limit
|
||||
hasQuota(q, ql.l) // global limit
|
||||
|
||||
if allow {
|
||||
applyQuota(q, queue.l)
|
||||
applyQuota(q, ql.l)
|
||||
return func() { ql.release(p, q) }, nil
|
||||
}
|
||||
|
||||
stop := context.AfterFunc(ctx, func() {
|
||||
ql.cond.Broadcast()
|
||||
})
|
||||
defer stop()
|
||||
|
||||
ql.ts++
|
||||
qi := &queueItem{
|
||||
priority: p.Value,
|
||||
ts: ql.ts,
|
||||
}
|
||||
|
||||
queue.push(qi)
|
||||
if queue.count() == 1 {
|
||||
ql.resetNextQueue()
|
||||
}
|
||||
|
||||
var hasGlobalQuota, hasQueueQuota, isNextItem bool
|
||||
for !allow {
|
||||
ql.cond.Wait()
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
queue.drop(qi)
|
||||
if queue.count() == 0 {
|
||||
ql.resetNextQueue()
|
||||
}
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
hasGlobalQuota = hasQuota(q, ql.l)
|
||||
hasQueueQuota = hasQuota(q, queue.l)
|
||||
isNextItem = ql.nextQueue == queue && queue.top() == qi
|
||||
|
||||
if hasGlobalQuota && !hasQueueQuota && isNextItem {
|
||||
ql.changeNextQueue()
|
||||
}
|
||||
allow = hasGlobalQuota && hasQueueQuota && isNextItem
|
||||
}
|
||||
|
||||
applyQuota(q, queue.l)
|
||||
applyQuota(q, ql.l)
|
||||
queue.pop()
|
||||
ql.resetNextQueue()
|
||||
return func() { ql.release(p, q) }, nil
|
||||
}
|
||||
|
||||
func (ql *QuotaLimiter) release(p Priority, q Quota) {
|
||||
queue, ok := ql.classes[p.Class]
|
||||
if !ok {
|
||||
panic("unknown class " + p.Class)
|
||||
}
|
||||
|
||||
ql.cond.L.Lock()
|
||||
defer ql.cond.L.Unlock()
|
||||
|
||||
releaseQuota(q, queue.l)
|
||||
releaseQuota(q, ql.l)
|
||||
|
||||
ql.cond.Broadcast()
|
||||
}
|
||||
|
||||
func (ql *QuotaLimiter) resetNextQueue() {
|
||||
var nonEmptyQueues []*queue
|
||||
var totalWeight int64
|
||||
for _, q := range ql.queues {
|
||||
if q.count() > 0 {
|
||||
nonEmptyQueues = append(nonEmptyQueues, q)
|
||||
totalWeight += int64(q.weight())
|
||||
}
|
||||
}
|
||||
if len(nonEmptyQueues) == 0 {
|
||||
ql.nextQueue = nil
|
||||
return
|
||||
}
|
||||
ql.selectNextQueue(nonEmptyQueues, totalWeight)
|
||||
}
|
||||
|
||||
func (ql *QuotaLimiter) changeNextQueue() {
|
||||
var nonEmptyQueues []*queue
|
||||
var totalWeight int64
|
||||
for _, q := range ql.queues {
|
||||
if q.count() > 0 && q != ql.nextQueue {
|
||||
nonEmptyQueues = append(nonEmptyQueues, q)
|
||||
totalWeight += int64(q.weight())
|
||||
}
|
||||
}
|
||||
if len(nonEmptyQueues) == 0 {
|
||||
return
|
||||
}
|
||||
ql.selectNextQueue(nonEmptyQueues, totalWeight)
|
||||
}
|
||||
|
||||
func (ql *QuotaLimiter) selectNextQueue(queues []*queue, totalWeight int64) {
|
||||
if totalWeight == 0 {
|
||||
ql.nextQueue = queues[rand.IntN(len(queues))]
|
||||
return
|
||||
}
|
||||
weight := rand.Int64N(totalWeight)
|
||||
var low, up int64
|
||||
for _, q := range queues {
|
||||
low = up
|
||||
up += int64(q.weight())
|
||||
if weight >= low && weight < up {
|
||||
ql.nextQueue = q
|
||||
return
|
||||
}
|
||||
}
|
||||
panic("undefined next queue")
|
||||
}
|
||||
|
||||
func hasQuota(q Quota, l *limit) bool {
|
||||
if q.Read > 0 && l.maxRead > 0 && q.Read+l.read > l.maxRead {
|
||||
return false
|
||||
}
|
||||
if q.Write > 0 && l.maxWrite > 0 && q.Write+l.write > l.write {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func applyQuota(q Quota, l *limit) {
|
||||
if q.Read > 0 && l.maxRead > 0 {
|
||||
l.read += q.Read
|
||||
}
|
||||
if q.Write > 0 && l.maxWrite > 0 {
|
||||
l.write += q.Write
|
||||
}
|
||||
}
|
||||
|
||||
func releaseQuota(q Quota, l *limit) {
|
||||
if q.Read > 0 && l.maxRead > 0 {
|
||||
l.read -= q.Read
|
||||
if l.read < 0 {
|
||||
panic("invalid read limit after release")
|
||||
}
|
||||
}
|
||||
if q.Write > 0 && l.maxWrite > 0 {
|
||||
l.write -= q.Write
|
||||
if l.write < 0 {
|
||||
panic("invalid write limit after release")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q *queue) push(qi *queueItem) {
|
||||
heap.Push(q, qi)
|
||||
}
|
||||
|
||||
func (q *queue) pop() {
|
||||
heap.Pop(q)
|
||||
}
|
||||
|
||||
func (q *queue) drop(qi *queueItem) {
|
||||
heap.Remove(q, qi.index)
|
||||
}
|
||||
|
||||
func (q *queue) top() *queueItem {
|
||||
if len(q.items) > 0 {
|
||||
return q.items[0]
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *queue) count() int {
|
||||
return len(q.items)
|
||||
}
|
||||
|
||||
func (q *queue) weight() int32 {
|
||||
return q.w
|
||||
}
|
||||
|
||||
// Len implements heap.Interface.
|
||||
func (q *queue) Len() int {
|
||||
return q.count()
|
||||
}
|
||||
|
||||
// Less implements heap.Interface.
|
||||
func (q *queue) Less(i int, j int) bool {
|
||||
if q.items[i].priority == q.items[j].priority {
|
||||
return q.items[i].ts < q.items[j].ts
|
||||
}
|
||||
return q.items[i].priority > q.items[j].priority
|
||||
}
|
||||
|
||||
// Pop implements heap.Interface.
|
||||
func (q *queue) Pop() any {
|
||||
n := len(q.items)
|
||||
item := q.items[n-1]
|
||||
q.items[n-1] = nil
|
||||
q.items = q.items[0 : n-1]
|
||||
item.index = -1
|
||||
return item
|
||||
}
|
||||
|
||||
// Push implements heap.Interface.
|
||||
func (q *queue) Push(x any) {
|
||||
it := x.(*queueItem)
|
||||
it.index = q.Len()
|
||||
q.items = append(q.items, it)
|
||||
}
|
||||
|
||||
// Swap implements heap.Interface.
|
||||
func (q *queue) Swap(i int, j int) {
|
||||
q.items[i], q.items[j] = q.items[j], q.items[i]
|
||||
q.items[i].index = i
|
||||
q.items[j].index = j
|
||||
}
|
100
pkg/core/quota/limiter_test.go
Normal file
100
pkg/core/quota/limiter_test.go
Normal file
|
@ -0,0 +1,100 @@
|
|||
package quota
|
||||
|
||||
import (
|
||||
"math/rand/v2"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestQueue(t *testing.T) {
|
||||
t.Run("different priority", func(t *testing.T) {
|
||||
q := &queue{}
|
||||
const count = 12345
|
||||
for i := range count {
|
||||
priority := i % 256
|
||||
q.push(&queueItem{
|
||||
priority: byte(priority),
|
||||
})
|
||||
}
|
||||
testQueueInvariant(t, q)
|
||||
})
|
||||
t.Run("same priority, different ts, inc", func(t *testing.T) {
|
||||
q := &queue{}
|
||||
var ts uint64
|
||||
const count = 10000
|
||||
for range count {
|
||||
q.push(&queueItem{
|
||||
priority: 100,
|
||||
ts: ts,
|
||||
})
|
||||
ts++
|
||||
}
|
||||
testQueueInvariant(t, q)
|
||||
})
|
||||
t.Run("same priority, different ts, dec", func(t *testing.T) {
|
||||
q := &queue{}
|
||||
|
||||
const count = 10000
|
||||
ts := uint64(count)
|
||||
for range count {
|
||||
q.push(&queueItem{
|
||||
priority: 100,
|
||||
ts: ts,
|
||||
})
|
||||
ts--
|
||||
}
|
||||
testQueueInvariant(t, q)
|
||||
})
|
||||
t.Run("drop, inc", func(t *testing.T) {
|
||||
q := &queue{}
|
||||
var ts uint64
|
||||
const count = 12345
|
||||
for i := range count {
|
||||
q.push(&queueItem{
|
||||
priority: byte(i % 256),
|
||||
ts: ts,
|
||||
})
|
||||
ts++
|
||||
}
|
||||
for q.Len() > 0 {
|
||||
idx := rand.IntN(q.count())
|
||||
it := q.items[idx]
|
||||
q.drop(it)
|
||||
testQueueInvariant(t, q)
|
||||
}
|
||||
})
|
||||
t.Run("drop, dec", func(t *testing.T) {
|
||||
q := &queue{}
|
||||
const count = 12345
|
||||
ts := uint64(count)
|
||||
for i := range count {
|
||||
q.push(&queueItem{
|
||||
priority: byte(i % 256),
|
||||
ts: ts,
|
||||
})
|
||||
ts--
|
||||
}
|
||||
for q.Len() > 0 {
|
||||
idx := rand.IntN(q.count())
|
||||
it := q.items[idx]
|
||||
q.drop(it)
|
||||
testQueueInvariant(t, q)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func testQueueInvariant(t *testing.T, q *queue) {
|
||||
var previous *queueItem
|
||||
for q.count() > 0 {
|
||||
current := q.top()
|
||||
if previous != nil {
|
||||
require.True(t, previous.priority > current.priority ||
|
||||
(previous.priority == current.priority &&
|
||||
(previous.ts == current.ts || previous.ts < current.ts)))
|
||||
}
|
||||
previous = current
|
||||
q.pop()
|
||||
}
|
||||
require.Equal(t, 0, q.count())
|
||||
}
|
Loading…
Reference in a new issue