package limiting import ( "context" ) type semaphore struct { ch chan struct{} } func newSemaphore(size uint64) *semaphore { return &semaphore{make(chan struct{}, size)} } func (s *semaphore) acquire(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() case s.ch <- struct{}{}: return nil } } func (s *semaphore) tryAcquire() bool { select { case s.ch <- struct{}{}: return true default: return false } } func (s *semaphore) release() { <-s.ch } type Limiter struct { m map[string]*semaphore } // KeyLimit defines a concurrency limit for a set of keys. // // All keys of one set share the same limit. // Keys of different sets have separate limits. // // Sets must not overlap. type KeyLimit struct { Keys []string Limit uint64 } type ReleaseFunc func() func New(limits []KeyLimit) *Limiter { lr := Limiter{m: make(map[string]*semaphore)} for _, l := range limits { sem := newSemaphore(l.Limit) for _, key := range l.Keys { lr.m[key] = sem } } return &lr } // Acquire reserves a slot for the given key, blocking if necessary. // // If the context is canceled before reservation, returns an error; // otherwise, returns a release function that must be called exactly once. // // If the key was not defined in the limiter, no limit is applied. func (lr *Limiter) Acquire(ctx context.Context, key string) (ReleaseFunc, error) { sem, ok := lr.m[key] if !ok { return func() {}, nil } if err := sem.acquire(ctx); err != nil { return nil, err } return func() { sem.release() }, nil } // TryAcquire attempts to reserve a slot without blocking. // // Returns a release function and true if successful, otherwise false. // The release function must be called exactly once. // // If the key was not defined in the limiter, no limit is applied. func (lr *Limiter) TryAcquire(key string) (ReleaseFunc, bool) { sem, ok := lr.m[key] if !ok { return func() {}, true } if ok := sem.tryAcquire(); ok { return func() { sem.release() }, true } return nil, false }