From a37e3dc38103d148bbfb1a049419487ffc029870 Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Wed, 12 Feb 2025 23:05:41 +0300 Subject: [PATCH] [#4] limiting: Add benchmarks Signed-off-by: Aleksey Savchuk --- limiting/limiter_bench_test.go | 172 +++++++++++++++++++++++++++++++++ limiting/semaphore.go | 13 +++ 2 files changed, 185 insertions(+) create mode 100644 limiting/limiter_bench_test.go diff --git a/limiting/limiter_bench_test.go b/limiting/limiter_bench_test.go new file mode 100644 index 0000000..54e4beb --- /dev/null +++ b/limiting/limiter_bench_test.go @@ -0,0 +1,172 @@ +package limiting + +import ( + "fmt" + "math" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +const maxWorkers = 10_000_000 + +type benchmarkBurstAtomicSemaphoreMetrics struct { + mu sync.Mutex + numLimit, + accLimit, + maxLimit uint64 +} + +func (c *benchmarkBurstAtomicSemaphoreMetrics) reportLimit(limit uint64) { + c.mu.Lock() + defer c.mu.Unlock() + c.numLimit += 1 + c.accLimit += limit + c.maxLimit = max(c.maxLimit, limit) +} + +func (c *benchmarkBurstAtomicSemaphoreMetrics) getResults() (avgLimit, maxLimit float64) { + if c.numLimit == 0 { + return math.NaN(), math.NaN() + } + + avgLimit = float64(c.accLimit) / float64(c.numLimit) + maxLimit = float64(c.maxLimit) + return +} + +func BenchmarkBurstAtomicSemaphore(b *testing.B) { + sizes := []int64{1, 10, 100, 1000, 10000} + lockDurations := []time.Duration{0, time.Microsecond, 10 * time.Microsecond, 100 * time.Microsecond} + + for _, size := range sizes { + for _, lockDuration := range lockDurations { + name := fmt.Sprintf("size=%d/duration=%v", size, lockDuration) + b.Run(name, func(b *testing.B) { + benchmarkBurstAtomicSemaphore(b, size, lockDuration) + }) + } + } +} + +func benchmarkBurstAtomicSemaphore(b *testing.B, size int64, lockDuration time.Duration) { + sem := newBurstAtomicSemaphore(size) + + var m benchmarkBurstAtomicSemaphoreMetrics + var g errgroup.Group + g.SetLimit(maxWorkers) + + for range b.N { + g.Go(func() error { + ok, limit := sem.acquireWithReturnLimit() + if !ok { + m.reportLimit(uint64(limit)) + return nil + } + time.Sleep(lockDuration) + sem.release() + + return nil + }) + } + require.NoError(b, g.Wait()) + + avgLimit, maxLimit := m.getResults() + b.ReportMetric(avgLimit, "avg-limit") + b.ReportMetric(maxLimit, "max-limit") +} + +type benchmarkSemaphoreMetrics struct { + mu sync.Mutex + + acquireDuration, + releaseDuration time.Duration + + acquireCount, + releaseCount uint64 +} + +func (c *benchmarkSemaphoreMetrics) reportAcquire(duration time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.acquireDuration += duration + c.acquireCount += 1 +} + +func (c *benchmarkSemaphoreMetrics) reportRelease(duration time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.releaseDuration += duration + c.releaseCount += 1 +} + +func (c *benchmarkSemaphoreMetrics) getResults() (timePerAcquire, timePerRelease, successRate float64) { + timePerAcquire = float64(c.acquireDuration) / float64(c.acquireCount) + timePerRelease = float64(c.releaseDuration) / float64(c.releaseCount) + successRate = float64(c.releaseCount) / float64(c.acquireCount) + return +} + +func BenchmarkSemaphore(b *testing.B) { + sizes := []int64{1, 10, 100, 1000, 10000} + lockDurations := []time.Duration{0, time.Microsecond, 10 * time.Microsecond, 100 * time.Microsecond} + + // sizes := []int64{1} + // lockDurations := []time.Duration{time.Microsecond} + + for _, size := range sizes { + for _, lockDuration := range lockDurations { + name := fmt.Sprintf("size=%d/duration=%v", size, lockDuration) + b.Run(name, func(b *testing.B) { + b.Run("impl=channel", func(b *testing.B) { + benchmarkSemaphore(b, newChannelSemaphore(size), lockDuration) + }) + b.Run("impl=atomic", func(b *testing.B) { + benchmarkSemaphore(b, newAtomicSemaphore(size), lockDuration) + }) + b.Run("impl=burst_atomic", func(b *testing.B) { + benchmarkSemaphore(b, newBurstAtomicSemaphore(size), lockDuration) + }) + }) + } + } +} + +func benchmarkSemaphore(b *testing.B, sem semaphore, lockDuration time.Duration) { + var m benchmarkSemaphoreMetrics + var g errgroup.Group + g.SetLimit(maxWorkers) + + for range b.N { + g.Go(func() error { + now := time.Now() + ok := sem.acquire() + m.reportAcquire(time.Since(now)) + + if !ok { + return nil + } + + time.Sleep(lockDuration) + + now = time.Now() + sem.release() + m.reportRelease(time.Since(now)) + + return nil + }) + } + require.NoError(b, g.Wait()) + + require.Equal(b, uint64(b.N), m.acquireCount) + require.LessOrEqual(b, m.releaseCount, m.acquireCount) + + timePerAcquire, timePerRelease, successRate := m.getResults() + + b.ReportMetric(timePerAcquire, "acquire-ns/op") + b.ReportMetric(timePerRelease, "release-ns/op") + b.ReportMetric(successRate, "success-rate") +} diff --git a/limiting/semaphore.go b/limiting/semaphore.go index 91a32f0..66d0920 100644 --- a/limiting/semaphore.go +++ b/limiting/semaphore.go @@ -52,6 +52,19 @@ func (s *burstAtomicSemaphore) release() { s.count.Add(-1) } +// acquireWithReturnLimit calls [acquire] and returns current limit if failed. +// This is used for test purposes only. +// +//nolint:unused +func (s *burstAtomicSemaphore) acquireWithReturnLimit() (bool, int64) { + v := s.count.Add(1) + if v > s.limit { + s.count.Add(-1) + return false, v - 1 + } + return true, v +} + type channelSemaphore struct { ch chan struct{} }