limiting: Add Limiter #4
5 changed files with 361 additions and 0 deletions
75
limiting/limiter.go
Normal file
75
limiting/limiter.go
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
package limiting
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore"
|
||||||
|
)
|
||||||
|
|
||||||
dstepanov-yadro marked this conversation as resolved
Outdated
|
|||||||
|
type ReleaseFunc func()
|
||||||
|
|
||||||
|
type Limiter interface {
|
||||||
|
// Acquire attempts to reserve a slot without blocking.
|
||||||
|
//
|
||||||
|
// Returns a release function and true if successful. The function must be
|
||||||
dstepanov-yadro marked this conversation as resolved
Outdated
dstepanov-yadro
commented
Shoul I call release func if Shoul I call release func if `false` returned? Please, extend the doc.
a-savchuk
commented
Done Done
|
|||||||
|
// called to release the limiter. The function must be called exactly once.
|
||||||
|
// Calling the function more that once will cause incorrect behavior of the
|
||||||
|
// limiter.
|
||||||
|
//
|
||||||
|
// Returns nil and false if fails.
|
||||||
|
//
|
||||||
|
// If the key was not defined in the limiter, no limit is applied.
|
||||||
|
Acquire(key string) (ReleaseFunc, bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
type SemaphoreLimiter struct {
|
||||||
|
m map[string]*semaphore.Semaphore
|
||||||
dstepanov-yadro marked this conversation as resolved
Outdated
dstepanov-yadro
commented
To much abstractions: To much abstractions: `semaphore`, generic type, `BurstAtomic`, etc.
Please, make the code simple.
a-savchuk
commented
Done Done
|
|||||||
|
}
|
||||||
|
|
||||||
|
// KeyLimit defines a concurrency limit for a set of keys.
|
||||||
|
//
|
||||||
|
// All keys of one set share the same limit.
|
||||||
|
// Keys of different sets have separate limits.
|
||||||
|
//
|
||||||
|
// Sets must not overlap.
|
||||||
|
type KeyLimit struct {
|
||||||
|
Keys []string
|
||||||
|
Limit int64
|
||||||
|
}
|
||||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
It's enough to acquire semaphore by two concurrent goroutines and we're on It's enough to acquire semaphore by two concurrent goroutines and we're on `panic` :)
a-savchuk
commented
Why? This map is used only for reading Why? This map is used only for reading
aarifullin
commented
Yeah, okay. Then nevermind ```go
sem, ok := lr.m[key]
if !ok {
return func() {}, nil
}
```
Yeah, okay. Then nevermind
a-savchuk
commented
I think it'd be good if I leave a comment about it I think it'd be good if I leave a comment about it
|
|||||||
|
|
||||||
|
func NewSemaphoreLimiter(limits []KeyLimit) (*SemaphoreLimiter, error) {
|
||||||
|
lr := SemaphoreLimiter{make(map[string]*semaphore.Semaphore)}
|
||||||
|
for _, limit := range limits {
|
||||||
|
if limit.Limit < 0 {
|
||||||
|
return nil, fmt.Errorf("invalid limit %d", limit.Limit)
|
||||||
|
}
|
||||||
|
sem := semaphore.NewSemaphore(limit.Limit)
|
||||||
|
|
||||||
|
if err := lr.addLimit(&limit, sem); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &lr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lr *SemaphoreLimiter) addLimit(limit *KeyLimit, sem *semaphore.Semaphore) error {
|
||||||
dstepanov-yadro marked this conversation as resolved
Outdated
dstepanov-yadro
commented
Some kind of validation is needed. What happens if I specify two identical keys? Some kind of validation is needed. What happens if I specify two identical keys?
a-savchuk
commented
Done Done
|
|||||||
|
for _, key := range limit.Keys {
|
||||||
|
if _, exists := lr.m[key]; exists {
|
||||||
|
return fmt.Errorf("duplicate key %q", key)
|
||||||
|
}
|
||||||
|
lr.m[key] = sem
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lr *SemaphoreLimiter) Acquire(key string) (ReleaseFunc, bool) {
|
||||||
|
sem, ok := lr.m[key]
|
||||||
|
if !ok {
|
||||||
|
return func() {}, true
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok := sem.Acquire(); ok {
|
||||||
|
return func() { sem.Release() }, true
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
138
limiting/limiter_test.go
Normal file
138
limiting/limiter_test.go
Normal file
|
@ -0,0 +1,138 @@
|
||||||
|
package limiting_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
operationDuration = 10 * time.Millisecond
|
||||||
|
operationCount = 64
|
||||||
|
)
|
||||||
|
|
||||||
|
type testCase struct {
|
||||||
|
keys []string
|
||||||
|
limit int64
|
||||||
|
withoutLimit bool
|
||||||
|
failCount atomic.Int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLimiter(t *testing.T) {
|
||||||
|
testLimiter(t, func(kl []limiting.KeyLimit) (limiting.Limiter, error) {
|
||||||
|
return limiting.NewSemaphoreLimiter(kl)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testLimiter(t *testing.T, getLimiter func([]limiting.KeyLimit) (limiting.Limiter, error)) {
|
||||||
|
t.Run("duplicate key", func(t *testing.T) {
|
||||||
|
_, err := getLimiter([]limiting.KeyLimit{
|
||||||
|
{[]string{"A", "B"}, 10},
|
||||||
|
{[]string{"B", "C"}, 10},
|
||||||
|
})
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
testCases := []*testCase{
|
||||||
|
{keys: []string{"A"}, limit: operationCount / 4},
|
||||||
|
{keys: []string{"B"}, limit: operationCount / 2},
|
||||||
|
{keys: []string{"C", "D"}, limit: operationCount / 4},
|
||||||
|
{keys: []string{"E"}, limit: 2 * operationCount},
|
||||||
|
{keys: []string{"F"}, withoutLimit: true},
|
||||||
|
}
|
||||||
|
|
||||||
|
lr, err := getLimiter(getLimits(testCases))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tasks := createTestTasks(testCases, lr)
|
||||||
|
|
||||||
|
t.Run("first run", func(t *testing.T) {
|
||||||
|
executeTasks(tasks...)
|
||||||
|
verifyResults(t, testCases)
|
||||||
|
})
|
||||||
|
|
||||||
|
resetFailCounts(testCases)
|
||||||
|
|
||||||
|
t.Run("repeated run", func(t *testing.T) {
|
||||||
|
executeTasks(tasks...)
|
||||||
|
verifyResults(t, testCases)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func getLimits(testCases []*testCase) []limiting.KeyLimit {
|
||||||
|
var limits []limiting.KeyLimit
|
||||||
|
for _, tc := range testCases {
|
||||||
|
if tc.withoutLimit {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
limits = append(limits, limiting.KeyLimit{
|
||||||
|
Keys: tc.keys,
|
||||||
|
Limit: int64(tc.limit),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return limits
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTestTasks(testCases []*testCase, lr limiting.Limiter) []func() {
|
||||||
|
var tasks []func()
|
||||||
|
for _, tc := range testCases {
|
||||||
|
for _, key := range tc.keys {
|
||||||
|
tasks = append(tasks, func() {
|
||||||
|
executeTaskN(operationCount, func() { acquireAndExecute(tc, lr, key) })
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tasks
|
||||||
|
}
|
||||||
|
|
||||||
|
func acquireAndExecute(tc *testCase, lr limiting.Limiter, key string) {
|
||||||
|
release, ok := lr.Acquire(key)
|
||||||
|
if !ok {
|
||||||
|
tc.failCount.Add(1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
|
time.Sleep(operationDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
func executeTasks(tasks ...func()) {
|
||||||
|
var g sync.WaitGroup
|
||||||
|
|
||||||
|
g.Add(len(tasks))
|
||||||
|
for _, task := range tasks {
|
||||||
|
go func() {
|
||||||
|
defer g.Done()
|
||||||
|
task()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
g.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func executeTaskN(N int, task func()) {
|
||||||
|
tasks := make([]func(), N)
|
||||||
|
for i := range N {
|
||||||
|
tasks[i] = task
|
||||||
|
}
|
||||||
|
executeTasks(tasks...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func verifyResults(t *testing.T, testCases []*testCase) {
|
||||||
|
for _, tc := range testCases {
|
||||||
|
var expectedFailCount int64
|
||||||
|
if !tc.withoutLimit {
|
||||||
|
numKeys := int64(len(tc.keys))
|
||||||
|
expectedFailCount = max(operationCount*numKeys-tc.limit, 0)
|
||||||
|
}
|
||||||
|
require.Equal(t, expectedFailCount, tc.failCount.Load())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func resetFailCounts(testCases []*testCase) {
|
||||||
|
for _, tc := range testCases {
|
||||||
|
tc.failCount.Store(0)
|
||||||
|
}
|
||||||
|
}
|
27
limiting/semaphore/semaphore.go
Normal file
27
limiting/semaphore/semaphore.go
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
package semaphore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Semaphore struct {
|
||||||
|
count atomic.Int64
|
||||||
|
limit int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSemaphore(size int64) *Semaphore {
|
||||||
|
return &Semaphore{limit: size}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Semaphore) Acquire() bool {
|
||||||
|
v := s.count.Add(1)
|
||||||
|
if v > s.limit {
|
||||||
|
s.count.Add(-1)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Semaphore) Release() {
|
||||||
|
s.count.Add(-1)
|
||||||
|
}
|
26
limiting/semaphore/semaphore_bench.result
Normal file
26
limiting/semaphore/semaphore_bench.result
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
goos: linux
|
||||||
|
goarch: amd64
|
||||||
|
pkg: git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore
|
||||||
|
cpu: 12th Gen Intel(R) Core(TM) i5-1235U
|
||||||
|
BenchmarkSemaphore/semaphore_size=1/lock_duration=0s-12 6113605 1964 ns/op 383.1 acquire-ns/op 203.5 release-ns/op 0.6892 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1/lock_duration=1µs-12 5826655 2067 ns/op 382.0 acquire-ns/op 307.0 release-ns/op 0.1460 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1/lock_duration=10µs-12 5977272 2033 ns/op 370.4 acquire-ns/op 321.4 release-ns/op 0.05408 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1/lock_duration=100µs-12 5862900 2030 ns/op 365.6 acquire-ns/op 343.1 release-ns/op 0.01242 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10/lock_duration=0s-12 5637050 2173 ns/op 365.2 acquire-ns/op 261.7 release-ns/op 0.9765 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10/lock_duration=1µs-12 5470316 2225 ns/op 390.4 acquire-ns/op 357.2 release-ns/op 0.9249 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10/lock_duration=10µs-12 5584527 2134 ns/op 395.2 acquire-ns/op 339.0 release-ns/op 0.5409 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10/lock_duration=100µs-12 5841032 2036 ns/op 369.4 acquire-ns/op 330.7 release-ns/op 0.1182 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=100/lock_duration=0s-12 5600013 2159 ns/op 369.9 acquire-ns/op 271.1 release-ns/op 0.9976 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=100/lock_duration=1µs-12 5323606 2280 ns/op 394.0 acquire-ns/op 368.9 release-ns/op 0.9697 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=100/lock_duration=10µs-12 5133394 2353 ns/op 405.8 acquire-ns/op 374.5 release-ns/op 0.9498 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=100/lock_duration=100µs-12 5238136 2303 ns/op 387.2 acquire-ns/op 362.2 release-ns/op 0.8749 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1000/lock_duration=0s-12 5408720 2180 ns/op 367.6 acquire-ns/op 271.5 release-ns/op 0.9992 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1000/lock_duration=1µs-12 5114854 2366 ns/op 407.9 acquire-ns/op 376.4 release-ns/op 0.9966 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1000/lock_duration=10µs-12 4659454 2438 ns/op 412.2 acquire-ns/op 385.9 release-ns/op 0.9800 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=1000/lock_duration=100µs-12 4837894 2482 ns/op 401.7 acquire-ns/op 380.9 release-ns/op 0.9725 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10000/lock_duration=0s-12 5403058 2188 ns/op 367.5 acquire-ns/op 273.1 release-ns/op 1.000 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10000/lock_duration=1µs-12 5086929 2306 ns/op 390.6 acquire-ns/op 376.3 release-ns/op 1.000 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10000/lock_duration=10µs-12 5059968 2378 ns/op 410.2 acquire-ns/op 384.5 release-ns/op 1.000 success-rate
|
||||||
|
BenchmarkSemaphore/semaphore_size=10000/lock_duration=100µs-12 4909206 2420 ns/op 408.4 acquire-ns/op 383.4 release-ns/op 1.000 success-rate
|
||||||
|
PASS
|
||||||
|
ok git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore 284.895s
|
95
limiting/semaphore/semaphore_bench_test.go
Normal file
95
limiting/semaphore/semaphore_bench_test.go
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
package semaphore_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
semaphores "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
const maxWorkers = 10_000_000
|
||||||
|
|
||||||
|
type benchmarkSemaphoreMetrics struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
|
||||||
|
acquireDuration,
|
||||||
|
releaseDuration time.Duration
|
||||||
|
|
||||||
|
acquireCount,
|
||||||
|
releaseCount uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *benchmarkSemaphoreMetrics) reportAcquire(duration time.Duration) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
c.acquireDuration += duration
|
||||||
|
c.acquireCount += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *benchmarkSemaphoreMetrics) reportRelease(duration time.Duration) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
c.releaseDuration += duration
|
||||||
|
c.releaseCount += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *benchmarkSemaphoreMetrics) getResults() (timePerAcquire, timePerRelease, successRate float64) {
|
||||||
|
timePerAcquire = float64(c.acquireDuration) / float64(c.acquireCount)
|
||||||
|
timePerRelease = float64(c.releaseDuration) / float64(c.releaseCount)
|
||||||
|
successRate = float64(c.releaseCount) / float64(c.acquireCount)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkSemaphore(b *testing.B) {
|
||||||
|
sizes := []int64{1, 10, 100, 1000, 10000}
|
||||||
|
lockDurations := []time.Duration{0, time.Microsecond, 10 * time.Microsecond, 100 * time.Microsecond}
|
||||||
|
|
||||||
|
for _, size := range sizes {
|
||||||
|
for _, lockDuration := range lockDurations {
|
||||||
|
name := fmt.Sprintf("semaphore_size=%d/lock_duration=%v", size, lockDuration)
|
||||||
|
b.Run(name, func(b *testing.B) {
|
||||||
|
benchmarkSemaphore(b, semaphores.NewSemaphore(size), lockDuration)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func benchmarkSemaphore(b *testing.B, sem *semaphores.Semaphore, lockDuration time.Duration) {
|
||||||
|
var m benchmarkSemaphoreMetrics
|
||||||
|
var g errgroup.Group
|
||||||
|
g.SetLimit(maxWorkers)
|
||||||
|
|
||||||
|
for range b.N {
|
||||||
|
g.Go(func() error {
|
||||||
|
now := time.Now()
|
||||||
|
ok := sem.Acquire()
|
||||||
|
m.reportAcquire(time.Since(now))
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(lockDuration)
|
||||||
|
|
||||||
|
now = time.Now()
|
||||||
|
sem.Release()
|
||||||
|
m.reportRelease(time.Since(now))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(b, g.Wait())
|
||||||
|
|
||||||
|
require.Equal(b, uint64(b.N), m.acquireCount)
|
||||||
|
require.LessOrEqual(b, m.releaseCount, m.acquireCount)
|
||||||
|
|
||||||
|
timePerAcquire, timePerRelease, successRate := m.getResults()
|
||||||
|
|
||||||
|
b.ReportMetric(timePerAcquire, "acquire-ns/op")
|
||||||
|
b.ReportMetric(timePerRelease, "release-ns/op")
|
||||||
|
b.ReportMetric(successRate, "success-rate")
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue
This option is generally good, but I have doubts that using channels is faster than
atomic
s. Please provide another comparison (benchmark) with theatomic
-based implementation.@dstepanov-yadro wrote in #4 (comment):
#4 (comment)
That's why I suggested to introduce an interface and try different implementations to compare
@aarifullin
Limiter
as an interface. After discussion with @dstepanov-yadro, left onlyTryAcquire
method and renamed it toAcquire
semaphoreLimiter
. Made it generic, so it can use different semaphore implementation, i. e. built with atomics, channels and etc.@dstepanov-yadro