[#1] mclock: Fix time based scheduling

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-01-24 12:18:16 +03:00
parent e18d1a7c45
commit 54b4bf7cc1
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
3 changed files with 223 additions and 182 deletions

View file

@ -74,15 +74,15 @@ type MClock struct {
idleTimeout float64
tagInfo map[string]TagInfo
mtx sync.Mutex
previous map[string]*request
inProgress uint64
lastSchedule float64
reservationQueue *queue
limitQueue *queue
sharesQueue *queue
readyQueue *queue
closed bool
mtx sync.Mutex
previous map[string]*request
inProgress uint64
timeBasedScheduleTs float64
reservationQueue *queue
limitQueue *queue
sharesQueue *queue
readyQueue *queue
closed bool
}
// NewMClock creates new MClock scheduler instance with
@ -105,10 +105,11 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
idleTimeout: idleTimeout,
tagInfo: tagInfo,
reservationQueue: &queue{},
limitQueue: &queue{},
sharesQueue: &queue{},
readyQueue: &queue{},
reservationQueue: &queue{},
limitQueue: &queue{},
sharesQueue: &queue{},
readyQueue: &queue{},
timeBasedScheduleTs: math.MaxFloat64,
}
previous := make(map[string]*request)
@ -304,12 +305,16 @@ func (q *MClock) setNextScheduleTimer(now float64) {
if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
nextTs = q.limitQueue.items[0].ts()
}
if nextTs <= now {
// should not happen as we always compare .ts() <= now
return
}
if q.lastSchedule < now && q.lastSchedule > nextTs {
if q.timeBasedScheduleTs > nextTs {
q.clock.runAt(nextTs, func() {
q.scheduleRequest(false)
})
q.lastSchedule = nextTs
q.timeBasedScheduleTs = nextTs
}
}