[#9999] writecache: Count real data size
The size of the data on the disk is used to determine the writeache size, but not the number of objects multiplied by the maximum allowed object size. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
bc6653afb5
commit
6e92cf68e3
10 changed files with 114 additions and 93 deletions
|
@ -1,22 +1,23 @@
|
|||
package fstree
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// FileCounter used to count files in FSTree. The implementation must be thread-safe.
|
||||
type FileCounter interface {
|
||||
Set(v uint64)
|
||||
Inc()
|
||||
Dec()
|
||||
Set(count, size int64)
|
||||
Inc(size int64)
|
||||
Dec(size int64)
|
||||
Value() (int64, int64)
|
||||
}
|
||||
|
||||
type noopCounter struct{}
|
||||
|
||||
func (c *noopCounter) Set(uint64) {}
|
||||
func (c *noopCounter) Inc() {}
|
||||
func (c *noopCounter) Dec() {}
|
||||
func (c *noopCounter) Set(int64, int64) {}
|
||||
func (c *noopCounter) Inc(int64) {}
|
||||
func (c *noopCounter) Dec(int64) {}
|
||||
func (c *noopCounter) Value() (int64, int64) { return 0, 0 }
|
||||
|
||||
func counterEnabled(c FileCounter) bool {
|
||||
_, noop := c.(*noopCounter)
|
||||
|
@ -24,14 +25,29 @@ func counterEnabled(c FileCounter) bool {
|
|||
}
|
||||
|
||||
type SimpleCounter struct {
|
||||
v atomic.Uint64
|
||||
count atomic.Int64
|
||||
size atomic.Int64
|
||||
}
|
||||
|
||||
func NewSimpleCounter() *SimpleCounter {
|
||||
return &SimpleCounter{}
|
||||
}
|
||||
|
||||
func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) }
|
||||
func (c *SimpleCounter) Inc() { c.v.Add(1) }
|
||||
func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) }
|
||||
func (c *SimpleCounter) Value() uint64 { return c.v.Load() }
|
||||
func (c *SimpleCounter) Set(count, size int64) {
|
||||
c.count.Store(count)
|
||||
c.size.Store(size)
|
||||
}
|
||||
|
||||
func (c *SimpleCounter) Inc(size int64) {
|
||||
c.count.Add(1)
|
||||
c.size.Add(size)
|
||||
}
|
||||
|
||||
func (c *SimpleCounter) Dec(size int64) {
|
||||
c.count.Add(-1)
|
||||
c.size.Add(-size)
|
||||
}
|
||||
|
||||
func (c *SimpleCounter) Value() (int64, int64) {
|
||||
return c.count.Load(), c.size.Load()
|
||||
}
|
||||
|
|
|
@ -435,32 +435,38 @@ func (t *FSTree) initFileCounter() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
counter, err := t.countFiles()
|
||||
count, size, err := t.countFiles()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.fileCounter.Set(counter)
|
||||
t.fileCounter.Set(count, size)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *FSTree) countFiles() (uint64, error) {
|
||||
var counter uint64
|
||||
func (t *FSTree) countFiles() (int64, int64, error) {
|
||||
var count int64
|
||||
var size int64
|
||||
// it is simpler to just consider every file
|
||||
// that is not directory as an object
|
||||
err := filepath.WalkDir(t.RootPath,
|
||||
func(_ string, d fs.DirEntry, _ error) error {
|
||||
if !d.IsDir() {
|
||||
counter++
|
||||
count++
|
||||
fi, err := d.Info()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
size += fi.Size()
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
|
||||
return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
|
||||
}
|
||||
|
||||
return counter, nil
|
||||
return count, size, nil
|
||||
}
|
||||
|
||||
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||
|
|
|
@ -47,8 +47,9 @@ func TestObjectCounter(t *testing.T) {
|
|||
require.NoError(t, fst.Open(mode.ComponentReadWrite))
|
||||
require.NoError(t, fst.Init())
|
||||
|
||||
counterValue := counter.Value()
|
||||
require.Equal(t, uint64(0), counterValue)
|
||||
counterValue, sizeValue := counter.Value()
|
||||
require.Equal(t, int64(0), counterValue)
|
||||
require.Equal(t, int64(0), sizeValue)
|
||||
|
||||
defer func() {
|
||||
require.NoError(t, fst.Close())
|
||||
|
@ -64,9 +65,6 @@ func TestObjectCounter(t *testing.T) {
|
|||
putPrm.Address = addr
|
||||
putPrm.RawData, _ = obj.Marshal()
|
||||
|
||||
var getPrm common.GetPrm
|
||||
getPrm.Address = putPrm.Address
|
||||
|
||||
var delPrm common.DeletePrm
|
||||
delPrm.Address = addr
|
||||
|
||||
|
@ -95,8 +93,9 @@ func TestObjectCounter(t *testing.T) {
|
|||
|
||||
require.NoError(t, eg.Wait())
|
||||
|
||||
counterValue = counter.Value()
|
||||
realCount, err := fst.countFiles()
|
||||
counterValue, sizeValue = counter.Value()
|
||||
realCount, realSize, err := fst.countFiles()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, realCount, counterValue)
|
||||
require.Equal(t, realSize, sizeValue)
|
||||
}
|
||||
|
|
|
@ -78,14 +78,15 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error {
|
|||
}
|
||||
|
||||
if w.fileCounterEnabled {
|
||||
w.fileCounter.Inc()
|
||||
w.fileCounter.Inc(int64(len(data)))
|
||||
var targetFileExists bool
|
||||
if _, e := os.Stat(p); e == nil {
|
||||
s, e := os.Stat(p)
|
||||
if e == nil {
|
||||
targetFileExists = true
|
||||
}
|
||||
err = os.Rename(tmpPath, p)
|
||||
if err == nil && targetFileExists {
|
||||
w.fileCounter.Dec()
|
||||
w.fileCounter.Dec(int64(s.Size()))
|
||||
}
|
||||
} else {
|
||||
err = os.Rename(tmpPath, p)
|
||||
|
@ -108,20 +109,31 @@ func (w *genericWriter) writeFile(p string, data []byte) error {
|
|||
}
|
||||
|
||||
func (w *genericWriter) removeFile(p string) error {
|
||||
var err error
|
||||
if w.fileCounterEnabled {
|
||||
w.fileGuard.Lock(p)
|
||||
err = os.Remove(p)
|
||||
w.fileGuard.Unlock(p)
|
||||
if err == nil {
|
||||
w.fileCounter.Dec()
|
||||
}
|
||||
} else {
|
||||
err = os.Remove(p)
|
||||
return w.removeFileWithCounter(p)
|
||||
}
|
||||
|
||||
err := os.Remove(p)
|
||||
if err != nil && os.IsNotExist(err) {
|
||||
err = logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *genericWriter) removeFileWithCounter(p string) error {
|
||||
w.fileGuard.Lock(p)
|
||||
defer w.fileGuard.Unlock(p)
|
||||
|
||||
s, err := os.Stat(p)
|
||||
if err != nil && os.IsNotExist(err) {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = os.Remove(p)
|
||||
if err == nil {
|
||||
w.fileCounter.Dec(s.Size())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -18,7 +18,8 @@ type linuxWriter struct {
|
|||
perm uint32
|
||||
flags int
|
||||
|
||||
counter FileCounter
|
||||
counter FileCounter
|
||||
counterEnabled bool
|
||||
}
|
||||
|
||||
func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer {
|
||||
|
@ -34,10 +35,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b
|
|||
}
|
||||
_ = unix.Close(fd) // Don't care about error.
|
||||
w := &linuxWriter{
|
||||
root: root,
|
||||
perm: uint32(perm),
|
||||
flags: flags,
|
||||
counter: c,
|
||||
root: root,
|
||||
perm: uint32(perm),
|
||||
flags: flags,
|
||||
counter: c,
|
||||
counterEnabled: counterEnabled(c),
|
||||
}
|
||||
return w
|
||||
}
|
||||
|
@ -61,7 +63,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
|
|||
if n == len(data) {
|
||||
err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
|
||||
if err == nil {
|
||||
w.counter.Inc()
|
||||
w.counter.Inc(int64(len(data)))
|
||||
}
|
||||
if errors.Is(err, unix.EEXIST) {
|
||||
err = nil
|
||||
|
@ -78,12 +80,22 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
|
|||
}
|
||||
|
||||
func (w *linuxWriter) removeFile(p string) error {
|
||||
var s unix.Stat_t
|
||||
if w.counterEnabled {
|
||||
err := unix.Stat(p, &s)
|
||||
if err != nil && err == unix.ENOENT {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err := unix.Unlink(p)
|
||||
if err != nil && err == unix.ENOENT {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
if err == nil {
|
||||
w.counter.Dec()
|
||||
w.counter.Dec(s.Size)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -57,6 +57,7 @@ func New(opts ...Option) Cache {
|
|||
maxCacheSize: defaultMaxCacheSize,
|
||||
openFile: os.OpenFile,
|
||||
metrics: DefaultMetrics(),
|
||||
counter: fstree.NewSimpleCounter(),
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -91,7 +92,7 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
|
|||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
_ = c.estimateCacheSize()
|
||||
c.estimateCacheSize()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"io/fs"
|
||||
"os"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -26,8 +27,8 @@ type options struct {
|
|||
// maxCacheSize is the maximum total size of all objects saved in cache.
|
||||
// 1 GiB by default.
|
||||
maxCacheSize uint64
|
||||
// objCounters contains atomic counters for the number of objects stored in cache.
|
||||
objCounters counters
|
||||
// counter contains atomic counters for the number of objects stored in cache.
|
||||
counter *fstree.SimpleCounter
|
||||
// noSync is true iff FSTree allows unsynchronized writes.
|
||||
noSync bool
|
||||
// reportError is the function called when encountering disk errors in background workers.
|
||||
|
|
|
@ -60,8 +60,11 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
|||
// putBig writes object to FSTree and pushes it to the flush workers queue.
|
||||
func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error {
|
||||
addr := prm.Address.EncodeToString()
|
||||
cacheSz := c.estimateCacheSize()
|
||||
if c.maxCacheSize < c.incSizeFS(cacheSz) {
|
||||
estimatedObjSize := uint64(len(prm.RawData))
|
||||
if estimatedObjSize == 0 {
|
||||
estimatedObjSize = prm.Object.PayloadSize()
|
||||
}
|
||||
if !c.hasFreeSpace(estimatedObjSize) {
|
||||
return ErrOutOfSpace
|
||||
}
|
||||
|
||||
|
|
|
@ -1,48 +1,19 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync/atomic"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
)
|
||||
|
||||
func (c *cache) estimateCacheSize() uint64 {
|
||||
fsCount := c.objCounters.FS()
|
||||
if fsCount > 0 {
|
||||
fsCount-- // db file
|
||||
func (c *cache) estimateCacheSize() {
|
||||
count, size := c.counter.Value()
|
||||
var ucount, usize uint64
|
||||
if count > 0 {
|
||||
ucount = uint64(count)
|
||||
}
|
||||
fsSize := fsCount * c.maxObjectSize
|
||||
c.metrics.SetEstimateSize(fsSize)
|
||||
c.metrics.SetActualCounters(fsCount)
|
||||
return fsSize
|
||||
if size > 0 {
|
||||
usize = uint64(size)
|
||||
}
|
||||
c.metrics.SetEstimateSize(ucount)
|
||||
c.metrics.SetActualCounters(usize)
|
||||
}
|
||||
|
||||
func (c *cache) incSizeFS(sz uint64) uint64 {
|
||||
return sz + c.maxObjectSize
|
||||
}
|
||||
|
||||
var _ fstree.FileCounter = &counters{}
|
||||
|
||||
type counters struct {
|
||||
cFS atomic.Uint64
|
||||
}
|
||||
|
||||
func (x *counters) FS() uint64 {
|
||||
return x.cFS.Load()
|
||||
}
|
||||
|
||||
// Set implements fstree.ObjectCounter.
|
||||
func (x *counters) Set(v uint64) {
|
||||
x.cFS.Store(v)
|
||||
}
|
||||
|
||||
// Inc implements fstree.ObjectCounter.
|
||||
func (x *counters) Inc() {
|
||||
x.cFS.Add(1)
|
||||
}
|
||||
|
||||
// Dec implements fstree.ObjectCounter.
|
||||
func (x *counters) Dec() {
|
||||
x.cFS.Add(math.MaxUint64)
|
||||
func (c *cache) hasFreeSpace(sz uint64) bool {
|
||||
_, size := c.counter.Value()
|
||||
return size+int64(sz) <= int64(c.maxCacheSize)
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
|
|||
fstree.WithDepth(1),
|
||||
fstree.WithDirNameLen(1),
|
||||
fstree.WithNoSync(c.noSync),
|
||||
fstree.WithFileCounter(&c.objCounters),
|
||||
fstree.WithFileCounter(c.counter),
|
||||
)
|
||||
if err := c.fsTree.Open(mod); err != nil {
|
||||
return fmt.Errorf("could not open FSTree: %w", err)
|
||||
|
|
Loading…
Reference in a new issue