[#9999] quota: Simplify queue creation

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-01-14 13:58:08 +03:00
parent a53eb16a5a
commit e70c195c17
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
3 changed files with 57 additions and 56 deletions

View file

@ -48,28 +48,12 @@ func BenchmarkMClock(b *testing.B) {
for _, limit := range limits {
for _, reservation := range reservations {
for _, tags := range tagsCount {
tagInfos := make(map[string]tagInfo)
previous := make(map[string]*request)
tagInfos := make(map[string]TagInfo)
for tag := 0; tag < tags; tag++ {
tagInfos["tag"+strconv.FormatInt(int64(tag), 10)] = tagInfo{shares: 50, limit: limit, reservation: reservation}
previous["tag"+strconv.FormatInt(int64(tag), 10)] = &request{}
tagInfos["tag"+strconv.FormatInt(int64(tag), 10)] = TagInfo{shares: 50, limit: limit, reservation: reservation}
}
mClockQ := &mClockQueue{
limit: 100_000,
clock: newSystemClock(),
tagInfo: tagInfos,
previous: previous,
reservationQueue: &mQueue{
items: make([]mQueueItem, 0),
},
limitQueue: &mQueue{
items: make([]mQueueItem, 0),
},
sharesQueue: &mQueue{
items: make([]mQueueItem, 0),
},
}
mClockQ := NewMClockQueue(100_000, tagInfos, (10 * time.Second).Seconds())
resStr := "no"
if reservation != nil {

View file

@ -41,7 +41,7 @@ type request struct {
canceled chan struct{}
}
type tagInfo struct {
type TagInfo struct {
reservation *float64
limit *float64
shares float64
@ -52,14 +52,14 @@ type clock interface {
runAt(ts float64, f func())
}
type mClockQueue struct {
type MClockQueue struct {
limit uint64
clock clock
idleTimeout float64
index atomic.Uint64
tagInfo map[string]tagInfo
tagInfo map[string]TagInfo
mtx sync.Mutex
index atomic.Uint64
previous map[string]*request
inProgress uint64
lastSchedule float64
@ -69,7 +69,43 @@ type mClockQueue struct {
readyQueue *mQueue
}
func (q *mClockQueue) RequestArrival(ctx context.Context, tag string) (Release, error) {
func NewMClockQueue(limit uint64, tagInfo map[string]TagInfo, idleTimeout float64) *MClockQueue {
result := &MClockQueue{
limit: limit,
clock: newSystemClock(),
idleTimeout: idleTimeout,
tagInfo: tagInfo,
reservationQueue: &mQueue{
items: make([]mQueueItem, 0),
},
limitQueue: &mQueue{
items: make([]mQueueItem, 0),
},
sharesQueue: &mQueue{
items: make([]mQueueItem, 0),
},
readyQueue: &mQueue{
items: make([]mQueueItem, 0),
},
}
previous := make(map[string]*request)
for tag := range tagInfo {
previous[tag] = &request{
tag: tag,
reservationIdx: invalidIndex,
limitIdx: invalidIndex,
sharesIdx: invalidIndex,
}
}
result.previous = previous
result.index.Store(1) // 0 for dummy previous
return result
}
func (q *MClockQueue) RequestArrival(ctx context.Context, tag string) (Release, error) {
req, release := q.pushRequest(tag)
select {
case <-ctx.Done():
@ -80,7 +116,7 @@ func (q *mClockQueue) RequestArrival(ctx context.Context, tag string) (Release,
}
}
func (q *mClockQueue) dropRequest(req *request) {
func (q *MClockQueue) dropRequest(req *request) {
q.mtx.Lock()
defer q.mtx.Unlock()
@ -96,7 +132,7 @@ func (q *mClockQueue) dropRequest(req *request) {
q.removeFromQueues(req)
}
func (q *mClockQueue) pushRequest(tag string) (*request, Release) {
func (q *MClockQueue) pushRequest(tag string) (*request, Release) {
q.mtx.Lock()
defer q.mtx.Unlock()
@ -149,7 +185,7 @@ func (q *mClockQueue) pushRequest(tag string) (*request, Release) {
return r, q.requestCompleted
}
func (q *mClockQueue) adjustTags(now float64) {
func (q *MClockQueue) adjustTags(now float64) {
if q.sharesQueue.Len() == 0 {
return
}
@ -173,7 +209,7 @@ func (q *mClockQueue) adjustTags(now float64) {
}
}
func (q *mClockQueue) scheduleRequest(lockTaken bool) {
func (q *MClockQueue) scheduleRequest(lockTaken bool) {
if !lockTaken {
q.mtx.Lock()
defer q.mtx.Unlock()
@ -266,7 +302,7 @@ func (q *mClockQueue) scheduleRequest(lockTaken bool) {
}
}
func (q *mClockQueue) removeFromQueues(r *request) {
func (q *MClockQueue) removeFromQueues(r *request) {
if r.limitIdx != invalidIndex {
heap.Remove(q.limitQueue, r.limitIdx)
}
@ -281,7 +317,7 @@ func (q *mClockQueue) removeFromQueues(r *request) {
}
}
func (q *mClockQueue) requestCompleted() {
func (q *MClockQueue) requestCompleted() {
q.mtx.Lock()
defer q.mtx.Unlock()

View file

@ -16,30 +16,11 @@ import (
func TestMClockSimulation(t *testing.T) {
t.Skip("use for simulation")
const maxIter = 1000
q := &mClockQueue{
limit: 1,
clock: newSystemClock(),
tagInfo: map[string]tagInfo{
"class1": {shares: 10},
"class2": {shares: 5},
},
previous: map[string]*request{
"class1": {reservation: 0.0, limit: 0.0, shares: 0.0},
"class2": {reservation: 0.0, limit: 0.0, shares: 0.0},
},
reservationQueue: &mQueue{
items: make([]mQueueItem, 0),
},
limitQueue: &mQueue{
items: make([]mQueueItem, 0),
},
sharesQueue: &mQueue{
items: make([]mQueueItem, 0),
},
readyQueue: &mQueue{
items: make([]mQueueItem, 0),
},
}
const sleepInterval = 10 * time.Millisecond
q := NewMClockQueue(1, map[string]TagInfo{
"class1": {shares: 10},
"class2": {shares: 5},
}, (10 * sleepInterval).Seconds())
eg, ctx := errgroup.WithContext(context.Background())
var seqNumber atomic.Int64
for i := 0; i < maxIter; i++ {
@ -53,10 +34,10 @@ func TestMClockSimulation(t *testing.T) {
}
defer release()
fmt.Printf("request with seq number %d and tag %s scheduled\n", sn, tag)
time.Sleep(10 * time.Millisecond)
time.Sleep(sleepInterval)
return nil
})
}
require.NoError(t, eg.Wait())
t.Fail()
t.Fail() // to print
}