[#4] limiting: Add Limiter
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
cfbca7fa1d
commit
240501c1b7
2 changed files with 244 additions and 0 deletions
99
limiting/limiter.go
Normal file
99
limiting/limiter.go
Normal file
|
@ -0,0 +1,99 @@
|
|||
package limiting
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type semaphore struct {
|
||||
ch chan struct{}
|
||||
}
|
||||
|
||||
func newSemaphore(size uint64) *semaphore {
|
||||
return &semaphore{make(chan struct{}, size)}
|
||||
}
|
||||
|
||||
func (s *semaphore) acquire(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case s.ch <- struct{}{}:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *semaphore) tryAcquire() bool {
|
||||
select {
|
||||
case s.ch <- struct{}{}:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (s *semaphore) release() {
|
||||
<-s.ch
|
||||
}
|
||||
|
||||
type Limiter struct {
|
||||
m map[string]*semaphore // for read-only access
|
||||
}
|
||||
|
||||
// KeyLimit defines a concurrency limit for a set of keys.
|
||||
//
|
||||
// All keys of one set share the same limit.
|
||||
// Keys of different sets have separate limits.
|
||||
//
|
||||
// Sets must not overlap.
|
||||
type KeyLimit struct {
|
||||
Keys []string
|
||||
Limit uint64
|
||||
}
|
||||
|
||||
type ReleaseFunc func()
|
||||
|
||||
func New(limits []KeyLimit) *Limiter {
|
||||
lr := Limiter{m: make(map[string]*semaphore)}
|
||||
for _, l := range limits {
|
||||
sem := newSemaphore(l.Limit)
|
||||
for _, key := range l.Keys {
|
||||
lr.m[key] = sem
|
||||
}
|
||||
}
|
||||
return &lr
|
||||
}
|
||||
|
||||
// Acquire reserves a slot for the given key, blocking if necessary.
|
||||
//
|
||||
// If the context is canceled before reservation, returns an error;
|
||||
// otherwise, returns a release function that must be called exactly once.
|
||||
//
|
||||
// If the key was not defined in the limiter, no limit is applied.
|
||||
func (lr *Limiter) Acquire(ctx context.Context, key string) (ReleaseFunc, error) {
|
||||
sem, ok := lr.m[key]
|
||||
if !ok {
|
||||
return func() {}, nil
|
||||
}
|
||||
|
||||
if err := sem.acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return func() { sem.release() }, nil
|
||||
}
|
||||
|
||||
// TryAcquire attempts to reserve a slot without blocking.
|
||||
//
|
||||
// Returns a release function and true if successful, otherwise false.
|
||||
// The release function must be called exactly once.
|
||||
//
|
||||
// If the key was not defined in the limiter, no limit is applied.
|
||||
func (lr *Limiter) TryAcquire(key string) (ReleaseFunc, bool) {
|
||||
sem, ok := lr.m[key]
|
||||
if !ok {
|
||||
return func() {}, true
|
||||
}
|
||||
|
||||
if ok := sem.tryAcquire(); ok {
|
||||
return func() { sem.release() }, true
|
||||
}
|
||||
return nil, false
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue