From f546741c3aaa981c96bc7bee76f8bc38f29e499a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 19 Mar 2025 15:51:38 +0300 Subject: [PATCH] [#12] mclock: Fix timer-based scheduling Let's assume that there are two requests in the queue with execution time t1 and t2. The timer is set to t1. The timer is triggered, schedules the t1 request, calculates the time for the next timer t2 to be triggered. But it doesn't schedules timer to this time because of the `q.timeBasedScheduleTs > nextTs` check. Signed-off-by: Dmitrii Stepanov --- scheduling/clock.go | 16 +++++++++++----- scheduling/mclock.go | 35 +++++++++++++++-------------------- scheduling/mclock_test.go | 22 ++++++++++++++++++++++ 3 files changed, 48 insertions(+), 25 deletions(-) diff --git a/scheduling/clock.go b/scheduling/clock.go index 9fe66bd..35eeacd 100644 --- a/scheduling/clock.go +++ b/scheduling/clock.go @@ -1,6 +1,7 @@ package scheduling import ( + "math" "sync" "time" ) @@ -25,7 +26,7 @@ type systemClock struct { func newSystemClock() *systemClock { c := &systemClock{ since: time.Now(), - schedule: make(chan scheduleInfo), + schedule: make(chan scheduleInfo, 1), } c.start() return c @@ -36,10 +37,7 @@ func (c *systemClock) now() float64 { } func (c *systemClock) runAt(ts float64, f func()) { - select { - case c.schedule <- scheduleInfo{ts: ts, f: f}: - default: // timer fired, scheduleRequest will call runAt again - } + c.schedule <- scheduleInfo{ts: ts, f: f} } func (c *systemClock) close() { @@ -53,6 +51,7 @@ func (c *systemClock) start() { defer c.wg.Done() t := time.NewTimer(0) <-t.C + currentTs := math.MaxFloat64 var f func() for { select { @@ -61,10 +60,16 @@ func (c *systemClock) start() { f() f = nil } + currentTs = math.MaxFloat64 case s, ok := <-c.schedule: if !ok { return } + if s.ts >= currentTs { + // current timer will fire earlier + // so next scheduleRequest will push new schedule event + continue + } var d time.Duration now := c.now() if now < s.ts { @@ -78,6 +83,7 @@ func (c *systemClock) start() { } t.Reset(d) f = s.f + currentTs = s.ts } } }() diff --git a/scheduling/mclock.go b/scheduling/mclock.go index 82037d6..aa5c794 100644 --- a/scheduling/mclock.go +++ b/scheduling/mclock.go @@ -61,15 +61,14 @@ type MClock struct { idleTimeout float64 tagInfo map[string]TagInfo - mtx sync.Mutex - previous map[string]*request - inProgress uint64 - timeBasedScheduleTs float64 - reservationQueue *queue - limitQueue *queue - sharesQueue *queue - readyQueue *queue - closed bool + mtx sync.Mutex + previous map[string]*request + inProgress uint64 + reservationQueue *queue + limitQueue *queue + sharesQueue *queue + readyQueue *queue + closed bool } // NewMClock creates new MClock scheduler instance with @@ -92,11 +91,10 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo idleTimeout: idleTimeout.Seconds(), tagInfo: tagInfo, - reservationQueue: &queue{}, - limitQueue: &queue{}, - sharesQueue: &queue{}, - readyQueue: &queue{}, - timeBasedScheduleTs: math.MaxFloat64, + reservationQueue: &queue{}, + limitQueue: &queue{}, + sharesQueue: &queue{}, + readyQueue: &queue{}, } previous := make(map[string]*request) @@ -294,13 +292,10 @@ func (q *MClock) setNextScheduleTimer(now float64) { // should not happen as we always compare .ts() <= now return } - - if q.timeBasedScheduleTs > nextTs { - q.clock.runAt(nextTs, func() { - q.scheduleRequest() - }) - q.timeBasedScheduleTs = nextTs + if nextTs == math.MaxFloat64 { + return } + q.clock.runAt(nextTs, q.scheduleRequest) } func (q *MClock) scheduleByLimitAndWeight(now float64) { diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go index 3aa261f..f9da670 100644 --- a/scheduling/mclock_test.go +++ b/scheduling/mclock_test.go @@ -493,3 +493,25 @@ func TestMClockTimeBasedSchedule(t *testing.T) { close(checked) require.NoError(t, eg.Wait()) } + +func TestMClockLowLimit(t *testing.T) { + t.Parallel() + limit := 2.0 + q, err := NewMClock(100, 100, map[string]TagInfo{ + "class1": {Share: 50, LimitIOPS: &limit}, + }, 5*time.Second) + require.NoError(t, err) + defer q.Close() + + eg, ctx := errgroup.WithContext(context.Background()) + eg.SetLimit(5) + eg.Go(func() error { + for range 3 { + release, err := q.RequestArrival(ctx, "class1") + require.NoError(t, err) + release() + } + return nil + }) + require.NoError(t, eg.Wait()) +}