mclock: Fix timer-based scheduling #12

Merged
fyrchik merged 3 commits from dstepanov-yadro/frostfs-qos:fix/mclock_timer_scheduling into master 2025-03-21 13:41:31 +00:00
4 changed files with 69 additions and 34 deletions

View file

@ -1,6 +1,7 @@
package scheduling package scheduling
import ( import (
"math"
"sync" "sync"
"time" "time"
) )
@ -36,10 +37,7 @@ func (c *systemClock) now() float64 {
} }
func (c *systemClock) runAt(ts float64, f func()) { func (c *systemClock) runAt(ts float64, f func()) {
select { c.schedule <- scheduleInfo{ts: ts, f: f}
case c.schedule <- scheduleInfo{ts: ts, f: f}:
default: // timer fired, scheduleRequest will call runAt again
}
} }
func (c *systemClock) close() { func (c *systemClock) close() {
@ -53,18 +51,30 @@ func (c *systemClock) start() {
defer c.wg.Done() defer c.wg.Done()
t := time.NewTimer(0) t := time.NewTimer(0)
<-t.C <-t.C
var f func() currentTs := math.MaxFloat64
var currentTask func()
for { for {
select { select {
case <-t.C: case <-t.C:
if f != nil { if currentTask != nil {
c.wg.Add(1)
f := currentTask
go func() {
Review

Run f() on separate goroutine to not to stuck on channel push.

Run f() on separate goroutine to not to stuck on channel push.
defer c.wg.Done()
f() f()
f = nil }()
currentTask = nil
} }
currentTs = math.MaxFloat64
case s, ok := <-c.schedule: case s, ok := <-c.schedule:
if !ok { if !ok {
return return
} }
if s.ts >= currentTs {
// current timer will fire earlier
// so next scheduleRequest will push new schedule event
continue
aarifullin marked this conversation as resolved
Review

But what does happen to the scheduled task? It's gone? Will it be rescheduled?

But what does happen to the scheduled task? It's gone? Will it be rescheduled?
Review

current timer will fire and call scheduleRequest, and scheduleRequest will set timer again on s.ts

current timer will fire and call `scheduleRequest`, and `scheduleRequest` will set timer again on `s.ts`
}
var d time.Duration var d time.Duration
now := c.now() now := c.now()
if now < s.ts { if now < s.ts {
@ -77,7 +87,8 @@ func (c *systemClock) start() {
} }
} }
t.Reset(d) t.Reset(d)
f = s.f currentTask = s.f
currentTs = s.ts
} }
} }
}() }()

View file

@ -64,7 +64,6 @@ type MClock struct {
mtx sync.Mutex mtx sync.Mutex
previous map[string]*request previous map[string]*request
inProgress uint64 inProgress uint64
timeBasedScheduleTs float64
reservationQueue *queue reservationQueue *queue
limitQueue *queue limitQueue *queue
sharesQueue *queue sharesQueue *queue
@ -96,7 +95,6 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
limitQueue: &queue{}, limitQueue: &queue{},
sharesQueue: &queue{}, sharesQueue: &queue{},
readyQueue: &queue{}, readyQueue: &queue{},
timeBasedScheduleTs: math.MaxFloat64,
} }
previous := make(map[string]*request) previous := make(map[string]*request)
@ -139,15 +137,15 @@ func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, e
// No new requests for scheduling will be accepted after the closing. // No new requests for scheduling will be accepted after the closing.
func (q *MClock) Close() { func (q *MClock) Close() {
q.mtx.Lock() q.mtx.Lock()
defer q.mtx.Unlock()
q.closed = true q.closed = true
q.clock.close()
for q.limitQueue.Len() > 0 { for q.limitQueue.Len() > 0 {
item := heap.Pop(q.limitQueue).(*limitMQueueItem) item := heap.Pop(q.limitQueue).(*limitMQueueItem)
close(item.r.canceled) close(item.r.canceled)
q.removeFromQueues(item.r) q.removeFromQueues(item.r)
} }
q.mtx.Unlock()
q.clock.close()
} }
func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error { func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
@ -263,6 +261,10 @@ func (q *MClock) scheduleRequest() {
q.mtx.Lock() q.mtx.Lock()
defer q.mtx.Unlock() defer q.mtx.Unlock()
if q.closed {
return
}
q.scheduleRequestUnsafe() q.scheduleRequestUnsafe()
} }
@ -284,23 +286,23 @@ func (q *MClock) scheduleRequestUnsafe() {
func (q *MClock) setNextScheduleTimer(now float64) { func (q *MClock) setNextScheduleTimer(now float64) {
nextTs := math.MaxFloat64 nextTs := math.MaxFloat64
var hasNext bool
if q.reservationQueue.Len() > 0 { if q.reservationQueue.Len() > 0 {
nextTs = q.reservationQueue.items[0].ts() nextTs = q.reservationQueue.items[0].ts()
hasNext = true
} }
if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs { if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
nextTs = q.limitQueue.items[0].ts() nextTs = q.limitQueue.items[0].ts()
hasNext = true
} }
if nextTs <= now { if nextTs <= now {
// should not happen as we always compare .ts() <= now // should not happen as we always compare .ts() <= now
return return
} }
if !hasNext {
if q.timeBasedScheduleTs > nextTs { return
q.clock.runAt(nextTs, func() {
q.scheduleRequest()
})
q.timeBasedScheduleTs = nextTs
} }
q.clock.runAt(nextTs, q.scheduleRequest)
} }
func (q *MClock) scheduleByLimitAndWeight(now float64) { func (q *MClock) scheduleByLimitAndWeight(now float64) {

View file

@ -493,3 +493,25 @@ func TestMClockTimeBasedSchedule(t *testing.T) {
close(checked) close(checked)
require.NoError(t, eg.Wait()) require.NoError(t, eg.Wait())
} }
func TestMClockLowLimit(t *testing.T) {
t.Parallel()
limit := 2.0
q, err := NewMClock(100, 100, map[string]TagInfo{
"class1": {Share: 50, LimitIOPS: &limit},
}, 5*time.Second)
require.NoError(t, err)
defer q.Close()
eg, ctx := errgroup.WithContext(context.Background())
eg.SetLimit(5)
eg.Go(func() error {
for range 3 {
release, err := q.RequestArrival(ctx, "class1")
require.NoError(t, err)
release()
}
return nil
})
require.NoError(t, eg.Wait())
}

View file

@ -11,8 +11,8 @@ const (
ioTagHeader = "x-frostfs-io-tag" ioTagHeader = "x-frostfs-io-tag"
) )
// NewUnaryClientInteceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata. // NewUnaryClientInterceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor { func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...) return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...)
} }