package writecache import ( "errors" "sync" ) var errLimiterClosed = errors.New("acquire failed: limiter closed") // flushLimiter is used to limit the total size of objects // being flushed to blobstore at the same time. This is a necessary // limitation so that the flushing process does not have // a strong impact on user requests. type flushLimiter struct { count, size uint64 maxSize uint64 cond *sync.Cond closed bool } func newFlushLimiter(maxSize uint64) *flushLimiter { return &flushLimiter{ maxSize: maxSize, cond: sync.NewCond(&sync.Mutex{}), } } func (l *flushLimiter) acquire(size uint64) error { l.cond.L.Lock() defer l.cond.L.Unlock() // it is allowed to overflow maxSize to allow flushing objects with size > maxSize for l.count > 0 && l.size+size > l.maxSize && !l.closed { l.cond.Wait() if l.closed { return errLimiterClosed } } l.count++ l.size += size return nil } func (l *flushLimiter) release(size uint64) { l.cond.L.Lock() defer l.cond.L.Unlock() if l.size >= size { l.size -= size } else { panic("flushLimiter: invalid size") } if l.count > 0 { l.count-- } else { panic("flushLimiter: invalid count") } l.cond.Broadcast() } func (l *flushLimiter) close() { l.cond.L.Lock() defer l.cond.L.Unlock() l.closed = true l.cond.Broadcast() }