[#4] limiting: Add limiter

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
Aleksey Savchuk 2025-02-04 17:32:01 +03:00
parent 30e83428fd
commit 59fb93fb23
Signed by: a-savchuk
GPG key ID: 70C0A7FF6F9C4639
3 changed files with 240 additions and 0 deletions

75
limiting/limiter.go Normal file
View file

@ -0,0 +1,75 @@
package limiting
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore"
)
type ReleaseFunc func()
type Limiter interface {
// Acquire attempts to reserve a slot without blocking.
//
// Returns a release function and true if successful. The function must be
// called to release the limiter. The function must be called exactly once.
// Calling the function more that once will cause incorrect behavior of the
// limiter.
//
// Returns nil and false if fails.
//
// If the key was not defined in the limiter, no limit is applied.
Acquire(key string) (ReleaseFunc, bool)
}
type SemaphoreLimiter struct {
m map[string]*semaphore.Semaphore
}
// KeyLimit defines a concurrency limit for a set of keys.
//
// All keys of one set share the same limit.
// Keys of different sets have separate limits.
//
// Sets must not overlap.
type KeyLimit struct {
Keys []string
Limit int64
}
func NewSemaphoreLimiter(limits []KeyLimit) (*SemaphoreLimiter, error) {
lr := SemaphoreLimiter{make(map[string]*semaphore.Semaphore)}
for _, limit := range limits {
if limit.Limit < 0 {
return nil, fmt.Errorf("invalid limit %d", limit.Limit)
}
sem := semaphore.NewSemaphore(limit.Limit)
if err := lr.addLimit(&limit, sem); err != nil {
return nil, err
}
}
return &lr, nil
}
func (lr *SemaphoreLimiter) addLimit(limit *KeyLimit, sem *semaphore.Semaphore) error {
for _, key := range limit.Keys {
if _, exists := lr.m[key]; exists {
return fmt.Errorf("duplicate key %q", key)
}
lr.m[key] = sem
}
return nil
}
func (lr *SemaphoreLimiter) Acquire(key string) (ReleaseFunc, bool) {
sem, ok := lr.m[key]
if !ok {
return func() {}, true
}
if ok := sem.Acquire(); ok {
return func() { sem.Release() }, true
}
return nil, false
}

138
limiting/limiter_test.go Normal file
View file

@ -0,0 +1,138 @@
package limiting_test
import (
"sync"
"sync/atomic"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
"github.com/stretchr/testify/require"
)
const (
operationDuration = 10 * time.Millisecond
operationCount = 64
)
type testCase struct {
keys []string
limit int64
withoutLimit bool
failCount atomic.Int64
}
func TestLimiter(t *testing.T) {
testLimiter(t, func(kl []limiting.KeyLimit) (limiting.Limiter, error) {
return limiting.NewSemaphoreLimiter(kl)
})
}
func testLimiter(t *testing.T, getLimiter func([]limiting.KeyLimit) (limiting.Limiter, error)) {
t.Run("duplicate key", func(t *testing.T) {
_, err := getLimiter([]limiting.KeyLimit{
{[]string{"A", "B"}, 10},
{[]string{"B", "C"}, 10},
})
require.Error(t, err)
})
testCases := []*testCase{
{keys: []string{"A"}, limit: operationCount / 4},
{keys: []string{"B"}, limit: operationCount / 2},
{keys: []string{"C", "D"}, limit: operationCount / 4},
{keys: []string{"E"}, limit: 2 * operationCount},
{keys: []string{"F"}, withoutLimit: true},
}
lr, err := getLimiter(getLimits(testCases))
require.NoError(t, err)
tasks := createTestTasks(testCases, lr)
t.Run("first run", func(t *testing.T) {
executeTasks(tasks...)
verifyResults(t, testCases)
})
resetFailCounts(testCases)
t.Run("repeated run", func(t *testing.T) {
executeTasks(tasks...)
verifyResults(t, testCases)
})
}
func getLimits(testCases []*testCase) []limiting.KeyLimit {
var limits []limiting.KeyLimit
for _, tc := range testCases {
if tc.withoutLimit {
continue
}
limits = append(limits, limiting.KeyLimit{
Keys: tc.keys,
Limit: int64(tc.limit),
})
}
return limits
}
func createTestTasks(testCases []*testCase, lr limiting.Limiter) []func() {
var tasks []func()
for _, tc := range testCases {
for _, key := range tc.keys {
tasks = append(tasks, func() {
executeTaskN(operationCount, func() { acquireAndExecute(tc, lr, key) })
})
}
}
return tasks
}
func acquireAndExecute(tc *testCase, lr limiting.Limiter, key string) {
release, ok := lr.Acquire(key)
if !ok {
tc.failCount.Add(1)
return
}
defer release()
time.Sleep(operationDuration)
}
func executeTasks(tasks ...func()) {
var g sync.WaitGroup
g.Add(len(tasks))
for _, task := range tasks {
go func() {
defer g.Done()
task()
}()
}
g.Wait()
}
func executeTaskN(N int, task func()) {
tasks := make([]func(), N)
for i := range N {
tasks[i] = task
}
executeTasks(tasks...)
}
func verifyResults(t *testing.T, testCases []*testCase) {
for _, tc := range testCases {
var expectedFailCount int64
if !tc.withoutLimit {
numKeys := int64(len(tc.keys))
expectedFailCount = max(operationCount*numKeys-tc.limit, 0)
}
require.Equal(t, expectedFailCount, tc.failCount.Load())
}
}
func resetFailCounts(testCases []*testCase) {
for _, tc := range testCases {
tc.failCount.Store(0)
}
}

View file

@ -0,0 +1,27 @@
package semaphore
import (
"sync/atomic"
)
type Semaphore struct {
count atomic.Int64
limit int64
}
func NewSemaphore(size int64) *Semaphore {
return &Semaphore{limit: size}
}
func (s *Semaphore) Acquire() bool {
v := s.count.Add(1)
if v > s.limit {
s.count.Add(-1)
return false
}
return true
}
func (s *Semaphore) Release() {
s.count.Add(-1)
}