[#1] mclock: Fix time based scheduling
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 32s
DCO action / DCO (pull_request) Successful in 48s
Vulncheck / Vulncheck (pull_request) Successful in 46s
Tests and linters / Staticcheck (pull_request) Successful in 50s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m14s
Tests and linters / Tests (pull_request) Successful in 1m24s
Tests and linters / Tests with -race (pull_request) Successful in 1m24s
Tests and linters / Lint (pull_request) Successful in 1m27s
Tests and linters / gopls check (pull_request) Successful in 1m53s

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-01-24 12:18:16 +03:00
parent b21ebb5d00
commit 020829ced1
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0

View file

@ -74,15 +74,15 @@ type MClock struct {
idleTimeout float64 idleTimeout float64
tagInfo map[string]TagInfo tagInfo map[string]TagInfo
mtx sync.Mutex mtx sync.Mutex
previous map[string]*request previous map[string]*request
inProgress uint64 inProgress uint64
lastSchedule float64 timeBasedScheduleTs float64
reservationQueue *queue reservationQueue *queue
limitQueue *queue limitQueue *queue
sharesQueue *queue sharesQueue *queue
readyQueue *queue readyQueue *queue
closed bool closed bool
} }
// NewMClock creates new MClock scheduler instance with // NewMClock creates new MClock scheduler instance with
@ -117,6 +117,7 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
readyQueue: &queue{ readyQueue: &queue{
items: make([]queueItem, 0), items: make([]queueItem, 0),
}, },
timeBasedScheduleTs: math.MaxFloat64,
} }
previous := make(map[string]*request) previous := make(map[string]*request)
@ -301,10 +302,10 @@ func (q *MClock) scheduleRequest(lockTaken bool) {
if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) { if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
return return
} }
q.setNextScheduleTimer(now) q.setNextScheduleTimer()
} }
func (q *MClock) setNextScheduleTimer(now float64) { func (q *MClock) setNextScheduleTimer() {
nextTs := math.MaxFloat64 nextTs := math.MaxFloat64
if q.reservationQueue.Len() > 0 { if q.reservationQueue.Len() > 0 {
nextTs = q.reservationQueue.items[0].ts() nextTs = q.reservationQueue.items[0].ts()
@ -313,11 +314,11 @@ func (q *MClock) setNextScheduleTimer(now float64) {
nextTs = q.limitQueue.items[0].ts() nextTs = q.limitQueue.items[0].ts()
} }
if q.lastSchedule < now && q.lastSchedule > nextTs { if q.timeBasedScheduleTs > nextTs {
q.clock.runAt(nextTs, func() { q.clock.runAt(nextTs, func() {
q.scheduleRequest(false) q.scheduleRequest(false)
}) })
q.lastSchedule = nextTs q.timeBasedScheduleTs = nextTs
} }
} }