diff --git a/scheduling/clock.go b/scheduling/clock.go
index 9fe66bd..6fa3d84 100644
--- a/scheduling/clock.go
+++ b/scheduling/clock.go
@@ -1,6 +1,7 @@
 package scheduling
 
 import (
+	"math"
 	"sync"
 	"time"
 )
@@ -36,10 +37,7 @@ func (c *systemClock) now() float64 {
 }
 
 func (c *systemClock) runAt(ts float64, f func()) {
-	select {
-	case c.schedule <- scheduleInfo{ts: ts, f: f}:
-	default: // timer fired, scheduleRequest will call runAt again
-	}
+	c.schedule <- scheduleInfo{ts: ts, f: f}
 }
 
 func (c *systemClock) close() {
@@ -53,18 +51,30 @@ func (c *systemClock) start() {
 		defer c.wg.Done()
 		t := time.NewTimer(0)
 		<-t.C
-		var f func()
+		currentTs := math.MaxFloat64
+		var currentTask func()
 		for {
 			select {
 			case <-t.C:
-				if f != nil {
-					f()
-					f = nil
+				if currentTask != nil {
+					c.wg.Add(1)
+					f := currentTask
+					go func() {
+						defer c.wg.Done()
+						f()
+					}()
+					currentTask = nil
 				}
+				currentTs = math.MaxFloat64
 			case s, ok := <-c.schedule:
 				if !ok {
 					return
 				}
+				if s.ts >= currentTs {
+					// current timer will fire earlier
+					// so next scheduleRequest will push new schedule event
+					continue
+				}
 				var d time.Duration
 				now := c.now()
 				if now < s.ts {
@@ -77,7 +87,8 @@ func (c *systemClock) start() {
 					}
 				}
 				t.Reset(d)
-				f = s.f
+				currentTask = s.f
+				currentTs = s.ts
 			}
 		}
 	}()
diff --git a/scheduling/mclock.go b/scheduling/mclock.go
index 82037d6..63a969c 100644
--- a/scheduling/mclock.go
+++ b/scheduling/mclock.go
@@ -61,15 +61,14 @@ type MClock struct {
 	idleTimeout float64
 	tagInfo     map[string]TagInfo
 
-	mtx                 sync.Mutex
-	previous            map[string]*request
-	inProgress          uint64
-	timeBasedScheduleTs float64
-	reservationQueue    *queue
-	limitQueue          *queue
-	sharesQueue         *queue
-	readyQueue          *queue
-	closed              bool
+	mtx              sync.Mutex
+	previous         map[string]*request
+	inProgress       uint64
+	reservationQueue *queue
+	limitQueue       *queue
+	sharesQueue      *queue
+	readyQueue       *queue
+	closed           bool
 }
 
 // NewMClock creates new MClock scheduler instance with
@@ -92,11 +91,10 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
 		idleTimeout: idleTimeout.Seconds(),
 		tagInfo:     tagInfo,
 
-		reservationQueue:    &queue{},
-		limitQueue:          &queue{},
-		sharesQueue:         &queue{},
-		readyQueue:          &queue{},
-		timeBasedScheduleTs: math.MaxFloat64,
+		reservationQueue: &queue{},
+		limitQueue:       &queue{},
+		sharesQueue:      &queue{},
+		readyQueue:       &queue{},
 	}
 
 	previous := make(map[string]*request)
@@ -139,15 +137,15 @@ func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, e
 // No new requests for scheduling will be accepted after the closing.
 func (q *MClock) Close() {
 	q.mtx.Lock()
-	defer q.mtx.Unlock()
-
 	q.closed = true
-	q.clock.close()
 	for q.limitQueue.Len() > 0 {
 		item := heap.Pop(q.limitQueue).(*limitMQueueItem)
 		close(item.r.canceled)
 		q.removeFromQueues(item.r)
 	}
+	q.mtx.Unlock()
+
+	q.clock.close()
 }
 
 func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
@@ -263,6 +261,10 @@ func (q *MClock) scheduleRequest() {
 	q.mtx.Lock()
 	defer q.mtx.Unlock()
 
+	if q.closed {
+		return
+	}
+
 	q.scheduleRequestUnsafe()
 }
 
@@ -284,23 +286,23 @@ func (q *MClock) scheduleRequestUnsafe() {
 
 func (q *MClock) setNextScheduleTimer(now float64) {
 	nextTs := math.MaxFloat64
+	var hasNext bool
 	if q.reservationQueue.Len() > 0 {
 		nextTs = q.reservationQueue.items[0].ts()
+		hasNext = true
 	}
 	if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
 		nextTs = q.limitQueue.items[0].ts()
+		hasNext = true
 	}
 	if nextTs <= now {
 		// should not happen as we always compare .ts() <= now
 		return
 	}
-
-	if q.timeBasedScheduleTs > nextTs {
-		q.clock.runAt(nextTs, func() {
-			q.scheduleRequest()
-		})
-		q.timeBasedScheduleTs = nextTs
+	if !hasNext {
+		return
 	}
+	q.clock.runAt(nextTs, q.scheduleRequest)
 }
 
 func (q *MClock) scheduleByLimitAndWeight(now float64) {
diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go
index 3aa261f..f9da670 100644
--- a/scheduling/mclock_test.go
+++ b/scheduling/mclock_test.go
@@ -493,3 +493,25 @@ func TestMClockTimeBasedSchedule(t *testing.T) {
 	close(checked)
 	require.NoError(t, eg.Wait())
 }
+
+func TestMClockLowLimit(t *testing.T) {
+	t.Parallel()
+	limit := 2.0
+	q, err := NewMClock(100, 100, map[string]TagInfo{
+		"class1": {Share: 50, LimitIOPS: &limit},
+	}, 5*time.Second)
+	require.NoError(t, err)
+	defer q.Close()
+
+	eg, ctx := errgroup.WithContext(context.Background())
+	eg.SetLimit(5)
+	eg.Go(func() error {
+		for range 3 {
+			release, err := q.RequestArrival(ctx, "class1")
+			require.NoError(t, err)
+			release()
+		}
+		return nil
+	})
+	require.NoError(t, eg.Wait())
+}
diff --git a/tagging/grpc.go b/tagging/grpc.go
index 5e255dd..4e2fcfe 100644
--- a/tagging/grpc.go
+++ b/tagging/grpc.go
@@ -11,8 +11,8 @@ const (
 	ioTagHeader = "x-frostfs-io-tag"
 )
 
-// NewUnaryClientInteceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
-func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor {
+// NewUnaryClientInterceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
+func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor {
 	return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
 		return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...)
 	}
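
Note: since the rename breaks callers of the old NewUnaryClientInteceptor name, a minimal client-side usage sketch of the renamed tagging.NewUnaryClientInterceptor follows. The module import path, target address, and insecure credentials below are assumptions for illustration only and are not part of this change.

package main

import (
	"log"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" // assumed module path
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// The interceptor copies the IO tag from the request context into the
	// outgoing "x-frostfs-io-tag" gRPC metadata header for every unary call.
	conn, err := grpc.NewClient(
		"frostfs-node:8080", // placeholder target
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainUnaryInterceptor(tagging.NewUnaryClientInterceptor()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}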