From 346752477b8419df9811415e3bc935bc83b5d053 Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov
Date: Wed, 19 Mar 2025 15:51:38 +0300
Subject: [PATCH 1/3] [#12] mclock: Fix timer-based scheduling

Let's assume that there are two requests in the queue with execution
times t1 and t2. The timer is set to t1. The timer fires, schedules the
t1 request, and calculates t2 as the next time the timer should fire.
But it doesn't schedule the timer for that time, because the
`q.timeBasedScheduleTs > nextTs` check fails: `timeBasedScheduleTs`
still holds t1 and is never reset when the timer fires, so the t2
request is never scheduled.

Signed-off-by: Dmitrii Stepanov
---
 scheduling/clock.go       | 29 ++++++++++++++++++++---------
 scheduling/mclock.go      | 38 ++++++++++++++++++--------------------
 scheduling/mclock_test.go | 22 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 29 deletions(-)

diff --git a/scheduling/clock.go b/scheduling/clock.go
index 9fe66bd..6fa3d84 100644
--- a/scheduling/clock.go
+++ b/scheduling/clock.go
@@ -1,6 +1,7 @@
 package scheduling

 import (
+	"math"
 	"sync"
 	"time"
 )
@@ -36,10 +37,7 @@ func (c *systemClock) now() float64 {
 }

 func (c *systemClock) runAt(ts float64, f func()) {
-	select {
-	case c.schedule <- scheduleInfo{ts: ts, f: f}:
-	default: // timer fired, scheduleRequest will call runAt again
-	}
+	c.schedule <- scheduleInfo{ts: ts, f: f}
 }

 func (c *systemClock) close() {
@@ -53,18 +51,30 @@ func (c *systemClock) start() {
 		defer c.wg.Done()
 		t := time.NewTimer(0)
 		<-t.C
-		var f func()
+		currentTs := math.MaxFloat64
+		var currentTask func()
 		for {
 			select {
 			case <-t.C:
-				if f != nil {
-					f()
-					f = nil
+				if currentTask != nil {
+					c.wg.Add(1)
+					f := currentTask
+					go func() {
+						defer c.wg.Done()
+						f()
+					}()
+					currentTask = nil
 				}
+				currentTs = math.MaxFloat64
 			case s, ok := <-c.schedule:
 				if !ok {
 					return
 				}
+				if s.ts >= currentTs {
+					// the current timer will fire earlier,
+					// so the next scheduleRequest will push a new schedule event
+					continue
+				}
 				var d time.Duration
 				now := c.now()
 				if now < s.ts {
@@ -77,7 +87,8 @@ func (c *systemClock) start() {
 					}
 				}
 				t.Reset(d)
-				f = s.f
+				currentTask = s.f
+				currentTs = s.ts
 			}
 		}
 	}()
diff --git a/scheduling/mclock.go b/scheduling/mclock.go
index 82037d6..6d13d5d 100644
--- a/scheduling/mclock.go
+++ b/scheduling/mclock.go
@@ -61,15 +61,14 @@ type MClock struct {
 	idleTimeout float64
 	tagInfo     map[string]TagInfo

-	mtx                 sync.Mutex
-	previous            map[string]*request
-	inProgress          uint64
-	timeBasedScheduleTs float64
-	reservationQueue    *queue
-	limitQueue          *queue
-	sharesQueue         *queue
-	readyQueue          *queue
-	closed              bool
+	mtx              sync.Mutex
+	previous         map[string]*request
+	inProgress       uint64
+	reservationQueue *queue
+	limitQueue       *queue
+	sharesQueue      *queue
+	readyQueue       *queue
+	closed           bool
 }

 // NewMClock creates new MClock scheduler instance with
@@ -92,11 +91,10 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
 		idleTimeout: idleTimeout.Seconds(),
 		tagInfo:     tagInfo,

-		reservationQueue:    &queue{},
-		limitQueue:          &queue{},
-		sharesQueue:         &queue{},
-		readyQueue:          &queue{},
-		timeBasedScheduleTs: math.MaxFloat64,
+		reservationQueue: &queue{},
+		limitQueue:       &queue{},
+		sharesQueue:      &queue{},
+		readyQueue:       &queue{},
 	}

 	previous := make(map[string]*request)
@@ -284,23 +282,23 @@ func (q *MClock) scheduleRequestUnsafe() {

 func (q *MClock) setNextScheduleTimer(now float64) {
 	nextTs := math.MaxFloat64
+	var hasNext bool
 	if q.reservationQueue.Len() > 0 {
 		nextTs = q.reservationQueue.items[0].ts()
+		hasNext = true
 	}
 	if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
 		nextTs = q.limitQueue.items[0].ts()
+		hasNext = true
 	}
 	if nextTs <= now {
 		// should not happen as we always compare .ts() <= now
 		return
 	}
-
-	if q.timeBasedScheduleTs > nextTs {
-		q.clock.runAt(nextTs, func() {
-			q.scheduleRequest()
-		})
-		q.timeBasedScheduleTs = nextTs
+	if !hasNext {
+		return
 	}
+	q.clock.runAt(nextTs, q.scheduleRequest)
 }

 func (q *MClock) scheduleByLimitAndWeight(now float64) {
diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go
index 3aa261f..f9da670 100644
--- a/scheduling/mclock_test.go
+++ b/scheduling/mclock_test.go
@@ -493,3 +493,25 @@ func TestMClockTimeBasedSchedule(t *testing.T) {
 	close(checked)
 	require.NoError(t, eg.Wait())
 }
+
+func TestMClockLowLimit(t *testing.T) {
+	t.Parallel()
+	limit := 2.0
+	q, err := NewMClock(100, 100, map[string]TagInfo{
+		"class1": {Share: 50, LimitIOPS: &limit},
+	}, 5*time.Second)
+	require.NoError(t, err)
+	defer q.Close()
+
+	eg, ctx := errgroup.WithContext(context.Background())
+	eg.SetLimit(5)
+	eg.Go(func() error {
+		for range 3 {
+			release, err := q.RequestArrival(ctx, "class1")
+			require.NoError(t, err)
+			release()
+		}
+		return nil
+	})
+	require.NoError(t, eg.Wait())
+}
--
2.45.3
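The guard removed by PATCH 1 can be reduced to a standalone sketch. The
names below (sched, armedTs) are hypothetical stand-ins for the
scheduler's internals, not this library's API; the point is only that a
"re-arm the timer only for earlier deadlines" check whose state is never
reset when the timer fires suppresses every later deadline:

package main

import (
	"fmt"
	"math"
)

// sched mimics the old bookkeeping: armedTs plays the role of
// timeBasedScheduleTs and is only ever lowered, never reset.
type sched struct {
	armedTs float64
}

// runAt models the removed guard: the timer is re-armed only when the
// new deadline is earlier than the one believed to be pending.
func (s *sched) runAt(ts float64) {
	if s.armedTs > ts {
		fmt.Printf("timer armed for t=%v\n", ts)
		s.armedTs = ts
		return
	}
	fmt.Printf("t=%v suppressed (armedTs=%v)\n", ts, s.armedTs)
}

func main() {
	s := &sched{armedTs: math.MaxFloat64}
	s.runAt(1) // request with deadline t1: timer armed
	// The timer fires at t1 and the t1 request is scheduled, but armedTs
	// is never reset to math.MaxFloat64 on firing, so...
	s.runAt(2) // ...re-arming for t2 is suppressed and t2 never runs
}

After the patch the guard state lives only inside the clock goroutine:
runAt always sends to the schedule channel, and the goroutine compares
the new deadline against currentTs, which it does reset to
math.MaxFloat64 every time the timer fires.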
From 1ca213ee7cb6504561a314220312d4f0c2abc446 Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov
Date: Thu, 20 Mar 2025 10:31:15 +0300
Subject: [PATCH 2/3] [#12] mclock: Fix deadlock caused by mclock.Close

Deadlock scenario:
- mclock is closed via the `Close` method, which locks the mutex and
  calls `clock.close`
- the clock has already spawned a `scheduleRequest` goroutine, which
  tries to lock the same mutex
- `clock.close` waits for all goroutines to finish, so neither side can
  make progress

Signed-off-by: Dmitrii Stepanov
---
 scheduling/mclock.go | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/scheduling/mclock.go b/scheduling/mclock.go
index 6d13d5d..63a969c 100644
--- a/scheduling/mclock.go
+++ b/scheduling/mclock.go
@@ -137,15 +137,15 @@ func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, e
 // No new requests for scheduling will be accepted after the closing.
 func (q *MClock) Close() {
 	q.mtx.Lock()
-	defer q.mtx.Unlock()
-
 	q.closed = true
-	q.clock.close()
 	for q.limitQueue.Len() > 0 {
 		item := heap.Pop(q.limitQueue).(*limitMQueueItem)
 		close(item.r.canceled)
 		q.removeFromQueues(item.r)
 	}
+	q.mtx.Unlock()
+
+	q.clock.close()
 }

 func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
@@ -261,6 +261,10 @@ func (q *MClock) scheduleRequest() {
 	q.mtx.Lock()
 	defer q.mtx.Unlock()

+	if q.closed {
+		return
+	}
+
 	q.scheduleRequestUnsafe()
 }
--
2.45.3

From 32079ad7c2752b2ee1ed89c927e11418b641655a Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov
Date: Thu, 20 Mar 2025 17:24:39 +0300
Subject: [PATCH 3/3] [#12] grpc: Fix method name

Signed-off-by: Dmitrii Stepanov
---
 tagging/grpc.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tagging/grpc.go b/tagging/grpc.go
index 5e255dd..4e2fcfe 100644
--- a/tagging/grpc.go
+++ b/tagging/grpc.go
@@ -11,8 +11,8 @@ const (
 	ioTagHeader = "x-frostfs-io-tag"
 )

-// NewUnaryClientInteceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
-func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor {
+// NewUnaryClientInterceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
+func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor {
 	return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
 		return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...)
 	}
--
2.45.3
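The deadlock fixed by PATCH 2 has a classic shape: Close held the
scheduler mutex while clock.close waited for all clock goroutines, and
a clock-fired scheduleRequest goroutine blocked on that same mutex, so
neither side could proceed. A minimal sketch of the pattern and of the
fix, using hypothetical names rather than this library's internals:

package main

import "sync"

// scheduler mimics the mutex/WaitGroup interplay behind the deadlock.
type scheduler struct {
	mtx    sync.Mutex
	wg     sync.WaitGroup
	closed bool
}

// scheduleRequest is what a clock-fired goroutine runs: it needs the mutex.
func (s *scheduler) scheduleRequest() {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if s.closed { // mirrors the check added by the patch
		return
	}
	// ... scheduling work ...
}

// closeFixed mirrors the patched Close: release the mutex before waiting,
// so a pending scheduleRequest can acquire it, observe closed, and return.
// Calling s.wg.Wait() while still holding s.mtx reproduces the deadlock.
func (s *scheduler) closeFixed() {
	s.mtx.Lock()
	s.closed = true
	s.mtx.Unlock()

	s.wg.Wait()
}

func main() {
	s := &scheduler{}
	s.wg.Add(1)
	go func() { // stands in for the clock firing a scheduled callback
		defer s.wg.Done()
		s.scheduleRequest()
	}()
	s.closeFixed()
}

The patch applies the same two-part fix: Close releases q.mtx before
calling q.clock.close(), and scheduleRequest checks q.closed under the
mutex so a callback that fires during shutdown returns immediately.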
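Because PATCH 3 renames an exported constructor, it is a breaking change
for callers of the tagging package. A sketch of the updated caller-side
wiring; the import path and dial target below are assumptions for
illustration, while grpc.NewClient, grpc.WithUnaryInterceptor, and
insecure.NewCredentials are standard grpc-go API:

package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	// assumed import path for this repository's tagging package
	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
)

func main() {
	// Callers that used tagging.NewUnaryClientInteceptor() must switch
	// to the correctly spelled constructor introduced by this patch.
	conn, err := grpc.NewClient(
		"localhost:8080", // placeholder target
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithUnaryInterceptor(tagging.NewUnaryClientInterceptor()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
}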