[#12] mclock: Fix timer-based scheduling
All checks were successful
DCO action / DCO (pull_request) Successful in 21s
Vulncheck / Vulncheck (pull_request) Successful in 39s
Tests and linters / Lint (pull_request) Successful in 1m14s
Tests and linters / Run gofumpt (pull_request) Successful in 1m8s
Tests and linters / gopls check (pull_request) Successful in 1m10s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m16s
Tests and linters / Tests (pull_request) Successful in 1m20s
Tests and linters / Staticcheck (pull_request) Successful in 1m19s
Tests and linters / Tests with -race (pull_request) Successful in 1m35s

Let's assume that there are two requests in the queue with execution time t1 and t2.
The timer is set to t1. The timer is triggered, schedules the t1 request,
calculates the time for the next timer t2 to be triggered.
But it doesn't schedules timer to this time because of the
`q.timeBasedScheduleTs > nextTs` check.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-03-19 15:51:38 +03:00
parent 3e7ca94035
commit 1ebed20b9d
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
3 changed files with 57 additions and 29 deletions

View file

@ -1,6 +1,7 @@
package scheduling package scheduling
import ( import (
"math"
"sync" "sync"
"time" "time"
) )
@ -36,10 +37,7 @@ func (c *systemClock) now() float64 {
} }
func (c *systemClock) runAt(ts float64, f func()) { func (c *systemClock) runAt(ts float64, f func()) {
select { c.schedule <- scheduleInfo{ts: ts, f: f}
case c.schedule <- scheduleInfo{ts: ts, f: f}:
default: // timer fired, scheduleRequest will call runAt again
}
} }
func (c *systemClock) close() { func (c *systemClock) close() {
@ -53,18 +51,30 @@ func (c *systemClock) start() {
defer c.wg.Done() defer c.wg.Done()
t := time.NewTimer(0) t := time.NewTimer(0)
<-t.C <-t.C
var f func() currentTs := math.MaxFloat64
var currentTask func()
for { for {
select { select {
case <-t.C: case <-t.C:
if f != nil { if currentTask != nil {
f() c.wg.Add(1)
f = nil f := currentTask
go func() {
defer c.wg.Done()
f()
}()
currentTask = nil
} }
currentTs = math.MaxFloat64
case s, ok := <-c.schedule: case s, ok := <-c.schedule:
if !ok { if !ok {
return return
} }
if s.ts >= currentTs {
// current timer will fire earlier
// so next scheduleRequest will push new schedule event
continue
}
var d time.Duration var d time.Duration
now := c.now() now := c.now()
if now < s.ts { if now < s.ts {
@ -77,7 +87,8 @@ func (c *systemClock) start() {
} }
} }
t.Reset(d) t.Reset(d)
f = s.f currentTask = s.f
currentTs = s.ts
} }
} }
}() }()

View file

@ -61,15 +61,14 @@ type MClock struct {
idleTimeout float64 idleTimeout float64
tagInfo map[string]TagInfo tagInfo map[string]TagInfo
mtx sync.Mutex mtx sync.Mutex
previous map[string]*request previous map[string]*request
inProgress uint64 inProgress uint64
timeBasedScheduleTs float64 reservationQueue *queue
reservationQueue *queue limitQueue *queue
limitQueue *queue sharesQueue *queue
sharesQueue *queue readyQueue *queue
readyQueue *queue closed bool
closed bool
} }
// NewMClock creates new MClock scheduler instance with // NewMClock creates new MClock scheduler instance with
@ -92,11 +91,10 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
idleTimeout: idleTimeout.Seconds(), idleTimeout: idleTimeout.Seconds(),
tagInfo: tagInfo, tagInfo: tagInfo,
reservationQueue: &queue{}, reservationQueue: &queue{},
limitQueue: &queue{}, limitQueue: &queue{},
sharesQueue: &queue{}, sharesQueue: &queue{},
readyQueue: &queue{}, readyQueue: &queue{},
timeBasedScheduleTs: math.MaxFloat64,
} }
previous := make(map[string]*request) previous := make(map[string]*request)
@ -294,13 +292,10 @@ func (q *MClock) setNextScheduleTimer(now float64) {
// should not happen as we always compare .ts() <= now // should not happen as we always compare .ts() <= now
return return
} }
if nextTs == math.MaxFloat64 {
if q.timeBasedScheduleTs > nextTs { return
q.clock.runAt(nextTs, func() {
q.scheduleRequest()
})
q.timeBasedScheduleTs = nextTs
} }
q.clock.runAt(nextTs, q.scheduleRequest)
} }
func (q *MClock) scheduleByLimitAndWeight(now float64) { func (q *MClock) scheduleByLimitAndWeight(now float64) {

View file

@ -493,3 +493,25 @@ func TestMClockTimeBasedSchedule(t *testing.T) {
close(checked) close(checked)
require.NoError(t, eg.Wait()) require.NoError(t, eg.Wait())
} }
func TestMClockLowLimit(t *testing.T) {
t.Parallel()
limit := 2.0
q, err := NewMClock(100, 100, map[string]TagInfo{
"class1": {Share: 50, LimitIOPS: &limit},
}, 5*time.Second)
require.NoError(t, err)
defer q.Close()
eg, ctx := errgroup.WithContext(context.Background())
eg.SetLimit(5)
eg.Go(func() error {
for range 3 {
release, err := q.RequestArrival(ctx, "class1")
require.NoError(t, err)
release()
}
return nil
})
require.NoError(t, eg.Wait())
}