[#4] limiting: Add limiter
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
30e83428fd
commit
3a15523b78
3 changed files with 246 additions and 0 deletions
81
limiting/limiter.go
Normal file
81
limiting/limiter.go
Normal file
|
@ -0,0 +1,81 @@
|
|||
package limiting
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
semaphores "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore"
|
||||
)
|
||||
|
||||
type ReleaseFunc func()
|
||||
|
||||
type Limiter interface {
|
||||
// Acquire attempts to reserve a slot without blocking.
|
||||
//
|
||||
// Returns a release function and true if successful, otherwise false.
|
||||
// The release function must be called exactly once.
|
||||
//
|
||||
// If the key was not defined in the limiter, no limit is applied.
|
||||
Acquire(key string) (ReleaseFunc, bool)
|
||||
}
|
||||
|
||||
type semaphore interface {
|
||||
Acquire() bool
|
||||
Release()
|
||||
}
|
||||
|
||||
type semaphoreLimiter[T semaphore] struct {
|
||||
m map[string]T
|
||||
}
|
||||
|
||||
// KeyLimit defines a concurrency limit for a set of keys.
|
||||
//
|
||||
// All keys of one set share the same limit.
|
||||
// Keys of different sets have separate limits.
|
||||
//
|
||||
// Sets must not overlap.
|
||||
type KeyLimit struct {
|
||||
Keys []string
|
||||
Limit int64
|
||||
}
|
||||
|
||||
var NewBurstAtomicSemaphoreLimiter = func(limits []KeyLimit) (*semaphoreLimiter[*semaphores.BurstAtomicSemaphore], error) {
|
||||
return newSemaphoreLimiter(limits, semaphores.NewBurstAtomicSemaphore)
|
||||
}
|
||||
|
||||
func newSemaphoreLimiter[T semaphore](limits []KeyLimit, newSemaphore func(size int64) T) (*semaphoreLimiter[T], error) {
|
||||
lr := semaphoreLimiter[T]{make(map[string]T)}
|
||||
for _, limit := range limits {
|
||||
if err := lr.addLimit(&limit, newSemaphore); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &lr, nil
|
||||
}
|
||||
|
||||
func (lr *semaphoreLimiter[T]) addLimit(limit *KeyLimit, newSemaphore func(size int64) T) error {
|
||||
if limit.Limit < 0 {
|
||||
return fmt.Errorf("invalid limit %d", limit.Limit)
|
||||
}
|
||||
|
||||
sem := newSemaphore(limit.Limit)
|
||||
for _, key := range limit.Keys {
|
||||
if _, exists := lr.m[key]; exists {
|
||||
return fmt.Errorf("duplicate key %q", key)
|
||||
}
|
||||
lr.m[key] = sem
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lr *semaphoreLimiter[T]) Acquire(key string) (ReleaseFunc, bool) {
|
||||
sem, ok := lr.m[key]
|
||||
if !ok {
|
||||
return func() {}, true
|
||||
}
|
||||
|
||||
if ok := sem.Acquire(); ok {
|
||||
return func() { sem.Release() }, true
|
||||
}
|
||||
return nil, false
|
||||
}
|
138
limiting/limiter_test.go
Normal file
138
limiting/limiter_test.go
Normal file
|
@ -0,0 +1,138 @@
|
|||
package limiting_test
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const (
|
||||
operationDuration = 10 * time.Millisecond
|
||||
operationCount = 64
|
||||
)
|
||||
|
||||
type testCase struct {
|
||||
keys []string
|
||||
limit int64
|
||||
withoutLimit bool
|
||||
failCount atomic.Int64
|
||||
}
|
||||
|
||||
func TestLimiter(t *testing.T) {
|
||||
testLimiter(t, func(kl []limiting.KeyLimit) (limiting.Limiter, error) {
|
||||
return limiting.NewBurstAtomicSemaphoreLimiter(kl)
|
||||
})
|
||||
}
|
||||
|
||||
func testLimiter(t *testing.T, getLimiter func([]limiting.KeyLimit) (limiting.Limiter, error)) {
|
||||
t.Run("duplicate key", func(t *testing.T) {
|
||||
_, err := getLimiter([]limiting.KeyLimit{
|
||||
{[]string{"A", "B"}, 10},
|
||||
{[]string{"B", "C"}, 10},
|
||||
})
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
testCases := []*testCase{
|
||||
{keys: []string{"A"}, limit: operationCount / 4},
|
||||
{keys: []string{"B"}, limit: operationCount / 2},
|
||||
{keys: []string{"C", "D"}, limit: operationCount / 4},
|
||||
{keys: []string{"E"}, limit: 2 * operationCount},
|
||||
{keys: []string{"F"}, withoutLimit: true},
|
||||
}
|
||||
|
||||
lr, err := getLimiter(getLimits(testCases))
|
||||
require.NoError(t, err)
|
||||
|
||||
tasks := createTestTasks(testCases, lr)
|
||||
|
||||
t.Run("first run", func(t *testing.T) {
|
||||
executeTasks(tasks...)
|
||||
verifyResults(t, testCases)
|
||||
})
|
||||
|
||||
resetFailCounts(testCases)
|
||||
|
||||
t.Run("repeated run", func(t *testing.T) {
|
||||
executeTasks(tasks...)
|
||||
verifyResults(t, testCases)
|
||||
})
|
||||
}
|
||||
|
||||
func getLimits(testCases []*testCase) []limiting.KeyLimit {
|
||||
var limits []limiting.KeyLimit
|
||||
for _, tc := range testCases {
|
||||
if tc.withoutLimit {
|
||||
continue
|
||||
}
|
||||
limits = append(limits, limiting.KeyLimit{
|
||||
Keys: tc.keys,
|
||||
Limit: int64(tc.limit),
|
||||
})
|
||||
}
|
||||
return limits
|
||||
}
|
||||
|
||||
func createTestTasks(testCases []*testCase, lr limiting.Limiter) []func() {
|
||||
var tasks []func()
|
||||
for _, tc := range testCases {
|
||||
for _, key := range tc.keys {
|
||||
tasks = append(tasks, func() {
|
||||
executeTaskN(operationCount, func() { acquireAndExecute(tc, lr, key) })
|
||||
})
|
||||
}
|
||||
}
|
||||
return tasks
|
||||
}
|
||||
|
||||
func acquireAndExecute(tc *testCase, lr limiting.Limiter, key string) {
|
||||
release, ok := lr.Acquire(key)
|
||||
if !ok {
|
||||
tc.failCount.Add(1)
|
||||
return
|
||||
}
|
||||
defer release()
|
||||
time.Sleep(operationDuration)
|
||||
}
|
||||
|
||||
func executeTasks(tasks ...func()) {
|
||||
var g sync.WaitGroup
|
||||
|
||||
g.Add(len(tasks))
|
||||
for _, task := range tasks {
|
||||
go func() {
|
||||
defer g.Done()
|
||||
task()
|
||||
}()
|
||||
}
|
||||
g.Wait()
|
||||
}
|
||||
|
||||
func executeTaskN(N int, task func()) {
|
||||
tasks := make([]func(), N)
|
||||
for i := range N {
|
||||
tasks[i] = task
|
||||
}
|
||||
executeTasks(tasks...)
|
||||
}
|
||||
|
||||
func verifyResults(t *testing.T, testCases []*testCase) {
|
||||
for _, tc := range testCases {
|
||||
var expectedFailCount int64
|
||||
if !tc.withoutLimit {
|
||||
numKeys := int64(len(tc.keys))
|
||||
expectedFailCount = max(operationCount*numKeys-tc.limit, 0)
|
||||
}
|
||||
require.Equal(t, expectedFailCount, tc.failCount.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func resetFailCounts(testCases []*testCase) {
|
||||
for _, tc := range testCases {
|
||||
tc.failCount.Store(0)
|
||||
}
|
||||
}
|
27
limiting/semaphore/semaphore.go
Normal file
27
limiting/semaphore/semaphore.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package semaphore
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type BurstAtomicSemaphore struct {
|
||||
count atomic.Int64
|
||||
limit int64
|
||||
}
|
||||
|
||||
func NewBurstAtomicSemaphore(size int64) *BurstAtomicSemaphore {
|
||||
return &BurstAtomicSemaphore{limit: size}
|
||||
}
|
||||
|
||||
func (s *BurstAtomicSemaphore) Acquire() bool {
|
||||
v := s.count.Add(1)
|
||||
if v > s.limit {
|
||||
s.count.Add(-1)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *BurstAtomicSemaphore) Release() {
|
||||
s.count.Add(-1)
|
||||
}
|
Loading…
Add table
Reference in a new issue