diff --git a/pkg/local_object_storage/blobstor/fstree/counter.go b/pkg/local_object_storage/blobstor/fstree/counter.go index b5dbc9e40..718104e2e 100644 --- a/pkg/local_object_storage/blobstor/fstree/counter.go +++ b/pkg/local_object_storage/blobstor/fstree/counter.go @@ -1,21 +1,22 @@ package fstree import ( - "sync" + "math" + "sync/atomic" ) // FileCounter used to count files in FSTree. The implementation must be thread-safe. type FileCounter interface { - Set(count, size uint64) - Inc(size uint64) - Dec(size uint64) + Set(v uint64) + Inc() + Dec() } type noopCounter struct{} -func (c *noopCounter) Set(uint64, uint64) {} -func (c *noopCounter) Inc(uint64) {} -func (c *noopCounter) Dec(uint64) {} +func (c *noopCounter) Set(uint64) {} +func (c *noopCounter) Inc() {} +func (c *noopCounter) Dec() {} func counterEnabled(c FileCounter) bool { _, noop := c.(*noopCounter) @@ -23,50 +24,14 @@ func counterEnabled(c FileCounter) bool { } type SimpleCounter struct { - mtx sync.RWMutex - count uint64 - size uint64 + v atomic.Uint64 } func NewSimpleCounter() *SimpleCounter { return &SimpleCounter{} } -func (c *SimpleCounter) Set(count, size uint64) { - c.mtx.Lock() - defer c.mtx.Unlock() - - c.count = count - c.size = size -} - -func (c *SimpleCounter) Inc(size uint64) { - c.mtx.Lock() - defer c.mtx.Unlock() - - c.count++ - c.size += size -} - -func (c *SimpleCounter) Dec(size uint64) { - c.mtx.Lock() - defer c.mtx.Unlock() - - if c.count > 0 { - c.count-- - } else { - panic("fstree.SimpleCounter: invalid count") - } - if c.size >= size { - c.size -= size - } else { - panic("fstree.SimpleCounter: invalid size") - } -} - -func (c *SimpleCounter) CountSize() (uint64, uint64) { - c.mtx.RLock() - defer c.mtx.RUnlock() - - return c.count, c.size -} +func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) } +func (c *SimpleCounter) Inc() { c.v.Add(1) } +func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) } +func (c *SimpleCounter) Value() uint64 { return c.v.Load() } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index d480136a7..13e7eb7b4 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -435,38 +435,32 @@ func (t *FSTree) initFileCounter() error { return nil } - count, size, err := t.countFiles() + counter, err := t.countFiles() if err != nil { return err } - t.fileCounter.Set(count, size) + t.fileCounter.Set(counter) return nil } -func (t *FSTree) countFiles() (uint64, uint64, error) { - var count, size uint64 +func (t *FSTree) countFiles() (uint64, error) { + var counter uint64 // it is simpler to just consider every file // that is not directory as an object err := filepath.WalkDir(t.RootPath, func(_ string, d fs.DirEntry, _ error) error { - if d.IsDir() { - return nil + if !d.IsDir() { + counter++ } - count++ - info, err := d.Info() - if err != nil { - return err - } - size += uint64(info.Size()) return nil }, ) if err != nil { - return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) + return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) } - return count, size, nil + return counter, nil } func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index 8df61390f..4a434e52c 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -47,9 +47,8 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, fst.Open(mode.ComponentReadWrite)) require.NoError(t, fst.Init()) - count, size := counter.CountSize() - require.Equal(t, uint64(0), count) - require.Equal(t, uint64(0), size) + counterValue := counter.Value() + require.Equal(t, uint64(0), counterValue) defer func() { require.NoError(t, fst.Close(context.Background())) @@ -65,6 +64,9 @@ func TestObjectCounter(t *testing.T) { putPrm.Address = addr putPrm.RawData, _ = obj.Marshal() + var getPrm common.GetPrm + getPrm.Address = putPrm.Address + var delPrm common.DeletePrm delPrm.Address = addr @@ -93,9 +95,8 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, eg.Wait()) - count, size = counter.CountSize() - realCount, realSize, err := fst.countFiles() + counterValue = counter.Value() + realCount, err := fst.countFiles() require.NoError(t, err) - require.Equal(t, realCount, count, "real %d, actual %d", realCount, count) - require.Equal(t, realSize, size, "real %d, actual %d", realSize, size) + require.Equal(t, realCount, counterValue) } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go index 801fc4a22..8b2622885 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go @@ -78,14 +78,14 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error { } if w.fileCounterEnabled { - w.fileCounter.Inc(uint64(len(data))) + w.fileCounter.Inc() var targetFileExists bool if _, e := os.Stat(p); e == nil { targetFileExists = true } err = os.Rename(tmpPath, p) if err == nil && targetFileExists { - w.fileCounter.Dec(uint64(len(data))) + w.fileCounter.Dec() } } else { err = os.Rename(tmpPath, p) @@ -110,7 +110,12 @@ func (w *genericWriter) writeFile(p string, data []byte) error { func (w *genericWriter) removeFile(p string) error { var err error if w.fileCounterEnabled { - err = w.removeWithCounter(p) + w.fileGuard.Lock(p) + err = os.Remove(p) + w.fileGuard.Unlock(p) + if err == nil { + w.fileCounter.Dec() + } } else { err = os.Remove(p) } @@ -120,19 +125,3 @@ func (w *genericWriter) removeFile(p string) error { } return err } - -func (w *genericWriter) removeWithCounter(p string) error { - w.fileGuard.Lock(p) - defer w.fileGuard.Unlock(p) - - stat, err := os.Stat(p) - if err != nil { - return err - } - - if err := os.Remove(p); err != nil { - return err - } - w.fileCounter.Dec(uint64(stat.Size())) - return nil -} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go index 3127579ac..efc5a3d3d 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go @@ -9,7 +9,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" - utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "golang.org/x/sys/unix" ) @@ -19,9 +18,7 @@ type linuxWriter struct { perm uint32 flags int - fileGuard keyLock - fileCounter FileCounter - fileCounterEnabled bool + counter FileCounter } func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer { @@ -36,18 +33,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b return nil } _ = unix.Close(fd) // Don't care about error. - var fileGuard keyLock = &noopKeyLock{} - fileCounterEnabled := counterEnabled(c) - if fileCounterEnabled { - fileGuard = utilSync.NewKeyLocker[string]() - } w := &linuxWriter{ - root: root, - perm: uint32(perm), - flags: flags, - fileGuard: fileGuard, - fileCounter: c, - fileCounterEnabled: fileCounterEnabled, + root: root, + perm: uint32(perm), + flags: flags, + counter: c, } return w } @@ -61,10 +51,6 @@ func (w *linuxWriter) writeData(p string, data []byte) error { } func (w *linuxWriter) writeFile(p string, data []byte) error { - if w.fileCounterEnabled { - w.fileGuard.Lock(p) - defer w.fileGuard.Unlock(p) - } fd, err := unix.Open(w.root, w.flags, w.perm) if err != nil { return err @@ -75,7 +61,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { if n == len(data) { err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW) if err == nil { - w.fileCounter.Inc(uint64(len(data))) + w.counter.Inc() } if errors.Is(err, unix.EEXIST) { err = nil @@ -92,24 +78,12 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { } func (w *linuxWriter) removeFile(p string) error { - if w.fileCounterEnabled { - w.fileGuard.Lock(p) - defer w.fileGuard.Unlock(p) - } - var stat unix.Stat_t - err := unix.Stat(p, &stat) - if err != nil { - if err == unix.ENOENT { - return logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - return err - } - err = unix.Unlink(p) + err := unix.Unlink(p) if err != nil && err == unix.ENOENT { return logicerr.Wrap(new(apistatus.ObjectNotFound)) } if err == nil { - w.fileCounter.Dec(uint64(stat.Size)) + w.counter.Dec() } return err } diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index 1e11d2ec3..796fe155b 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -29,8 +29,6 @@ type cache struct { wg sync.WaitGroup // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree - // counter contains atomic counters for the number of objects stored in cache. - counter *fstree.SimpleCounter } // wcStorageType is used for write-cache operations logging. @@ -58,7 +56,6 @@ func New(opts ...Option) Cache { c := &cache{ flushCh: make(chan objectInfo), mode: mode.Disabled, - counter: fstree.NewSimpleCounter(), options: options{ log: logger.NewLoggerWrapper(zap.NewNop()), diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 9edd569fb..280359d00 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -31,6 +31,8 @@ type options struct { // maxCacheCount is the maximum total count of all object saved in cache. // 0 (no limit) by default. maxCacheCount uint64 + // objCounters contains atomic counters for the number of objects stored in cache. + objCounters counters // maxBatchSize is the maximum batch size for the small object database. maxBatchSize int // maxBatchDelay is the maximum batch wait time for the small object database. diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index 748c78bcb..e4e22f404 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,10 +1,18 @@ package writecache +import ( + "math" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" +) + func (c *cache) estimateCacheSize() (uint64, uint64) { - count, size := c.counter.CountSize() - c.metrics.SetEstimateSize(0, size) - c.metrics.SetActualCounters(0, count) - return count, size + fsCount := c.objCounters.FS() + fsSize := fsCount * c.maxObjectSize + c.metrics.SetEstimateSize(0, fsSize) + c.metrics.SetActualCounters(0, fsCount) + return fsCount, fsSize } func (c *cache) hasEnoughSpaceFS() bool { @@ -19,6 +27,31 @@ func (c *cache) hasEnoughSpace(objectSize uint64) bool { return c.maxCacheSize >= size+objectSize } +var _ fstree.FileCounter = &counters{} + +type counters struct { + cFS atomic.Uint64 +} + +func (x *counters) FS() uint64 { + return x.cFS.Load() +} + +// Set implements fstree.ObjectCounter. +func (x *counters) Set(v uint64) { + x.cFS.Store(v) +} + +// Inc implements fstree.ObjectCounter. +func (x *counters) Inc() { + x.cFS.Add(1) +} + +// Dec implements fstree.ObjectCounter. +func (x *counters) Dec() { + x.cFS.Add(math.MaxUint64) +} + func (c *cache) initCounters() error { c.estimateCacheSize() return nil diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 74675a91e..8631437f4 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -30,7 +30,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error { fstree.WithDepth(1), fstree.WithDirNameLen(1), fstree.WithNoSync(c.noSync), - fstree.WithFileCounter(c.counter), + fstree.WithFileCounter(&c.objCounters), ) if err := c.fsTree.Open(mod); err != nil { return fmt.Errorf("could not open FSTree: %w", err)