diff --git a/scheduling/clock.go b/scheduling/clock.go index 9fe66bd..35eeacd 100644 --- a/scheduling/clock.go +++ b/scheduling/clock.go @@ -1,6 +1,7 @@ package scheduling import ( + "math" "sync" "time" ) @@ -25,7 +26,7 @@ type systemClock struct { func newSystemClock() *systemClock { c := &systemClock{ since: time.Now(), - schedule: make(chan scheduleInfo), + schedule: make(chan scheduleInfo, 1), } c.start() return c @@ -36,10 +37,7 @@ func (c *systemClock) now() float64 { } func (c *systemClock) runAt(ts float64, f func()) { - select { - case c.schedule <- scheduleInfo{ts: ts, f: f}: - default: // timer fired, scheduleRequest will call runAt again - } + c.schedule <- scheduleInfo{ts: ts, f: f} } func (c *systemClock) close() { @@ -53,6 +51,7 @@ func (c *systemClock) start() { defer c.wg.Done() t := time.NewTimer(0) <-t.C + currentTs := math.MaxFloat64 var f func() for { select { @@ -61,10 +60,16 @@ func (c *systemClock) start() { f() f = nil } + currentTs = math.MaxFloat64 case s, ok := <-c.schedule: if !ok { return } + if s.ts >= currentTs { + // current timer will fire earlier + // so next scheduleRequest will push new schedule event + continue + } var d time.Duration now := c.now() if now < s.ts { @@ -78,6 +83,7 @@ func (c *systemClock) start() { } t.Reset(d) f = s.f + currentTs = s.ts } } }() diff --git a/scheduling/mclock.go b/scheduling/mclock.go index 82037d6..aa5c794 100644 --- a/scheduling/mclock.go +++ b/scheduling/mclock.go @@ -61,15 +61,14 @@ type MClock struct { idleTimeout float64 tagInfo map[string]TagInfo - mtx sync.Mutex - previous map[string]*request - inProgress uint64 - timeBasedScheduleTs float64 - reservationQueue *queue - limitQueue *queue - sharesQueue *queue - readyQueue *queue - closed bool + mtx sync.Mutex + previous map[string]*request + inProgress uint64 + reservationQueue *queue + limitQueue *queue + sharesQueue *queue + readyQueue *queue + closed bool } // NewMClock creates new MClock scheduler instance with @@ -92,11 +91,10 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo idleTimeout: idleTimeout.Seconds(), tagInfo: tagInfo, - reservationQueue: &queue{}, - limitQueue: &queue{}, - sharesQueue: &queue{}, - readyQueue: &queue{}, - timeBasedScheduleTs: math.MaxFloat64, + reservationQueue: &queue{}, + limitQueue: &queue{}, + sharesQueue: &queue{}, + readyQueue: &queue{}, } previous := make(map[string]*request) @@ -294,13 +292,10 @@ func (q *MClock) setNextScheduleTimer(now float64) { // should not happen as we always compare .ts() <= now return } - - if q.timeBasedScheduleTs > nextTs { - q.clock.runAt(nextTs, func() { - q.scheduleRequest() - }) - q.timeBasedScheduleTs = nextTs + if nextTs == math.MaxFloat64 { + return } + q.clock.runAt(nextTs, q.scheduleRequest) } func (q *MClock) scheduleByLimitAndWeight(now float64) { diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go index 3aa261f..f9da670 100644 --- a/scheduling/mclock_test.go +++ b/scheduling/mclock_test.go @@ -493,3 +493,25 @@ func TestMClockTimeBasedSchedule(t *testing.T) { close(checked) require.NoError(t, eg.Wait()) } + +func TestMClockLowLimit(t *testing.T) { + t.Parallel() + limit := 2.0 + q, err := NewMClock(100, 100, map[string]TagInfo{ + "class1": {Share: 50, LimitIOPS: &limit}, + }, 5*time.Second) + require.NoError(t, err) + defer q.Close() + + eg, ctx := errgroup.WithContext(context.Background()) + eg.SetLimit(5) + eg.Go(func() error { + for range 3 { + release, err := q.RequestArrival(ctx, "class1") + require.NoError(t, err) + release() + } + return nil + }) + require.NoError(t, eg.Wait()) +}