From 6e92cf68e34edecefb8e3638db1b7644b6696a52 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 23 Jul 2024 11:29:05 +0300 Subject: [PATCH] [#9999] writecache: Count real data size The size of the data on the disk is used to determine the writeache size, but not the number of objects multiplied by the maximum allowed object size. Signed-off-by: Dmitrii Stepanov --- .../blobstor/fstree/counter.go | 40 ++++++++++---- .../blobstor/fstree/fstree.go | 20 ++++--- .../blobstor/fstree/fstree_test.go | 13 ++--- .../blobstor/fstree/fstree_write_generic.go | 36 ++++++++---- .../blobstor/fstree/fstree_write_linux.go | 26 ++++++--- pkg/local_object_storage/writecache/cache.go | 3 +- .../writecache/options.go | 5 +- pkg/local_object_storage/writecache/put.go | 7 ++- pkg/local_object_storage/writecache/state.go | 55 +++++-------------- .../writecache/storage.go | 2 +- 10 files changed, 114 insertions(+), 93 deletions(-) diff --git a/pkg/local_object_storage/blobstor/fstree/counter.go b/pkg/local_object_storage/blobstor/fstree/counter.go index 718104e2e..c1c6ff6fa 100644 --- a/pkg/local_object_storage/blobstor/fstree/counter.go +++ b/pkg/local_object_storage/blobstor/fstree/counter.go @@ -1,22 +1,23 @@ package fstree import ( - "math" "sync/atomic" ) // FileCounter used to count files in FSTree. The implementation must be thread-safe. type FileCounter interface { - Set(v uint64) - Inc() - Dec() + Set(count, size int64) + Inc(size int64) + Dec(size int64) + Value() (int64, int64) } type noopCounter struct{} -func (c *noopCounter) Set(uint64) {} -func (c *noopCounter) Inc() {} -func (c *noopCounter) Dec() {} +func (c *noopCounter) Set(int64, int64) {} +func (c *noopCounter) Inc(int64) {} +func (c *noopCounter) Dec(int64) {} +func (c *noopCounter) Value() (int64, int64) { return 0, 0 } func counterEnabled(c FileCounter) bool { _, noop := c.(*noopCounter) @@ -24,14 +25,29 @@ func counterEnabled(c FileCounter) bool { } type SimpleCounter struct { - v atomic.Uint64 + count atomic.Int64 + size atomic.Int64 } func NewSimpleCounter() *SimpleCounter { return &SimpleCounter{} } -func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) } -func (c *SimpleCounter) Inc() { c.v.Add(1) } -func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) } -func (c *SimpleCounter) Value() uint64 { return c.v.Load() } +func (c *SimpleCounter) Set(count, size int64) { + c.count.Store(count) + c.size.Store(size) +} + +func (c *SimpleCounter) Inc(size int64) { + c.count.Add(1) + c.size.Add(size) +} + +func (c *SimpleCounter) Dec(size int64) { + c.count.Add(-1) + c.size.Add(-size) +} + +func (c *SimpleCounter) Value() (int64, int64) { + return c.count.Load(), c.size.Load() +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 02580dbfa..cf0affdb6 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -435,32 +435,38 @@ func (t *FSTree) initFileCounter() error { return nil } - counter, err := t.countFiles() + count, size, err := t.countFiles() if err != nil { return err } - t.fileCounter.Set(counter) + t.fileCounter.Set(count, size) return nil } -func (t *FSTree) countFiles() (uint64, error) { - var counter uint64 +func (t *FSTree) countFiles() (int64, int64, error) { + var count int64 + var size int64 // it is simpler to just consider every file // that is not directory as an object err := filepath.WalkDir(t.RootPath, func(_ string, d fs.DirEntry, _ error) error { if !d.IsDir() { - counter++ + count++ + fi, err := d.Info() + if err != nil { + return err + } + size += fi.Size() } return nil }, ) if err != nil { - return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) + return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) } - return counter, nil + return count, size, nil } func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index d633cbac3..f89df7405 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -47,8 +47,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, fst.Open(mode.ComponentReadWrite)) require.NoError(t, fst.Init()) - counterValue := counter.Value() - require.Equal(t, uint64(0), counterValue) + counterValue, sizeValue := counter.Value() + require.Equal(t, int64(0), counterValue) + require.Equal(t, int64(0), sizeValue) defer func() { require.NoError(t, fst.Close()) @@ -64,9 +65,6 @@ func TestObjectCounter(t *testing.T) { putPrm.Address = addr putPrm.RawData, _ = obj.Marshal() - var getPrm common.GetPrm - getPrm.Address = putPrm.Address - var delPrm common.DeletePrm delPrm.Address = addr @@ -95,8 +93,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, eg.Wait()) - counterValue = counter.Value() - realCount, err := fst.countFiles() + counterValue, sizeValue = counter.Value() + realCount, realSize, err := fst.countFiles() require.NoError(t, err) require.Equal(t, realCount, counterValue) + require.Equal(t, realSize, sizeValue) } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go index 8b2622885..cf9941ec8 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go @@ -78,14 +78,15 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error { } if w.fileCounterEnabled { - w.fileCounter.Inc() + w.fileCounter.Inc(int64(len(data))) var targetFileExists bool - if _, e := os.Stat(p); e == nil { + s, e := os.Stat(p) + if e == nil { targetFileExists = true } err = os.Rename(tmpPath, p) if err == nil && targetFileExists { - w.fileCounter.Dec() + w.fileCounter.Dec(int64(s.Size())) } } else { err = os.Rename(tmpPath, p) @@ -108,20 +109,31 @@ func (w *genericWriter) writeFile(p string, data []byte) error { } func (w *genericWriter) removeFile(p string) error { - var err error if w.fileCounterEnabled { - w.fileGuard.Lock(p) - err = os.Remove(p) - w.fileGuard.Unlock(p) - if err == nil { - w.fileCounter.Dec() - } - } else { - err = os.Remove(p) + return w.removeFileWithCounter(p) } + err := os.Remove(p) if err != nil && os.IsNotExist(err) { err = logicerr.Wrap(new(apistatus.ObjectNotFound)) } return err } + +func (w *genericWriter) removeFileWithCounter(p string) error { + w.fileGuard.Lock(p) + defer w.fileGuard.Unlock(p) + + s, err := os.Stat(p) + if err != nil && os.IsNotExist(err) { + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + if err != nil { + return err + } + err = os.Remove(p) + if err == nil { + w.fileCounter.Dec(s.Size()) + } + return err +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go index efc5a3d3d..774e996f9 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go @@ -18,7 +18,8 @@ type linuxWriter struct { perm uint32 flags int - counter FileCounter + counter FileCounter + counterEnabled bool } func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer { @@ -34,10 +35,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b } _ = unix.Close(fd) // Don't care about error. w := &linuxWriter{ - root: root, - perm: uint32(perm), - flags: flags, - counter: c, + root: root, + perm: uint32(perm), + flags: flags, + counter: c, + counterEnabled: counterEnabled(c), } return w } @@ -61,7 +63,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { if n == len(data) { err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW) if err == nil { - w.counter.Inc() + w.counter.Inc(int64(len(data))) } if errors.Is(err, unix.EEXIST) { err = nil @@ -78,12 +80,22 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { } func (w *linuxWriter) removeFile(p string) error { + var s unix.Stat_t + if w.counterEnabled { + err := unix.Stat(p, &s) + if err != nil && err == unix.ENOENT { + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + if err != nil { + return err + } + } err := unix.Unlink(p) if err != nil && err == unix.ENOENT { return logicerr.Wrap(new(apistatus.ObjectNotFound)) } if err == nil { - w.counter.Dec() + w.counter.Dec(s.Size) } return err } diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index 3faef8838..e57f4fc5e 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -57,6 +57,7 @@ func New(opts ...Option) Cache { maxCacheSize: defaultMaxCacheSize, openFile: os.OpenFile, metrics: DefaultMetrics(), + counter: fstree.NewSimpleCounter(), }, } @@ -91,7 +92,7 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error { return metaerr.Wrap(err) } - _ = c.estimateCacheSize() + c.estimateCacheSize() return nil } diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 9620aab94..ada098fd4 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -4,6 +4,7 @@ import ( "io/fs" "os" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -26,8 +27,8 @@ type options struct { // maxCacheSize is the maximum total size of all objects saved in cache. // 1 GiB by default. maxCacheSize uint64 - // objCounters contains atomic counters for the number of objects stored in cache. - objCounters counters + // counter contains atomic counters for the number of objects stored in cache. + counter *fstree.SimpleCounter // noSync is true iff FSTree allows unsynchronized writes. noSync bool // reportError is the function called when encountering disk errors in background workers. diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 6641d3b70..9f60972d3 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -60,8 +60,11 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro // putBig writes object to FSTree and pushes it to the flush workers queue. func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { addr := prm.Address.EncodeToString() - cacheSz := c.estimateCacheSize() - if c.maxCacheSize < c.incSizeFS(cacheSz) { + estimatedObjSize := uint64(len(prm.RawData)) + if estimatedObjSize == 0 { + estimatedObjSize = prm.Object.PayloadSize() + } + if !c.hasFreeSpace(estimatedObjSize) { return ErrOutOfSpace } diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index c2198cc09..6d02df245 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,48 +1,19 @@ package writecache -import ( - "math" - "sync/atomic" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" -) - -func (c *cache) estimateCacheSize() uint64 { - fsCount := c.objCounters.FS() - if fsCount > 0 { - fsCount-- // db file +func (c *cache) estimateCacheSize() { + count, size := c.counter.Value() + var ucount, usize uint64 + if count > 0 { + ucount = uint64(count) } - fsSize := fsCount * c.maxObjectSize - c.metrics.SetEstimateSize(fsSize) - c.metrics.SetActualCounters(fsCount) - return fsSize + if size > 0 { + usize = uint64(size) + } + c.metrics.SetEstimateSize(ucount) + c.metrics.SetActualCounters(usize) } -func (c *cache) incSizeFS(sz uint64) uint64 { - return sz + c.maxObjectSize -} - -var _ fstree.FileCounter = &counters{} - -type counters struct { - cFS atomic.Uint64 -} - -func (x *counters) FS() uint64 { - return x.cFS.Load() -} - -// Set implements fstree.ObjectCounter. -func (x *counters) Set(v uint64) { - x.cFS.Store(v) -} - -// Inc implements fstree.ObjectCounter. -func (x *counters) Inc() { - x.cFS.Add(1) -} - -// Dec implements fstree.ObjectCounter. -func (x *counters) Dec() { - x.cFS.Add(math.MaxUint64) +func (c *cache) hasFreeSpace(sz uint64) bool { + _, size := c.counter.Value() + return size+int64(sz) <= int64(c.maxCacheSize) } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 2dc922032..6aface7a5 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -28,7 +28,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error { fstree.WithDepth(1), fstree.WithDirNameLen(1), fstree.WithNoSync(c.noSync), - fstree.WithFileCounter(&c.objCounters), + fstree.WithFileCounter(c.counter), ) if err := c.fsTree.Open(mod); err != nil { return fmt.Errorf("could not open FSTree: %w", err)