diff --git a/scheduling/mclock.go b/scheduling/mclock.go index d2d75c0..de0a59f 100644 --- a/scheduling/mclock.go +++ b/scheduling/mclock.go @@ -74,15 +74,15 @@ type MClock struct { idleTimeout float64 tagInfo map[string]TagInfo - mtx sync.Mutex - previous map[string]*request - inProgress uint64 - lastSchedule float64 - reservationQueue *queue - limitQueue *queue - sharesQueue *queue - readyQueue *queue - closed bool + mtx sync.Mutex + previous map[string]*request + inProgress uint64 + timeBasedScheduleTs float64 + reservationQueue *queue + limitQueue *queue + sharesQueue *queue + readyQueue *queue + closed bool } // NewMClock creates new MClock scheduler instance with @@ -117,6 +117,7 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo readyQueue: &queue{ items: make([]queueItem, 0), }, + timeBasedScheduleTs: math.MaxFloat64, } previous := make(map[string]*request) @@ -301,10 +302,10 @@ func (q *MClock) scheduleRequest(lockTaken bool) { if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) { return } - q.setNextScheduleTimer(now) + q.setNextScheduleTimer() } -func (q *MClock) setNextScheduleTimer(now float64) { +func (q *MClock) setNextScheduleTimer() { nextTs := math.MaxFloat64 if q.reservationQueue.Len() > 0 { nextTs = q.reservationQueue.items[0].ts() @@ -313,11 +314,11 @@ func (q *MClock) setNextScheduleTimer(now float64) { nextTs = q.limitQueue.items[0].ts() } - if q.lastSchedule < now && q.lastSchedule > nextTs { + if q.timeBasedScheduleTs > nextTs { q.clock.runAt(nextTs, func() { q.scheduleRequest(false) }) - q.lastSchedule = nextTs + q.timeBasedScheduleTs = nextTs } }