frostfs-node/pkg/local_object_storage/writecache/limiter.go
Ekaterina Lebedeva fc6abe30b8 [#1693] storage: Replace conditional panics with asserts
Change-Id: I9d8ccde3c71fca716856c7bfc53da20ee0542f20
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-04-15 16:54:31 +00:00

66 lines
1.3 KiB
Go

package writecache
import (
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
)
// errLimiterClosed is returned by flushLimiter.acquire when a reservation
// is abandoned because the limiter has been closed.
var errLimiterClosed = errors.New("acquire failed: limiter closed")
// flushLimiter bounds the combined size of the objects that are being
// flushed to the blobstore concurrently. The bound exists so that the
// flushing process does not have a strong impact on user requests.
type flushLimiter struct {
	// count is the number of reservations currently held.
	count uint64
	// size is the total size of the reservations currently held.
	size uint64
	// maxSize is the limit that acquire compares size against.
	maxSize uint64
	// cond is used to park and wake goroutines blocked in acquire.
	// acquire, release and close hold its Locker while they touch
	// the other fields.
	cond *sync.Cond
	// closed is set by close; once it is set acquire no longer blocks.
	closed bool
}
// newFlushLimiter creates an open limiter with no reservations held.
// maxSize is the limit on the combined size of concurrent reservations.
// The limiter gets its own mutex, wrapped in the condition variable that
// acquire waits on.
func newFlushLimiter(maxSize uint64) *flushLimiter {
	limiter := new(flushLimiter)
	limiter.maxSize = maxSize
	limiter.cond = sync.NewCond(new(sync.Mutex))
	return limiter
}
// acquire reserves size bytes of the flush budget, blocking while other
// reservations are held and adding size to them would exceed maxSize.
//
// A reservation is always granted when nothing else is reserved
// (count == 0), even if size alone exceeds maxSize: otherwise an object
// larger than the limit could never be flushed.
//
// It returns errLimiterClosed, without reserving anything, if the limiter
// is closed either before the call or while the caller is waiting. On a
// nil return the reservation is expected to be given back with
// release(size).
func (l *flushLimiter) acquire(size uint64) error {
	l.cond.L.Lock()
	defer l.cond.L.Unlock()

	for {
		// Checked before every capacity test, including the first one, so
		// that a closed limiter never hands out a reservation.
		if l.closed {
			return errLimiterClosed
		}
		// Same condition as l.size+size <= l.maxSize, written with a
		// subtraction so that a huge size cannot wrap around uint64 and
		// slip past the limit.
		fits := l.size <= l.maxSize && size <= l.maxSize-l.size
		if l.count == 0 || fits {
			break
		}
		l.cond.Wait()
	}

	l.count++
	l.size += size
	return nil
}
// release gives size bytes, previously reserved with acquire, back to the
// limiter and wakes every goroutine blocked in acquire so that each can
// re-check whether its own request fits now.
//
// Both counters are validated with assert.True before they are decreased:
// size must not exceed what is currently reserved, and at least one
// reservation must be outstanding.
func (l *flushLimiter) release(size uint64) {
	mu := l.cond.L
	mu.Lock()
	defer mu.Unlock()

	assert.True(size <= l.size, "flushLimiter: invalid size")
	l.size = l.size - size

	assert.True(l.count != 0, "flushLimiter: invalid count")
	l.count = l.count - 1

	// Broadcast rather than Signal: waiters ask for different sizes.
	l.cond.Broadcast()
}
// close marks the limiter as closed and wakes every goroutine blocked in
// acquire so that it can observe the flag.
func (l *flushLimiter) close() {
	mu := l.cond.L
	mu.Lock()
	defer mu.Unlock()

	l.closed = true
	l.cond.Broadcast()
}