Compare commits


6 commits

b5ed0b6eff
[#14] CODEOWNERS: Use core committers and developers groups
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-31 11:04:22 +03:00
6c6e5bf4de
[#14] mclock: Allow prohibiting tag requests
It is now possible to prohibit requests for a specific tag.
A separate boolean field in `TagInfo` is used so that float64 values do not have to be compared with zero.
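
Below is a minimal usage sketch of the new flag (tag names are hypothetical; it assumes the snippet lives in the same `scheduling` package, since import paths are not shown here):

package scheduling

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ExampleProhibitedTag is an illustrative sketch, not part of the commit:
// requests for a prohibited tag are expected to fail fast instead of queueing.
func ExampleProhibitedTag() {
	tags := map[string]TagInfo{
		"critical":   {Share: 2},
		"background": {Share: 1, Prohibited: true}, // reject all requests for this tag
	}
	q, err := NewMClock(10, 100, tags, time.Minute)
	if err != nil {
		panic(err)
	}
	defer q.Close()

	_, err = q.RequestArrival(context.Background(), "background")
	fmt.Println(errors.Is(err, ErrTagRequestsProhibited)) // true
}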

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-31 11:04:08 +03:00
57d895c321
[#13] mclock: Schedule requests as soon as possible
Assume that for some tag `limit = 1000 RPS` is defined and each
request takes 10 ms to complete. At some point in time 1000 requests
are accepted. Then the first request is scheduled at `now()`, the second
at `now() + 1 ms`, the third at `now() + 2 ms`, and so on. The total
processing duration of the 1000 requests is 1 second + 10 ms.

After this fix the scheduler looks ahead and schedules requests within
the limit window, so in the situation above the total processing duration
of the 1000 requests is 10 ms in an ideal world.

The same applies to reservation scheduling.
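
A back-of-the-envelope sketch of the arithmetic above (standalone Go, not the scheduler's actual code; the constants come from the example):

package main

import "fmt"

func main() {
	const limit = 1000.0 // RPS
	const n = 1000       // requests accepted at the same moment
	const execMs = 10.0  // per-request execution time

	// Before the fix: request i is scheduled at now + i/limit seconds,
	// so the last request starts ~999 ms after the first.
	lastStartMs := float64(n-1) / limit * 1000
	fmt.Printf("before: ~%.0f ms total\n", lastStartMs+execMs) // ~1009 ms

	// After the fix: all n requests fit within the current one-second
	// limit window, so they all start immediately.
	fmt.Printf("after: ~%.0f ms total\n", execMs) // ~10 ms
}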

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-24 16:36:47 +03:00
32079ad7c2
[#12] grpc: Fix method name
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-20 17:24:39 +03:00
1ca213ee7c
[#12] mclock: Fix deadlock caused by mclock.Close
Deadlock scenario:
- mclock is closed via the `Close` method, which locks the mutex and calls `clock.close`;
- the clock starts the `scheduleRequest` goroutine, which tries to lock the same mutex;
- `clock.close` waits for all goroutines to finish (see the sketch below).
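
A minimal standalone sketch of this lock-ordering shape (illustrative names, not the library's actual types); running it trips Go's runtime deadlock detector:

package main

import "sync"

func main() {
	var mu sync.Mutex
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // stands in for the clock's scheduleRequest goroutine
		defer wg.Done()
		mu.Lock() // blocks forever: main still holds mu
		mu.Unlock()
	}()

	mu.Lock() // stands in for MClock.Close locking q.mtx
	wg.Wait() // stands in for clock.close waiting for goroutines
	mu.Unlock()
	// fatal error: all goroutines are asleep - deadlock!
}

The fix below releases the mutex before calling `clock.close`.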

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-20 10:31:15 +03:00
346752477b
[#12] mclock: Fix timer-based scheduling
Assume there are two requests in the queue with execution times t1 and t2.
The timer is set to t1. The timer fires, schedules the t1 request, and
calculates t2 as the time at which the next timer should fire.
But it does not re-arm the timer for that time because of the
`q.timeBasedScheduleTs > nextTs` check, so no timer is armed for the t2 request.
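
A runnable toy illustration of the stale guard (values are illustrative; the condensed condition mirrors `setNextScheduleTimer` before the fix):

package main

import "fmt"

func main() {
	t1, t2 := 1.0, 2.0        // execution times of the two queued requests
	timeBasedScheduleTs := t1 // recorded when the timer for t1 was armed

	// The t1 timer fires and its request is scheduled, but
	// timeBasedScheduleTs is never reset. The queue head is now t2:
	nextTs := t2
	if timeBasedScheduleTs > nextTs { // 1.0 > 2.0 is false
		fmt.Println("re-arm timer for", nextTs)
	} else {
		fmt.Println("bug: no timer armed for", nextTs)
	}
}

The fix drops `timeBasedScheduleTs` entirely and instead lets the clock deduplicate schedule events (see the `currentTs` check in the clock diff below).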

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-19 17:12:00 +03:00
5 changed files with 148 additions and 41 deletions


@@ -1,3 +1,3 @@
-.* @fyrchik
+.* @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers
 .forgejo/.* @potyarkin
 Makefile @potyarkin


@@ -1,6 +1,7 @@
 package scheduling
 
 import (
+	"math"
 	"sync"
 	"time"
 )
@@ -36,10 +37,7 @@ func (c *systemClock) now() float64 {
 }
 
 func (c *systemClock) runAt(ts float64, f func()) {
-	select {
-	case c.schedule <- scheduleInfo{ts: ts, f: f}:
-	default: // timer fired, scheduleRequest will call runAt again
-	}
+	c.schedule <- scheduleInfo{ts: ts, f: f}
 }
 
 func (c *systemClock) close() {
@@ -53,18 +51,30 @@ func (c *systemClock) start() {
 		defer c.wg.Done()
 		t := time.NewTimer(0)
 		<-t.C
-		var f func()
+		currentTs := math.MaxFloat64
+		var currentTask func()
 		for {
 			select {
 			case <-t.C:
-				if f != nil {
-					f()
-					f = nil
+				if currentTask != nil {
+					c.wg.Add(1)
+					f := currentTask
+					go func() {
+						defer c.wg.Done()
+						f()
+					}()
+					currentTask = nil
 				}
+				currentTs = math.MaxFloat64
			case s, ok := <-c.schedule:
 				if !ok {
 					return
 				}
+				if s.ts >= currentTs {
+					// current timer will fire earlier
+					// so next scheduleRequest will push new schedule event
+					continue
+				}
 				var d time.Duration
 				now := c.now()
 				if now < s.ts {
@@ -77,7 +87,8 @@ func (c *systemClock) start() {
 					}
 				}
 				t.Reset(d)
-				f = s.f
+				currentTask = s.f
+				currentTs = s.ts
 			}
 		}
 	}()


@@ -22,6 +22,7 @@ var (
 	ErrMClockSchedulerUnknownTag = errors.New("unknown tag")
 	ErrInvalidTagInfo            = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
 	ErrInvalidRunLimit           = errors.New("invalid run limit: must be greater than zero")
+	ErrTagRequestsProhibited     = errors.New("tag requests are prohibited")
 )
 
 type request struct {
@@ -49,6 +50,7 @@ type TagInfo struct {
 	ReservedIOPS *float64
 	LimitIOPS    *float64
 	Share        float64
+	Prohibited   bool
 }
 
 // MClock is mClock scheduling algorithm implementation.
@@ -61,15 +63,14 @@ type MClock struct {
 	idleTimeout float64
 	tagInfo     map[string]TagInfo
 
 	mtx              sync.Mutex
 	previous         map[string]*request
 	inProgress       uint64
-	timeBasedScheduleTs float64
 	reservationQueue *queue
 	limitQueue       *queue
 	sharesQueue      *queue
 	readyQueue       *queue
 	closed           bool
 }
 
 // NewMClock creates new MClock scheduler instance with
@@ -92,11 +93,10 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
 		idleTimeout:      idleTimeout.Seconds(),
 		tagInfo:          tagInfo,
 		reservationQueue: &queue{},
 		limitQueue:       &queue{},
 		sharesQueue:      &queue{},
 		readyQueue:       &queue{},
-		timeBasedScheduleTs: math.MaxFloat64,
 	}
 
 	previous := make(map[string]*request)
@@ -139,15 +139,15 @@ func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, e
 // No new requests for scheduling will be accepted after the closing.
 func (q *MClock) Close() {
 	q.mtx.Lock()
-	defer q.mtx.Unlock()
 	q.closed = true
-	q.clock.close()
 	for q.limitQueue.Len() > 0 {
 		item := heap.Pop(q.limitQueue).(*limitMQueueItem)
 		close(item.r.canceled)
 		q.removeFromQueues(item.r)
 	}
+	q.mtx.Unlock()
+	q.clock.close()
 }
 
 func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
@@ -198,6 +198,9 @@ func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
 	if !ok {
 		return nil, nil, ErrMClockSchedulerUnknownTag
 	}
+	if tagInfo.Prohibited {
+		return nil, nil, ErrTagRequestsProhibited
+	}
 
 	prev, ok := q.previous[tag]
 	assert.Cond(ok, "undefined previous:", tag)
@@ -263,6 +266,10 @@ func (q *MClock) scheduleRequest() {
 	q.mtx.Lock()
 	defer q.mtx.Unlock()
 
+	if q.closed {
+		return
+	}
+
 	q.scheduleRequestUnsafe()
 }
 
@@ -284,27 +291,27 @@ func (q *MClock) scheduleRequestUnsafe() {
 
 func (q *MClock) setNextScheduleTimer(now float64) {
 	nextTs := math.MaxFloat64
+	var hasNext bool
 	if q.reservationQueue.Len() > 0 {
 		nextTs = q.reservationQueue.items[0].ts()
+		hasNext = true
 	}
 	if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
 		nextTs = q.limitQueue.items[0].ts()
+		hasNext = true
 	}
 	if nextTs <= now {
 		// should not happen as we always compare .ts() <= now
 		return
 	}
-
-	if q.timeBasedScheduleTs > nextTs {
-		q.clock.runAt(nextTs, func() {
-			q.scheduleRequest()
-		})
-		q.timeBasedScheduleTs = nextTs
+	if !hasNext {
+		return
 	}
+	q.clock.runAt(nextTs, q.scheduleRequest)
 }
 
 func (q *MClock) scheduleByLimitAndWeight(now float64) {
-	for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
+	for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < now+1.0 {
 		ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
 		heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
 	}
@@ -347,7 +354,7 @@ func (q *MClock) scheduleByLimitAndWeight(now float64) {
 }
 
 func (q *MClock) scheduleByReservation(now float64) {
-	for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
+	for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() < now+1.0 {
 		next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
 		q.removeFromQueues(next.r)


@@ -210,7 +210,7 @@ func TestMClockReservationScheduling(t *testing.T) {
 	reqCount = (reqCount / 2) * 2
 	limit := 0.01       // 1 request in 100 seconds
 	resevation := 100.0 // 100 RPS
-	cl := &noopClock{}
+	cl := &noopClock{v: float64(1.0)}
 	q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
 		"class1": {Share: 2, LimitIOPS: &limit},
 		"class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation},
@@ -237,15 +237,18 @@ func TestMClockReservationScheduling(t *testing.T) {
 	q.scheduleRequest()
 
+	count := 0
 	for _, req := range requests {
 		select {
 		case <-req.scheduled:
-			require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
+			require.Equal(t, req.tag, "class2")
+			count++
 		default:
 		}
 	}
+	require.Equal(t, 100, count, "class2 has 100 requests reserved, so only 100 requests must be scheduled")
 
-	cl.v = 1.00001 // 1s elapsed
+	cl.v = 1.9999 // 1s elapsed - 0.999 to take into account float64 accuracy
 	q.scheduleRequest()
 
 	var result []string
@@ -258,7 +261,7 @@ func TestMClockReservationScheduling(t *testing.T) {
 		}
 	}
-	require.Equal(t, 100, len(result))
+	require.Equal(t, 200, len(result))
 	for _, res := range result {
 		require.Equal(t, "class2", res)
 	}
@@ -493,3 +496,89 @@ func TestMClockTimeBasedSchedule(t *testing.T) {
 	close(checked)
 	require.NoError(t, eg.Wait())
 }
+
+func TestMClockLowLimit(t *testing.T) {
+	t.Parallel()
+	limit := 2.0
+	q, err := NewMClock(100, 100, map[string]TagInfo{
+		"class1": {Share: 50, LimitIOPS: &limit},
+	}, 5*time.Second)
+	require.NoError(t, err)
+	defer q.Close()
+
+	eg, ctx := errgroup.WithContext(context.Background())
+	eg.SetLimit(5)
+	eg.Go(func() error {
+		for range 3 {
+			release, err := q.RequestArrival(ctx, "class1")
+			require.NoError(t, err)
+			release()
+		}
+		return nil
+	})
+	require.NoError(t, eg.Wait())
+}
+
+func TestMClockLimitTotalTime(t *testing.T) {
+	t.Parallel()
+	limit := 10.0 // 10 RPS -> 1 request per 100 ms
+	q, err := NewMClock(100, 100, map[string]TagInfo{
+		"class1": {Share: 50, LimitIOPS: &limit},
+	}, 5*time.Second)
+	require.NoError(t, err)
+	defer q.Close()
+
+	// 10 requests, each request runs for 500 ms,
+	// but they should be scheduled as soon as possible,
+	// so total duration must be less than 1 second
+	eg, ctx := errgroup.WithContext(context.Background())
+	startedAt := time.Now()
+	for range 10 {
+		eg.Go(func() error {
+			release, err := q.RequestArrival(ctx, "class1")
+			require.NoError(t, err)
+			time.Sleep(500 * time.Millisecond)
+			release()
+			return nil
+		})
+	}
+	require.NoError(t, eg.Wait())
+	require.True(t, time.Since(startedAt) <= 1*time.Second)
+
+	// 11 requests, limit = 10 RPS, so 10 requests should be
+	// scheduled as soon as possible, but last request should be
+	// scheduled at now + 1.0 s
+	eg, ctx = errgroup.WithContext(context.Background())
+	startedAt = time.Now()
+	for range 11 {
+		eg.Go(func() error {
+			release, err := q.RequestArrival(ctx, "class1")
+			require.NoError(t, err)
+			time.Sleep(500 * time.Millisecond)
+			release()
+			return nil
+		})
+	}
+	require.NoError(t, eg.Wait())
+	require.True(t, time.Since(startedAt) >= 1500*time.Millisecond)
+	require.True(t, time.Since(startedAt) <= 1600*time.Millisecond) // 100 ms offset to complete all requests
+}
+
+func TestMClockRestictTagRequests(t *testing.T) {
+	t.Parallel()
+	limit := 10.0
+	q, err := NewMClock(100, 100, map[string]TagInfo{
+		"class1": {Share: 50, LimitIOPS: &limit},
+		"class2": {Share: 50, LimitIOPS: &limit, Prohibited: true},
+	}, 5*time.Second)
+	require.NoError(t, err)
+	defer q.Close()
+
+	release, err := q.RequestArrival(context.Background(), "class1")
+	require.NoError(t, err)
+	release()
+
+	release, err = q.RequestArrival(context.Background(), "class2")
+	require.ErrorIs(t, err, ErrTagRequestsProhibited)
+	require.Nil(t, release)
+}


@@ -11,8 +11,8 @@ const (
 	ioTagHeader = "x-frostfs-io-tag"
 )
 
-// NewUnaryClientInteceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
-func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor {
+// NewUnaryClientInterceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
+func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor {
 	return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
 		return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...)
 	}