From b142b6f48e46210235a56840c79da358120cf0cc Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 10 Sep 2024 11:01:30 +0300 Subject: [PATCH] [#1367] fstree: Add size to file counter FSTree file counter used by writecache. As writecache has now only one storage, so it is required to use real object size to get writecache size more accurate than `count * max_object_size`. Signed-off-by: Dmitrii Stepanov --- .../blobstor/fstree/counter.go | 61 +++++++++++++++---- .../blobstor/fstree/fstree.go | 22 ++++--- .../blobstor/fstree/fstree_test.go | 15 +++-- .../blobstor/fstree/fstree_write_generic.go | 27 +++++--- .../blobstor/fstree/fstree_write_linux.go | 42 ++++++++++--- pkg/local_object_storage/writecache/cache.go | 3 + .../writecache/options.go | 2 - pkg/local_object_storage/writecache/state.go | 41 ++----------- .../writecache/storage.go | 2 +- 9 files changed, 130 insertions(+), 85 deletions(-) diff --git a/pkg/local_object_storage/blobstor/fstree/counter.go b/pkg/local_object_storage/blobstor/fstree/counter.go index 718104e2..b5dbc9e4 100644 --- a/pkg/local_object_storage/blobstor/fstree/counter.go +++ b/pkg/local_object_storage/blobstor/fstree/counter.go @@ -1,22 +1,21 @@ package fstree import ( - "math" - "sync/atomic" + "sync" ) // FileCounter used to count files in FSTree. The implementation must be thread-safe. type FileCounter interface { - Set(v uint64) - Inc() - Dec() + Set(count, size uint64) + Inc(size uint64) + Dec(size uint64) } type noopCounter struct{} -func (c *noopCounter) Set(uint64) {} -func (c *noopCounter) Inc() {} -func (c *noopCounter) Dec() {} +func (c *noopCounter) Set(uint64, uint64) {} +func (c *noopCounter) Inc(uint64) {} +func (c *noopCounter) Dec(uint64) {} func counterEnabled(c FileCounter) bool { _, noop := c.(*noopCounter) @@ -24,14 +23,50 @@ func counterEnabled(c FileCounter) bool { } type SimpleCounter struct { - v atomic.Uint64 + mtx sync.RWMutex + count uint64 + size uint64 } func NewSimpleCounter() *SimpleCounter { return &SimpleCounter{} } -func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) } -func (c *SimpleCounter) Inc() { c.v.Add(1) } -func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) } -func (c *SimpleCounter) Value() uint64 { return c.v.Load() } +func (c *SimpleCounter) Set(count, size uint64) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.count = count + c.size = size +} + +func (c *SimpleCounter) Inc(size uint64) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.count++ + c.size += size +} + +func (c *SimpleCounter) Dec(size uint64) { + c.mtx.Lock() + defer c.mtx.Unlock() + + if c.count > 0 { + c.count-- + } else { + panic("fstree.SimpleCounter: invalid count") + } + if c.size >= size { + c.size -= size + } else { + panic("fstree.SimpleCounter: invalid size") + } +} + +func (c *SimpleCounter) CountSize() (uint64, uint64) { + c.mtx.RLock() + defer c.mtx.RUnlock() + + return c.count, c.size +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 02580dbf..bf6ba51e 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -435,32 +435,38 @@ func (t *FSTree) initFileCounter() error { return nil } - counter, err := t.countFiles() + count, size, err := t.countFiles() if err != nil { return err } - t.fileCounter.Set(counter) + t.fileCounter.Set(count, size) return nil } -func (t *FSTree) countFiles() (uint64, error) { - var counter uint64 +func (t *FSTree) countFiles() (uint64, uint64, error) { + var count, size uint64 // it is simpler to just consider every file // that is not directory as an object err := filepath.WalkDir(t.RootPath, func(_ string, d fs.DirEntry, _ error) error { - if !d.IsDir() { - counter++ + if d.IsDir() { + return nil } + count++ + info, err := d.Info() + if err != nil { + return err + } + size += uint64(info.Size()) return nil }, ) if err != nil { - return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) + return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) } - return counter, nil + return count, size, nil } func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index 5786dfd3..f39c7296 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -47,8 +47,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, fst.Open(mode.ComponentReadWrite)) require.NoError(t, fst.Init()) - counterValue := counter.Value() - require.Equal(t, uint64(0), counterValue) + count, size := counter.CountSize() + require.Equal(t, uint64(0), count) + require.Equal(t, uint64(0), size) defer func() { require.NoError(t, fst.Close()) @@ -64,9 +65,6 @@ func TestObjectCounter(t *testing.T) { putPrm.Address = addr putPrm.RawData, _ = obj.Marshal() - var getPrm common.GetPrm - getPrm.Address = putPrm.Address - var delPrm common.DeletePrm delPrm.Address = addr @@ -95,8 +93,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, eg.Wait()) - counterValue = counter.Value() - realCount, err := fst.countFiles() + count, size = counter.CountSize() + realCount, realSize, err := fst.countFiles() require.NoError(t, err) - require.Equal(t, realCount, counterValue) + require.Equal(t, realCount, count, "real %d, actual %d", realCount, count) + require.Equal(t, realSize, size, "real %d, actual %d", realSize, size) } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go index 8b262288..801fc4a2 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go @@ -78,14 +78,14 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error { } if w.fileCounterEnabled { - w.fileCounter.Inc() + w.fileCounter.Inc(uint64(len(data))) var targetFileExists bool if _, e := os.Stat(p); e == nil { targetFileExists = true } err = os.Rename(tmpPath, p) if err == nil && targetFileExists { - w.fileCounter.Dec() + w.fileCounter.Dec(uint64(len(data))) } } else { err = os.Rename(tmpPath, p) @@ -110,12 +110,7 @@ func (w *genericWriter) writeFile(p string, data []byte) error { func (w *genericWriter) removeFile(p string) error { var err error if w.fileCounterEnabled { - w.fileGuard.Lock(p) - err = os.Remove(p) - w.fileGuard.Unlock(p) - if err == nil { - w.fileCounter.Dec() - } + err = w.removeWithCounter(p) } else { err = os.Remove(p) } @@ -125,3 +120,19 @@ func (w *genericWriter) removeFile(p string) error { } return err } + +func (w *genericWriter) removeWithCounter(p string) error { + w.fileGuard.Lock(p) + defer w.fileGuard.Unlock(p) + + stat, err := os.Stat(p) + if err != nil { + return err + } + + if err := os.Remove(p); err != nil { + return err + } + w.fileCounter.Dec(uint64(stat.Size())) + return nil +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go index efc5a3d3..3127579a 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go @@ -9,6 +9,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "golang.org/x/sys/unix" ) @@ -18,7 +19,9 @@ type linuxWriter struct { perm uint32 flags int - counter FileCounter + fileGuard keyLock + fileCounter FileCounter + fileCounterEnabled bool } func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer { @@ -33,11 +36,18 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b return nil } _ = unix.Close(fd) // Don't care about error. + var fileGuard keyLock = &noopKeyLock{} + fileCounterEnabled := counterEnabled(c) + if fileCounterEnabled { + fileGuard = utilSync.NewKeyLocker[string]() + } w := &linuxWriter{ - root: root, - perm: uint32(perm), - flags: flags, - counter: c, + root: root, + perm: uint32(perm), + flags: flags, + fileGuard: fileGuard, + fileCounter: c, + fileCounterEnabled: fileCounterEnabled, } return w } @@ -51,6 +61,10 @@ func (w *linuxWriter) writeData(p string, data []byte) error { } func (w *linuxWriter) writeFile(p string, data []byte) error { + if w.fileCounterEnabled { + w.fileGuard.Lock(p) + defer w.fileGuard.Unlock(p) + } fd, err := unix.Open(w.root, w.flags, w.perm) if err != nil { return err @@ -61,7 +75,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { if n == len(data) { err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW) if err == nil { - w.counter.Inc() + w.fileCounter.Inc(uint64(len(data))) } if errors.Is(err, unix.EEXIST) { err = nil @@ -78,12 +92,24 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { } func (w *linuxWriter) removeFile(p string) error { - err := unix.Unlink(p) + if w.fileCounterEnabled { + w.fileGuard.Lock(p) + defer w.fileGuard.Unlock(p) + } + var stat unix.Stat_t + err := unix.Stat(p, &stat) + if err != nil { + if err == unix.ENOENT { + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + return err + } + err = unix.Unlink(p) if err != nil && err == unix.ENOENT { return logicerr.Wrap(new(apistatus.ObjectNotFound)) } if err == nil { - w.counter.Dec() + w.fileCounter.Dec(uint64(stat.Size)) } return err } diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index ff38de40..f2280f2f 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -29,6 +29,8 @@ type cache struct { wg sync.WaitGroup // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree + // counter contains atomic counters for the number of objects stored in cache. + counter *fstree.SimpleCounter } // wcStorageType is used for write-cache operations logging. @@ -56,6 +58,7 @@ func New(opts ...Option) Cache { c := &cache{ flushCh: make(chan objectInfo), mode: mode.Disabled, + counter: fstree.NewSimpleCounter(), options: options{ log: &logger.Logger{Logger: zap.NewNop()}, diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 7845c5da..0643faac 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -30,8 +30,6 @@ type options struct { // maxCacheCount is the maximum total count of all object saved in cache. // 0 (no limit) by default. maxCacheCount uint64 - // objCounters contains atomic counters for the number of objects stored in cache. - objCounters counters // maxBatchSize is the maximum batch size for the small object database. maxBatchSize int // maxBatchDelay is the maximum batch wait time for the small object database. diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index e4e22f40..748c78bc 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,18 +1,10 @@ package writecache -import ( - "math" - "sync/atomic" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" -) - func (c *cache) estimateCacheSize() (uint64, uint64) { - fsCount := c.objCounters.FS() - fsSize := fsCount * c.maxObjectSize - c.metrics.SetEstimateSize(0, fsSize) - c.metrics.SetActualCounters(0, fsCount) - return fsCount, fsSize + count, size := c.counter.CountSize() + c.metrics.SetEstimateSize(0, size) + c.metrics.SetActualCounters(0, count) + return count, size } func (c *cache) hasEnoughSpaceFS() bool { @@ -27,31 +19,6 @@ func (c *cache) hasEnoughSpace(objectSize uint64) bool { return c.maxCacheSize >= size+objectSize } -var _ fstree.FileCounter = &counters{} - -type counters struct { - cFS atomic.Uint64 -} - -func (x *counters) FS() uint64 { - return x.cFS.Load() -} - -// Set implements fstree.ObjectCounter. -func (x *counters) Set(v uint64) { - x.cFS.Store(v) -} - -// Inc implements fstree.ObjectCounter. -func (x *counters) Inc() { - x.cFS.Add(1) -} - -// Dec implements fstree.ObjectCounter. -func (x *counters) Dec() { - x.cFS.Add(math.MaxUint64) -} - func (c *cache) initCounters() error { c.estimateCacheSize() return nil diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 309bd2a6..e708a529 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -30,7 +30,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error { fstree.WithDepth(1), fstree.WithDirNameLen(1), fstree.WithNoSync(c.noSync), - fstree.WithFileCounter(&c.objCounters), + fstree.WithFileCounter(c.counter), ) if err := c.fsTree.Open(mod); err != nil { return fmt.Errorf("could not open FSTree: %w", err)