diff --git a/limiting/limiter.go b/limiting/limiter.go new file mode 100644 index 0000000..4d29a71 --- /dev/null +++ b/limiting/limiter.go @@ -0,0 +1,75 @@ +package limiting + +import ( + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore" +) + +type ReleaseFunc func() + +type Limiter interface { + // Acquire attempts to reserve a slot without blocking. + // + // Returns a release function and true if successful. The function must be + // called to release the limiter. The function must be called exactly once. + // Calling the function more that once will cause incorrect behavior of the + // limiter. + // + // Returns nil and false if fails. + // + // If the key was not defined in the limiter, no limit is applied. + Acquire(key string) (ReleaseFunc, bool) +} + +type SemaphoreLimiter struct { + m map[string]*semaphore.Semaphore +} + +// KeyLimit defines a concurrency limit for a set of keys. +// +// All keys of one set share the same limit. +// Keys of different sets have separate limits. +// +// Sets must not overlap. +type KeyLimit struct { + Keys []string + Limit int64 +} + +func NewSemaphoreLimiter(limits []KeyLimit) (*SemaphoreLimiter, error) { + lr := SemaphoreLimiter{make(map[string]*semaphore.Semaphore)} + for _, limit := range limits { + if limit.Limit < 0 { + return nil, fmt.Errorf("invalid limit %d", limit.Limit) + } + sem := semaphore.NewSemaphore(limit.Limit) + + if err := lr.addLimit(&limit, sem); err != nil { + return nil, err + } + } + return &lr, nil +} + +func (lr *SemaphoreLimiter) addLimit(limit *KeyLimit, sem *semaphore.Semaphore) error { + for _, key := range limit.Keys { + if _, exists := lr.m[key]; exists { + return fmt.Errorf("duplicate key %q", key) + } + lr.m[key] = sem + } + return nil +} + +func (lr *SemaphoreLimiter) Acquire(key string) (ReleaseFunc, bool) { + sem, ok := lr.m[key] + if !ok { + return func() {}, true + } + + if ok := sem.Acquire(); ok { + return func() { sem.Release() }, true + } + return nil, false +} diff --git a/limiting/limiter_test.go b/limiting/limiter_test.go new file mode 100644 index 0000000..c6087f1 --- /dev/null +++ b/limiting/limiter_test.go @@ -0,0 +1,138 @@ +package limiting_test + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting" + "github.com/stretchr/testify/require" +) + +const ( + operationDuration = 10 * time.Millisecond + operationCount = 64 +) + +type testCase struct { + keys []string + limit int64 + withoutLimit bool + failCount atomic.Int64 +} + +func TestLimiter(t *testing.T) { + testLimiter(t, func(kl []limiting.KeyLimit) (limiting.Limiter, error) { + return limiting.NewSemaphoreLimiter(kl) + }) +} + +func testLimiter(t *testing.T, getLimiter func([]limiting.KeyLimit) (limiting.Limiter, error)) { + t.Run("duplicate key", func(t *testing.T) { + _, err := getLimiter([]limiting.KeyLimit{ + {[]string{"A", "B"}, 10}, + {[]string{"B", "C"}, 10}, + }) + require.Error(t, err) + }) + + testCases := []*testCase{ + {keys: []string{"A"}, limit: operationCount / 4}, + {keys: []string{"B"}, limit: operationCount / 2}, + {keys: []string{"C", "D"}, limit: operationCount / 4}, + {keys: []string{"E"}, limit: 2 * operationCount}, + {keys: []string{"F"}, withoutLimit: true}, + } + + lr, err := getLimiter(getLimits(testCases)) + require.NoError(t, err) + + tasks := createTestTasks(testCases, lr) + + t.Run("first run", func(t *testing.T) { + executeTasks(tasks...) + verifyResults(t, testCases) + }) + + resetFailCounts(testCases) + + t.Run("repeated run", func(t *testing.T) { + executeTasks(tasks...) + verifyResults(t, testCases) + }) +} + +func getLimits(testCases []*testCase) []limiting.KeyLimit { + var limits []limiting.KeyLimit + for _, tc := range testCases { + if tc.withoutLimit { + continue + } + limits = append(limits, limiting.KeyLimit{ + Keys: tc.keys, + Limit: int64(tc.limit), + }) + } + return limits +} + +func createTestTasks(testCases []*testCase, lr limiting.Limiter) []func() { + var tasks []func() + for _, tc := range testCases { + for _, key := range tc.keys { + tasks = append(tasks, func() { + executeTaskN(operationCount, func() { acquireAndExecute(tc, lr, key) }) + }) + } + } + return tasks +} + +func acquireAndExecute(tc *testCase, lr limiting.Limiter, key string) { + release, ok := lr.Acquire(key) + if !ok { + tc.failCount.Add(1) + return + } + defer release() + time.Sleep(operationDuration) +} + +func executeTasks(tasks ...func()) { + var g sync.WaitGroup + + g.Add(len(tasks)) + for _, task := range tasks { + go func() { + defer g.Done() + task() + }() + } + g.Wait() +} + +func executeTaskN(N int, task func()) { + tasks := make([]func(), N) + for i := range N { + tasks[i] = task + } + executeTasks(tasks...) +} + +func verifyResults(t *testing.T, testCases []*testCase) { + for _, tc := range testCases { + var expectedFailCount int64 + if !tc.withoutLimit { + numKeys := int64(len(tc.keys)) + expectedFailCount = max(operationCount*numKeys-tc.limit, 0) + } + require.Equal(t, expectedFailCount, tc.failCount.Load()) + } +} + +func resetFailCounts(testCases []*testCase) { + for _, tc := range testCases { + tc.failCount.Store(0) + } +} diff --git a/limiting/semaphore/semaphore.go b/limiting/semaphore/semaphore.go new file mode 100644 index 0000000..c43dfc6 --- /dev/null +++ b/limiting/semaphore/semaphore.go @@ -0,0 +1,27 @@ +package semaphore + +import ( + "sync/atomic" +) + +type Semaphore struct { + count atomic.Int64 + limit int64 +} + +func NewSemaphore(size int64) *Semaphore { + return &Semaphore{limit: size} +} + +func (s *Semaphore) Acquire() bool { + v := s.count.Add(1) + if v > s.limit { + s.count.Add(-1) + return false + } + return true +} + +func (s *Semaphore) Release() { + s.count.Add(-1) +} diff --git a/limiting/semaphore/semaphore_bench.result b/limiting/semaphore/semaphore_bench.result new file mode 100644 index 0000000..5883c60 --- /dev/null +++ b/limiting/semaphore/semaphore_bench.result @@ -0,0 +1,26 @@ +goos: linux +goarch: amd64 +pkg: git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore +cpu: 12th Gen Intel(R) Core(TM) i5-1235U +BenchmarkSemaphore/semaphore_size=1/lock_duration=0s-12 6113605 1964 ns/op 383.1 acquire-ns/op 203.5 release-ns/op 0.6892 success-rate +BenchmarkSemaphore/semaphore_size=1/lock_duration=1µs-12 5826655 2067 ns/op 382.0 acquire-ns/op 307.0 release-ns/op 0.1460 success-rate +BenchmarkSemaphore/semaphore_size=1/lock_duration=10µs-12 5977272 2033 ns/op 370.4 acquire-ns/op 321.4 release-ns/op 0.05408 success-rate +BenchmarkSemaphore/semaphore_size=1/lock_duration=100µs-12 5862900 2030 ns/op 365.6 acquire-ns/op 343.1 release-ns/op 0.01242 success-rate +BenchmarkSemaphore/semaphore_size=10/lock_duration=0s-12 5637050 2173 ns/op 365.2 acquire-ns/op 261.7 release-ns/op 0.9765 success-rate +BenchmarkSemaphore/semaphore_size=10/lock_duration=1µs-12 5470316 2225 ns/op 390.4 acquire-ns/op 357.2 release-ns/op 0.9249 success-rate +BenchmarkSemaphore/semaphore_size=10/lock_duration=10µs-12 5584527 2134 ns/op 395.2 acquire-ns/op 339.0 release-ns/op 0.5409 success-rate +BenchmarkSemaphore/semaphore_size=10/lock_duration=100µs-12 5841032 2036 ns/op 369.4 acquire-ns/op 330.7 release-ns/op 0.1182 success-rate +BenchmarkSemaphore/semaphore_size=100/lock_duration=0s-12 5600013 2159 ns/op 369.9 acquire-ns/op 271.1 release-ns/op 0.9976 success-rate +BenchmarkSemaphore/semaphore_size=100/lock_duration=1µs-12 5323606 2280 ns/op 394.0 acquire-ns/op 368.9 release-ns/op 0.9697 success-rate +BenchmarkSemaphore/semaphore_size=100/lock_duration=10µs-12 5133394 2353 ns/op 405.8 acquire-ns/op 374.5 release-ns/op 0.9498 success-rate +BenchmarkSemaphore/semaphore_size=100/lock_duration=100µs-12 5238136 2303 ns/op 387.2 acquire-ns/op 362.2 release-ns/op 0.8749 success-rate +BenchmarkSemaphore/semaphore_size=1000/lock_duration=0s-12 5408720 2180 ns/op 367.6 acquire-ns/op 271.5 release-ns/op 0.9992 success-rate +BenchmarkSemaphore/semaphore_size=1000/lock_duration=1µs-12 5114854 2366 ns/op 407.9 acquire-ns/op 376.4 release-ns/op 0.9966 success-rate +BenchmarkSemaphore/semaphore_size=1000/lock_duration=10µs-12 4659454 2438 ns/op 412.2 acquire-ns/op 385.9 release-ns/op 0.9800 success-rate +BenchmarkSemaphore/semaphore_size=1000/lock_duration=100µs-12 4837894 2482 ns/op 401.7 acquire-ns/op 380.9 release-ns/op 0.9725 success-rate +BenchmarkSemaphore/semaphore_size=10000/lock_duration=0s-12 5403058 2188 ns/op 367.5 acquire-ns/op 273.1 release-ns/op 1.000 success-rate +BenchmarkSemaphore/semaphore_size=10000/lock_duration=1µs-12 5086929 2306 ns/op 390.6 acquire-ns/op 376.3 release-ns/op 1.000 success-rate +BenchmarkSemaphore/semaphore_size=10000/lock_duration=10µs-12 5059968 2378 ns/op 410.2 acquire-ns/op 384.5 release-ns/op 1.000 success-rate +BenchmarkSemaphore/semaphore_size=10000/lock_duration=100µs-12 4909206 2420 ns/op 408.4 acquire-ns/op 383.4 release-ns/op 1.000 success-rate +PASS +ok git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore 284.895s diff --git a/limiting/semaphore/semaphore_bench_test.go b/limiting/semaphore/semaphore_bench_test.go new file mode 100644 index 0000000..f4837e8 --- /dev/null +++ b/limiting/semaphore/semaphore_bench_test.go @@ -0,0 +1,95 @@ +package semaphore_test + +import ( + "fmt" + "sync" + "testing" + "time" + + semaphores "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +const maxWorkers = 10_000_000 + +type benchmarkSemaphoreMetrics struct { + mu sync.Mutex + + acquireDuration, + releaseDuration time.Duration + + acquireCount, + releaseCount uint64 +} + +func (c *benchmarkSemaphoreMetrics) reportAcquire(duration time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.acquireDuration += duration + c.acquireCount += 1 +} + +func (c *benchmarkSemaphoreMetrics) reportRelease(duration time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.releaseDuration += duration + c.releaseCount += 1 +} + +func (c *benchmarkSemaphoreMetrics) getResults() (timePerAcquire, timePerRelease, successRate float64) { + timePerAcquire = float64(c.acquireDuration) / float64(c.acquireCount) + timePerRelease = float64(c.releaseDuration) / float64(c.releaseCount) + successRate = float64(c.releaseCount) / float64(c.acquireCount) + return +} + +func BenchmarkSemaphore(b *testing.B) { + sizes := []int64{1, 10, 100, 1000, 10000} + lockDurations := []time.Duration{0, time.Microsecond, 10 * time.Microsecond, 100 * time.Microsecond} + + for _, size := range sizes { + for _, lockDuration := range lockDurations { + name := fmt.Sprintf("semaphore_size=%d/lock_duration=%v", size, lockDuration) + b.Run(name, func(b *testing.B) { + benchmarkSemaphore(b, semaphores.NewSemaphore(size), lockDuration) + }) + } + } +} + +func benchmarkSemaphore(b *testing.B, sem *semaphores.Semaphore, lockDuration time.Duration) { + var m benchmarkSemaphoreMetrics + var g errgroup.Group + g.SetLimit(maxWorkers) + + for range b.N { + g.Go(func() error { + now := time.Now() + ok := sem.Acquire() + m.reportAcquire(time.Since(now)) + + if !ok { + return nil + } + + time.Sleep(lockDuration) + + now = time.Now() + sem.Release() + m.reportRelease(time.Since(now)) + + return nil + }) + } + require.NoError(b, g.Wait()) + + require.Equal(b, uint64(b.N), m.acquireCount) + require.LessOrEqual(b, m.releaseCount, m.acquireCount) + + timePerAcquire, timePerRelease, successRate := m.getResults() + + b.ReportMetric(timePerAcquire, "acquire-ns/op") + b.ReportMetric(timePerRelease, "release-ns/op") + b.ReportMetric(successRate, "success-rate") +}