diff --git a/limiting/limiter.go b/limiting/limiter.go new file mode 100644 index 0000000..4d29a71 --- /dev/null +++ b/limiting/limiter.go @@ -0,0 +1,75 @@ +package limiting + +import ( + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore" +) + +type ReleaseFunc func() + +type Limiter interface { + // Acquire attempts to reserve a slot without blocking. + // + // Returns a release function and true if successful. The function must be + // called to release the limiter. The function must be called exactly once. + // Calling the function more that once will cause incorrect behavior of the + // limiter. + // + // Returns nil and false if fails. + // + // If the key was not defined in the limiter, no limit is applied. + Acquire(key string) (ReleaseFunc, bool) +} + +type SemaphoreLimiter struct { + m map[string]*semaphore.Semaphore +} + +// KeyLimit defines a concurrency limit for a set of keys. +// +// All keys of one set share the same limit. +// Keys of different sets have separate limits. +// +// Sets must not overlap. +type KeyLimit struct { + Keys []string + Limit int64 +} + +func NewSemaphoreLimiter(limits []KeyLimit) (*SemaphoreLimiter, error) { + lr := SemaphoreLimiter{make(map[string]*semaphore.Semaphore)} + for _, limit := range limits { + if limit.Limit < 0 { + return nil, fmt.Errorf("invalid limit %d", limit.Limit) + } + sem := semaphore.NewSemaphore(limit.Limit) + + if err := lr.addLimit(&limit, sem); err != nil { + return nil, err + } + } + return &lr, nil +} + +func (lr *SemaphoreLimiter) addLimit(limit *KeyLimit, sem *semaphore.Semaphore) error { + for _, key := range limit.Keys { + if _, exists := lr.m[key]; exists { + return fmt.Errorf("duplicate key %q", key) + } + lr.m[key] = sem + } + return nil +} + +func (lr *SemaphoreLimiter) Acquire(key string) (ReleaseFunc, bool) { + sem, ok := lr.m[key] + if !ok { + return func() {}, true + } + + if ok := sem.Acquire(); ok { + return func() { sem.Release() }, true + } + return nil, false +} diff --git a/limiting/limiter_test.go b/limiting/limiter_test.go new file mode 100644 index 0000000..c6087f1 --- /dev/null +++ b/limiting/limiter_test.go @@ -0,0 +1,138 @@ +package limiting_test + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting" + "github.com/stretchr/testify/require" +) + +const ( + operationDuration = 10 * time.Millisecond + operationCount = 64 +) + +type testCase struct { + keys []string + limit int64 + withoutLimit bool + failCount atomic.Int64 +} + +func TestLimiter(t *testing.T) { + testLimiter(t, func(kl []limiting.KeyLimit) (limiting.Limiter, error) { + return limiting.NewSemaphoreLimiter(kl) + }) +} + +func testLimiter(t *testing.T, getLimiter func([]limiting.KeyLimit) (limiting.Limiter, error)) { + t.Run("duplicate key", func(t *testing.T) { + _, err := getLimiter([]limiting.KeyLimit{ + {[]string{"A", "B"}, 10}, + {[]string{"B", "C"}, 10}, + }) + require.Error(t, err) + }) + + testCases := []*testCase{ + {keys: []string{"A"}, limit: operationCount / 4}, + {keys: []string{"B"}, limit: operationCount / 2}, + {keys: []string{"C", "D"}, limit: operationCount / 4}, + {keys: []string{"E"}, limit: 2 * operationCount}, + {keys: []string{"F"}, withoutLimit: true}, + } + + lr, err := getLimiter(getLimits(testCases)) + require.NoError(t, err) + + tasks := createTestTasks(testCases, lr) + + t.Run("first run", func(t *testing.T) { + executeTasks(tasks...) + verifyResults(t, testCases) + }) + + resetFailCounts(testCases) + + t.Run("repeated run", func(t *testing.T) { + executeTasks(tasks...) + verifyResults(t, testCases) + }) +} + +func getLimits(testCases []*testCase) []limiting.KeyLimit { + var limits []limiting.KeyLimit + for _, tc := range testCases { + if tc.withoutLimit { + continue + } + limits = append(limits, limiting.KeyLimit{ + Keys: tc.keys, + Limit: int64(tc.limit), + }) + } + return limits +} + +func createTestTasks(testCases []*testCase, lr limiting.Limiter) []func() { + var tasks []func() + for _, tc := range testCases { + for _, key := range tc.keys { + tasks = append(tasks, func() { + executeTaskN(operationCount, func() { acquireAndExecute(tc, lr, key) }) + }) + } + } + return tasks +} + +func acquireAndExecute(tc *testCase, lr limiting.Limiter, key string) { + release, ok := lr.Acquire(key) + if !ok { + tc.failCount.Add(1) + return + } + defer release() + time.Sleep(operationDuration) +} + +func executeTasks(tasks ...func()) { + var g sync.WaitGroup + + g.Add(len(tasks)) + for _, task := range tasks { + go func() { + defer g.Done() + task() + }() + } + g.Wait() +} + +func executeTaskN(N int, task func()) { + tasks := make([]func(), N) + for i := range N { + tasks[i] = task + } + executeTasks(tasks...) +} + +func verifyResults(t *testing.T, testCases []*testCase) { + for _, tc := range testCases { + var expectedFailCount int64 + if !tc.withoutLimit { + numKeys := int64(len(tc.keys)) + expectedFailCount = max(operationCount*numKeys-tc.limit, 0) + } + require.Equal(t, expectedFailCount, tc.failCount.Load()) + } +} + +func resetFailCounts(testCases []*testCase) { + for _, tc := range testCases { + tc.failCount.Store(0) + } +} diff --git a/limiting/semaphore/semaphore.go b/limiting/semaphore/semaphore.go new file mode 100644 index 0000000..c43dfc6 --- /dev/null +++ b/limiting/semaphore/semaphore.go @@ -0,0 +1,27 @@ +package semaphore + +import ( + "sync/atomic" +) + +type Semaphore struct { + count atomic.Int64 + limit int64 +} + +func NewSemaphore(size int64) *Semaphore { + return &Semaphore{limit: size} +} + +func (s *Semaphore) Acquire() bool { + v := s.count.Add(1) + if v > s.limit { + s.count.Add(-1) + return false + } + return true +} + +func (s *Semaphore) Release() { + s.count.Add(-1) +}