[#1367] writecache: Add background flushing objects limiter
To limit memory usage by background flush. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
8a6e3025a0
commit
e39378b1c3
12 changed files with 184 additions and 35 deletions
70
pkg/local_object_storage/writecache/limiter.go
Normal file
70
pkg/local_object_storage/writecache/limiter.go
Normal file
|
@ -0,0 +1,70 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var errLimiterClosed = errors.New("acquire failed: limiter closed")
|
||||
|
||||
// flushLimiter is used to limit the total size of objects
|
||||
// being flushed to blobstore at the same time. This is a necessary
|
||||
// limitation so that the flushing process does not have
|
||||
// a strong impact on user requests.
|
||||
type flushLimiter struct {
|
||||
count, size uint64
|
||||
maxSize uint64
|
||||
cond *sync.Cond
|
||||
closed bool
|
||||
}
|
||||
|
||||
func newFlushLimiter(maxSize uint64) *flushLimiter {
|
||||
return &flushLimiter{
|
||||
maxSize: maxSize,
|
||||
cond: sync.NewCond(&sync.Mutex{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *flushLimiter) acquire(size uint64) error {
|
||||
l.cond.L.Lock()
|
||||
defer l.cond.L.Unlock()
|
||||
|
||||
// it is allowed to overflow maxSize to allow flushing objects with size > maxSize
|
||||
for l.count > 0 && l.size+size > l.maxSize && !l.closed {
|
||||
l.cond.Wait()
|
||||
if l.closed {
|
||||
return errLimiterClosed
|
||||
}
|
||||
}
|
||||
l.count++
|
||||
l.size += size
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *flushLimiter) release(size uint64) {
|
||||
l.cond.L.Lock()
|
||||
defer l.cond.L.Unlock()
|
||||
|
||||
if l.size >= size {
|
||||
l.size -= size
|
||||
} else {
|
||||
panic("flushLimiter: invalid size")
|
||||
}
|
||||
|
||||
if l.count > 0 {
|
||||
l.count--
|
||||
} else {
|
||||
panic("flushLimiter: invalid count")
|
||||
}
|
||||
|
||||
l.cond.Broadcast()
|
||||
}
|
||||
|
||||
func (l *flushLimiter) close() {
|
||||
l.cond.L.Lock()
|
||||
defer l.cond.L.Unlock()
|
||||
|
||||
l.closed = true
|
||||
|
||||
l.cond.Broadcast()
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue