[#xx] limiting: Add Limiter
Some checks failed
Tests and linters / Run gofumpt (pull_request) Failing after 45s
Vulncheck / Vulncheck (pull_request) Failing after 1m1s
DCO action / DCO (pull_request) Failing after 1m15s
Tests and linters / Tests (pull_request) Successful in 1m12s
Tests and linters / Tests with -race (pull_request) Successful in 1m16s
Tests and linters / Staticcheck (pull_request) Successful in 1m33s
Tests and linters / Lint (pull_request) Successful in 2m8s
Tests and linters / gopls check (pull_request) Successful in 2m14s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m27s
Some checks failed
Tests and linters / Run gofumpt (pull_request) Failing after 45s
Vulncheck / Vulncheck (pull_request) Failing after 1m1s
DCO action / DCO (pull_request) Failing after 1m15s
Tests and linters / Tests (pull_request) Successful in 1m12s
Tests and linters / Tests with -race (pull_request) Successful in 1m16s
Tests and linters / Staticcheck (pull_request) Successful in 1m33s
Tests and linters / Lint (pull_request) Successful in 2m8s
Tests and linters / gopls check (pull_request) Successful in 2m14s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m27s
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
cfbca7fa1d
commit
25fffe99cd
2 changed files with 205 additions and 0 deletions
85
limiting/limiter.go
Normal file
85
limiting/limiter.go
Normal file
|
@ -0,0 +1,85 @@
|
||||||
|
package limiting
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type semaphore struct {
|
||||||
|
ch chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSemaphore(size uint64) *semaphore {
|
||||||
|
return &semaphore{make(chan struct{}, size)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *semaphore) acquire(ctx context.Context) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case s.ch <- struct{}{}:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *semaphore) tryAcquire() bool {
|
||||||
|
select {
|
||||||
|
case s.ch <- struct{}{}:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *semaphore) release() {
|
||||||
|
<-s.ch
|
||||||
|
}
|
||||||
|
|
||||||
|
type Limiter struct {
|
||||||
|
m map[string]*semaphore
|
||||||
|
}
|
||||||
|
|
||||||
|
type KeyLimit struct {
|
||||||
|
Keys []string
|
||||||
|
Limit uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReleaseFunc func()
|
||||||
|
|
||||||
|
func New(limits []KeyLimit) *Limiter {
|
||||||
|
lr := Limiter{m: make(map[string]*semaphore)}
|
||||||
|
|
||||||
|
for _, l := range limits {
|
||||||
|
sem := newSemaphore(l.Limit)
|
||||||
|
for _, item := range l.Keys {
|
||||||
|
lr.m[item] = sem
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &lr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lr *Limiter) Acquire(ctx context.Context, key string) (ReleaseFunc, error) {
|
||||||
|
sem, ok := lr.m[key]
|
||||||
|
if !ok {
|
||||||
|
// unknown key has no limit
|
||||||
|
return func() {}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := sem.acquire(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return func() { sem.release() }, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lr *Limiter) TryAcquire(key string) (ReleaseFunc, bool) {
|
||||||
|
sem, ok := lr.m[key]
|
||||||
|
if !ok {
|
||||||
|
// unknown key has no limit
|
||||||
|
return func() {}, true
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok := sem.tryAcquire(); ok {
|
||||||
|
return func() { sem.release() }, true
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
120
limiting/limiter_test.go
Normal file
120
limiting/limiter_test.go
Normal file
|
@ -0,0 +1,120 @@
|
||||||
|
package limiting_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
const operationDuration = time.Second
|
||||||
|
const operationCount = 64
|
||||||
|
|
||||||
|
type testKeyLimit struct {
|
||||||
|
keys []string
|
||||||
|
limit int
|
||||||
|
withoutLimit bool
|
||||||
|
failCount atomic.Int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func getLimits(testLimits []*testKeyLimit) []limiting.KeyLimit {
|
||||||
|
var limits []limiting.KeyLimit
|
||||||
|
for _, tl := range testLimits {
|
||||||
|
if tl.withoutLimit {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
limits = append(limits, limiting.KeyLimit{
|
||||||
|
Keys: tl.keys,
|
||||||
|
Limit: uint64(tl.limit),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return limits
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTestTasks(testLimits []*testKeyLimit, limiter *limiting.Limiter) []func() {
|
||||||
|
var tasks []func()
|
||||||
|
for _, tl := range testLimits {
|
||||||
|
for _, key := range tl.keys {
|
||||||
|
tasks = append(tasks, func() {
|
||||||
|
runParallelN(operationCount, func() {
|
||||||
|
release, ok := limiter.TryAcquire(key)
|
||||||
|
if !ok {
|
||||||
|
tl.failCount.Add(1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
|
time.Sleep(operationDuration)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rand.Shuffle(len(tasks), func(i, j int) {
|
||||||
|
tasks[i], tasks[j] = tasks[j], tasks[i]
|
||||||
|
})
|
||||||
|
return tasks
|
||||||
|
}
|
||||||
|
|
||||||
|
func runParallel(tasks ...func()) {
|
||||||
|
var g sync.WaitGroup
|
||||||
|
|
||||||
|
g.Add(len(tasks))
|
||||||
|
for _, task := range tasks {
|
||||||
|
go func() {
|
||||||
|
defer g.Done()
|
||||||
|
task()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
g.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func runParallelN(N int, tsk func()) {
|
||||||
|
tasks := make([]func(), N)
|
||||||
|
for i := range N {
|
||||||
|
tasks[i] = tsk
|
||||||
|
}
|
||||||
|
runParallel(tasks...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func verifyResults(t *testing.T, testLimits []*testKeyLimit) {
|
||||||
|
for _, tl := range testLimits {
|
||||||
|
if tl.withoutLimit {
|
||||||
|
require.Equal(t, 0, int(tl.failCount.Load()))
|
||||||
|
} else {
|
||||||
|
require.Equal(t, max(operationCount*len(tl.keys)-tl.limit, 0), int(tl.failCount.Load()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func resetFailCounts(testLimits []*testKeyLimit) {
|
||||||
|
for _, tl := range testLimits {
|
||||||
|
tl.failCount.Store(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLimiter(t *testing.T) {
|
||||||
|
testLimits := []*testKeyLimit{
|
||||||
|
{keys: []string{"A"}, limit: operationCount / 4},
|
||||||
|
{keys: []string{"B"}, limit: operationCount / 2},
|
||||||
|
{keys: []string{"C", "D"}, limit: operationCount / 4},
|
||||||
|
{keys: []string{"E"}, limit: 2 * operationCount},
|
||||||
|
{keys: []string{"F"}, withoutLimit: true},
|
||||||
|
}
|
||||||
|
|
||||||
|
lr := limiting.New(getLimits(testLimits))
|
||||||
|
tasks := createTestTasks(testLimits, lr)
|
||||||
|
|
||||||
|
t.Run("run once", func(t *testing.T) {
|
||||||
|
runParallel(tasks...)
|
||||||
|
verifyResults(t, testLimits)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("run again", func(t *testing.T) {
|
||||||
|
resetFailCounts(testLimits)
|
||||||
|
runParallel(tasks...)
|
||||||
|
verifyResults(t, testLimits)
|
||||||
|
})
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue