limiting: Add Limiter #4
5 changed files with 361 additions and 0 deletions
75
limiting/limiter.go
Normal file
75
limiting/limiter.go
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
package limiting
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ReleaseFunc func()
|
||||||
|
|
||||||
|
type Limiter interface {
|
||||||
|
// Acquire attempts to reserve a slot without blocking.
|
||||||
|
//
|
||||||
|
// Returns a release function and true if successful. The function must be
|
||||||
|
// called to release the limiter. The function must be called exactly once.
|
||||||
|
// Calling the function more that once will cause incorrect behavior of the
|
||||||
|
// limiter.
|
||||||
|
//
|
||||||
|
// Returns nil and false if fails.
|
||||||
|
//
|
||||||
|
// If the key was not defined in the limiter, no limit is applied.
|
||||||
|
Acquire(key string) (ReleaseFunc, bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
type SemaphoreLimiter struct {
|
||||||
|
m map[string]*semaphore.Semaphore
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyLimit defines a concurrency limit for a set of keys.
|
||||||
|
//
|
||||||
|
// All keys of one set share the same limit.
|
||||||
|
// Keys of different sets have separate limits.
|
||||||
|
//
|
||||||
|
// Sets must not overlap.
|
||||||
|
type KeyLimit struct {
|
||||||
|
Keys []string
|
||||||
|
Limit int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSemaphoreLimiter(limits []KeyLimit) (*SemaphoreLimiter, error) {
|
||||||
|
lr := SemaphoreLimiter{make(map[string]*semaphore.Semaphore)}
|
||||||
|
for _, limit := range limits {
|
||||||
|
if limit.Limit < 0 {
|
||||||
|
return nil, fmt.Errorf("invalid limit %d", limit.Limit)
|
||||||
|
}
|
||||||
|
sem := semaphore.NewSemaphore(limit.Limit)
|
||||||
|
|
||||||
|
if err := lr.addLimit(&limit, sem); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &lr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lr *SemaphoreLimiter) addLimit(limit *KeyLimit, sem *semaphore.Semaphore) error {
|
||||||
|
for _, key := range limit.Keys {
|
||||||
|
if _, exists := lr.m[key]; exists {
|
||||||
|
return fmt.Errorf("duplicate key %q", key)
|
||||||
|
}
|
||||||
|
lr.m[key] = sem
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lr *SemaphoreLimiter) Acquire(key string) (ReleaseFunc, bool) {
|
||||||
|
sem, ok := lr.m[key]
|
||||||
|
if !ok {
|
||||||
|
return func() {}, true
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok := sem.Acquire(); ok {
|
||||||
|
return func() { sem.Release() }, true
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
138
limiting/limiter_test.go
Normal file
138
limiting/limiter_test.go
Normal file
|
@ -0,0 +1,138 @@
|
||||||
|
package limiting_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
operationDuration = 10 * time.Millisecond
|
||||||
|
operationCount = 64
|
||||||
|
)
|
||||||
|
|
||||||
|
type testCase struct {
|
||||||
|
keys []string
|
||||||
|
limit int64
|
||||||
|
withoutLimit bool
|
||||||
|
failCount atomic.Int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLimiter(t *testing.T) {
|
||||||
|
testLimiter(t, func(kl []limiting.KeyLimit) (limiting.Limiter, error) {
|
||||||
|
return limiting.NewSemaphoreLimiter(kl)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testLimiter(t *testing.T, getLimiter func([]limiting.KeyLimit) (limiting.Limiter, error)) {
|
||||||
|
t.Run("duplicate key", func(t *testing.T) {
|
||||||
|
_, err := getLimiter([]limiting.KeyLimit{
|
||||||
|
{[]string{"A", "B"}, 10},
|
||||||
|
{[]string{"B", "C"}, 10},
|
||||||
|
})
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
testCases := []*testCase{
|
||||||
|
{keys: []string{"A"}, limit: operationCount / 4},
|
||||||
|
{keys: []string{"B"}, limit: operationCount / 2},
|
||||||
|
{keys: []string{"C", "D"}, limit: operationCount / 4},
|
||||||
|
{keys: []string{"E"}, limit: 2 * operationCount},
|
||||||
|
{keys: []string{"F"}, withoutLimit: true},
|
||||||
|
}
|
||||||
|
|
||||||
|
lr, err := getLimiter(getLimits(testCases))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tasks := createTestTasks(testCases, lr)
|
||||||
|
|
||||||
|
t.Run("first run", func(t *testing.T) {
|
||||||
|
executeTasks(tasks...)
|
||||||
|
verifyResults(t, testCases)
|
||||||
|
})
|
||||||
|
|
||||||
|
resetFailCounts(testCases)
|
||||||
|
|
||||||
|
t.Run("repeated run", func(t *testing.T) {
|
||||||
|
executeTasks(tasks...)
|
||||||
|
verifyResults(t, testCases)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func getLimits(testCases []*testCase) []limiting.KeyLimit {
|
||||||
|
var limits []limiting.KeyLimit
|
||||||
|
for _, tc := range testCases {
|
||||||
|
if tc.withoutLimit {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
limits = append(limits, limiting.KeyLimit{
|
||||||
|
Keys: tc.keys,
|
||||||
|
Limit: int64(tc.limit),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return limits
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTestTasks(testCases []*testCase, lr limiting.Limiter) []func() {
|
||||||
|
var tasks []func()
|
||||||
|
for _, tc := range testCases {
|
||||||
|
for _, key := range tc.keys {
|
||||||
|
tasks = append(tasks, func() {
|
||||||
|
executeTaskN(operationCount, func() { acquireAndExecute(tc, lr, key) })
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tasks
|
||||||
|
}
|
||||||
|
|
||||||
|
func acquireAndExecute(tc *testCase, lr limiting.Limiter, key string) {
|
||||||
|
release, ok := lr.Acquire(key)
|
||||||
|
if !ok {
|
||||||
|
tc.failCount.Add(1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
|
time.Sleep(operationDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
func executeTasks(tasks ...func()) {
|
||||||
|
var g sync.WaitGroup
|
||||||
|
|
||||||
|
g.Add(len(tasks))
|
||||||
|
for _, task := range tasks {
|
||||||
|
go func() {
|
||||||
|
defer g.Done()
|
||||||
|
task()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
g.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func executeTaskN(N int, task func()) {
|
||||||
|
tasks := make([]func(), N)
|
||||||
|
for i := range N {
|
||||||
|
tasks[i] = task
|
||||||
|
}
|
||||||
|
executeTasks(tasks...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func verifyResults(t *testing.T, testCases []*testCase) {
|
||||||
|
for _, tc := range testCases {
|
||||||
|
var expectedFailCount int64
|
||||||
|
if !tc.withoutLimit {
|
||||||
|
numKeys := int64(len(tc.keys))
|
||||||
|
expectedFailCount = max(operationCount*numKeys-tc.limit, 0)
|
||||||
|
}
|
||||||
|
require.Equal(t, expectedFailCount, tc.failCount.Load())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func resetFailCounts(testCases []*testCase) {
|
||||||
|
for _, tc := range testCases {
|
||||||
|
tc.failCount.Store(0)
|
||||||
|
}
|
||||||
|
}
|
27
limiting/semaphore/semaphore.go
Normal file
27
limiting/semaphore/semaphore.go
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
package semaphore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Semaphore struct {
|
||||||
|
count atomic.Int64
|
||||||
|
limit int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSemaphore(size int64) *Semaphore {
|
||||||
|
return &Semaphore{limit: size}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Semaphore) Acquire() bool {
|
||||||
|
v := s.count.Add(1)
|
||||||
|
if v > s.limit {
|
||||||
|
s.count.Add(-1)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Semaphore) Release() {
|
||||||
|
s.count.Add(-1)
|
||||||
|
}
|
26
limiting/semaphore/semaphore_bench.result
Normal file
26
limiting/semaphore/semaphore_bench.result
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
goos: linux
|
||||||
|
goarch: amd64
|
||||||
|
pkg: git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore
|
||||||
|
cpu: 12th Gen Intel(R) Core(TM) i5-1235U
|
||||||
|
BenchmarkSemaphore/semaphore_size=1/lock_duration=0s-12 6113605 1964 ns/op 383.1 acquire-ns/op 203.5 release-ns/op 0.6892 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1/lock_duration=1µs-12 5826655 2067 ns/op 382.0 acquire-ns/op 307.0 release-ns/op 0.1460 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1/lock_duration=10µs-12 5977272 2033 ns/op 370.4 acquire-ns/op 321.4 release-ns/op 0.05408 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1/lock_duration=100µs-12 5862900 2030 ns/op 365.6 acquire-ns/op 343.1 release-ns/op 0.01242 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10/lock_duration=0s-12 5637050 2173 ns/op 365.2 acquire-ns/op 261.7 release-ns/op 0.9765 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10/lock_duration=1µs-12 5470316 2225 ns/op 390.4 acquire-ns/op 357.2 release-ns/op 0.9249 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10/lock_duration=10µs-12 5584527 2134 ns/op 395.2 acquire-ns/op 339.0 release-ns/op 0.5409 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10/lock_duration=100µs-12 5841032 2036 ns/op 369.4 acquire-ns/op 330.7 release-ns/op 0.1182 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=100/lock_duration=0s-12 5600013 2159 ns/op 369.9 acquire-ns/op 271.1 release-ns/op 0.9976 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=100/lock_duration=1µs-12 5323606 2280 ns/op 394.0 acquire-ns/op 368.9 release-ns/op 0.9697 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=100/lock_duration=10µs-12 5133394 2353 ns/op 405.8 acquire-ns/op 374.5 release-ns/op 0.9498 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=100/lock_duration=100µs-12 5238136 2303 ns/op 387.2 acquire-ns/op 362.2 release-ns/op 0.8749 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1000/lock_duration=0s-12 5408720 2180 ns/op 367.6 acquire-ns/op 271.5 release-ns/op 0.9992 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1000/lock_duration=1µs-12 5114854 2366 ns/op 407.9 acquire-ns/op 376.4 release-ns/op 0.9966 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1000/lock_duration=10µs-12 4659454 2438 ns/op 412.2 acquire-ns/op 385.9 release-ns/op 0.9800 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1000/lock_duration=100µs-12 4837894 2482 ns/op 401.7 acquire-ns/op 380.9 release-ns/op 0.9725 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10000/lock_duration=0s-12 5403058 2188 ns/op 367.5 acquire-ns/op 273.1 release-ns/op 1.000 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10000/lock_duration=1µs-12 5086929 2306 ns/op 390.6 acquire-ns/op 376.3 release-ns/op 1.000 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10000/lock_duration=10µs-12 5059968 2378 ns/op 410.2 acquire-ns/op 384.5 release-ns/op 1.000 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10000/lock_duration=100µs-12 4909206 2420 ns/op 408.4 acquire-ns/op 383.4 release-ns/op 1.000 success-rate
|
||||||
|
PASS
|
||||||
|
ok git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore 284.895s
|
95
limiting/semaphore/semaphore_bench_test.go
Normal file
95
limiting/semaphore/semaphore_bench_test.go
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
package semaphore_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
semaphores "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
const maxWorkers = 10_000_000
|
||||||
|
|
||||||
|
type benchmarkSemaphoreMetrics struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
|
||||||
|
acquireDuration,
|
||||||
|
releaseDuration time.Duration
|
||||||
|
|
||||||
|
acquireCount,
|
||||||
|
releaseCount uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *benchmarkSemaphoreMetrics) reportAcquire(duration time.Duration) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
c.acquireDuration += duration
|
||||||
|
c.acquireCount += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *benchmarkSemaphoreMetrics) reportRelease(duration time.Duration) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
c.releaseDuration += duration
|
||||||
|
c.releaseCount += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *benchmarkSemaphoreMetrics) getResults() (timePerAcquire, timePerRelease, successRate float64) {
|
||||||
|
timePerAcquire = float64(c.acquireDuration) / float64(c.acquireCount)
|
||||||
|
timePerRelease = float64(c.releaseDuration) / float64(c.releaseCount)
|
||||||
|
successRate = float64(c.releaseCount) / float64(c.acquireCount)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkSemaphore(b *testing.B) {
|
||||||
|
sizes := []int64{1, 10, 100, 1000, 10000}
|
||||||
|
lockDurations := []time.Duration{0, time.Microsecond, 10 * time.Microsecond, 100 * time.Microsecond}
|
||||||
|
|
||||||
|
for _, size := range sizes {
|
||||||
|
for _, lockDuration := range lockDurations {
|
||||||
|
name := fmt.Sprintf("semaphore_size=%d/lock_duration=%v", size, lockDuration)
|
||||||
|
b.Run(name, func(b *testing.B) {
|
||||||
|
benchmarkSemaphore(b, semaphores.NewSemaphore(size), lockDuration)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func benchmarkSemaphore(b *testing.B, sem *semaphores.Semaphore, lockDuration time.Duration) {
|
||||||
|
var m benchmarkSemaphoreMetrics
|
||||||
|
var g errgroup.Group
|
||||||
|
g.SetLimit(maxWorkers)
|
||||||
|
|
||||||
|
for range b.N {
|
||||||
|
g.Go(func() error {
|
||||||
|
now := time.Now()
|
||||||
|
ok := sem.Acquire()
|
||||||
|
m.reportAcquire(time.Since(now))
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(lockDuration)
|
||||||
|
|
||||||
|
now = time.Now()
|
||||||
|
sem.Release()
|
||||||
|
m.reportRelease(time.Since(now))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(b, g.Wait())
|
||||||
|
|
||||||
|
require.Equal(b, uint64(b.N), m.acquireCount)
|
||||||
|
require.LessOrEqual(b, m.releaseCount, m.acquireCount)
|
||||||
|
|
||||||
|
timePerAcquire, timePerRelease, successRate := m.getResults()
|
||||||
|
|
||||||
|
b.ReportMetric(timePerAcquire, "acquire-ns/op")
|
||||||
|
b.ReportMetric(timePerRelease, "release-ns/op")
|
||||||
|
b.ReportMetric(successRate, "success-rate")
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue