71 lines
1.3 KiB
Go
71 lines
1.3 KiB
Go
|
package writecache
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
// errLimiterClosed reports that an acquire on a flushLimiter was abandoned
// because the limiter was closed.
var errLimiterClosed = errors.New("acquire failed: limiter closed")
|
||
|
|
||
|
// flushLimiter bounds the combined size of the objects that are being
// flushed to blobstore concurrently, so that background flushing does not
// noticeably degrade user requests.
//
// All fields are guarded by cond.L.
type flushLimiter struct {
	// count is the number of reservations currently held.
	count uint64
	// size is the sum of the sizes of the reservations currently held.
	size uint64
	// maxSize is the limit that size is compared against on acquire.
	maxSize uint64
	// cond is signalled whenever a reservation is released or the limiter
	// is closed; its locker protects the rest of the struct.
	cond *sync.Cond
	// closed is set once by close and never reset.
	closed bool
}
|
||
|
|
||
|
func newFlushLimiter(maxSize uint64) *flushLimiter {
|
||
|
return &flushLimiter{
|
||
|
maxSize: maxSize,
|
||
|
cond: sync.NewCond(&sync.Mutex{}),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (l *flushLimiter) acquire(size uint64) error {
|
||
|
l.cond.L.Lock()
|
||
|
defer l.cond.L.Unlock()
|
||
|
|
||
|
// it is allowed to overflow maxSize to allow flushing objects with size > maxSize
|
||
|
for l.count > 0 && l.size+size > l.maxSize && !l.closed {
|
||
|
l.cond.Wait()
|
||
|
if l.closed {
|
||
|
return errLimiterClosed
|
||
|
}
|
||
|
}
|
||
|
l.count++
|
||
|
l.size += size
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (l *flushLimiter) release(size uint64) {
|
||
|
l.cond.L.Lock()
|
||
|
defer l.cond.L.Unlock()
|
||
|
|
||
|
if l.size >= size {
|
||
|
l.size -= size
|
||
|
} else {
|
||
|
panic("flushLimiter: invalid size")
|
||
|
}
|
||
|
|
||
|
if l.count > 0 {
|
||
|
l.count--
|
||
|
} else {
|
||
|
panic("flushLimiter: invalid count")
|
||
|
}
|
||
|
|
||
|
l.cond.Broadcast()
|
||
|
}
|
||
|
|
||
|
func (l *flushLimiter) close() {
|
||
|
l.cond.L.Lock()
|
||
|
defer l.cond.L.Unlock()
|
||
|
|
||
|
l.closed = true
|
||
|
|
||
|
l.cond.Broadcast()
|
||
|
}
|