diff --git a/pkg/local_object_storage/blobstor/fstree/counter.go b/pkg/local_object_storage/blobstor/fstree/counter.go index 718104e2..b5dbc9e4 100644 --- a/pkg/local_object_storage/blobstor/fstree/counter.go +++ b/pkg/local_object_storage/blobstor/fstree/counter.go @@ -1,22 +1,21 @@ package fstree import ( - "math" - "sync/atomic" + "sync" ) // FileCounter used to count files in FSTree. The implementation must be thread-safe. type FileCounter interface { - Set(v uint64) - Inc() - Dec() + Set(count, size uint64) + Inc(size uint64) + Dec(size uint64) } type noopCounter struct{} -func (c *noopCounter) Set(uint64) {} -func (c *noopCounter) Inc() {} -func (c *noopCounter) Dec() {} +func (c *noopCounter) Set(uint64, uint64) {} +func (c *noopCounter) Inc(uint64) {} +func (c *noopCounter) Dec(uint64) {} func counterEnabled(c FileCounter) bool { _, noop := c.(*noopCounter) @@ -24,14 +23,50 @@ func counterEnabled(c FileCounter) bool { } type SimpleCounter struct { - v atomic.Uint64 + mtx sync.RWMutex + count uint64 + size uint64 } func NewSimpleCounter() *SimpleCounter { return &SimpleCounter{} } -func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) } -func (c *SimpleCounter) Inc() { c.v.Add(1) } -func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) } -func (c *SimpleCounter) Value() uint64 { return c.v.Load() } +func (c *SimpleCounter) Set(count, size uint64) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.count = count + c.size = size +} + +func (c *SimpleCounter) Inc(size uint64) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.count++ + c.size += size +} + +func (c *SimpleCounter) Dec(size uint64) { + c.mtx.Lock() + defer c.mtx.Unlock() + + if c.count > 0 { + c.count-- + } else { + panic("fstree.SimpleCounter: invalid count") + } + if c.size >= size { + c.size -= size + } else { + panic("fstree.SimpleCounter: invalid size") + } +} + +func (c *SimpleCounter) CountSize() (uint64, uint64) { + c.mtx.RLock() + defer c.mtx.RUnlock() + + return c.count, c.size +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 02580dbf..bf6ba51e 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -435,32 +435,38 @@ func (t *FSTree) initFileCounter() error { return nil } - counter, err := t.countFiles() + count, size, err := t.countFiles() if err != nil { return err } - t.fileCounter.Set(counter) + t.fileCounter.Set(count, size) return nil } -func (t *FSTree) countFiles() (uint64, error) { - var counter uint64 +func (t *FSTree) countFiles() (uint64, uint64, error) { + var count, size uint64 // it is simpler to just consider every file // that is not directory as an object err := filepath.WalkDir(t.RootPath, func(_ string, d fs.DirEntry, _ error) error { - if !d.IsDir() { - counter++ + if d.IsDir() { + return nil } + count++ + info, err := d.Info() + if err != nil { + return err + } + size += uint64(info.Size()) return nil }, ) if err != nil { - return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) + return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) } - return counter, nil + return count, size, nil } func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index 5786dfd3..f39c7296 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -47,8 +47,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, fst.Open(mode.ComponentReadWrite)) require.NoError(t, fst.Init()) - counterValue := counter.Value() - require.Equal(t, uint64(0), counterValue) + count, size := counter.CountSize() + require.Equal(t, uint64(0), count) + require.Equal(t, uint64(0), size) defer func() { require.NoError(t, fst.Close()) @@ -64,9 +65,6 @@ func TestObjectCounter(t *testing.T) { putPrm.Address = addr putPrm.RawData, _ = obj.Marshal() - var getPrm common.GetPrm - getPrm.Address = putPrm.Address - var delPrm common.DeletePrm delPrm.Address = addr @@ -95,8 +93,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, eg.Wait()) - counterValue = counter.Value() - realCount, err := fst.countFiles() + count, size = counter.CountSize() + realCount, realSize, err := fst.countFiles() require.NoError(t, err) - require.Equal(t, realCount, counterValue) + require.Equal(t, realCount, count, "real %d, actual %d", realCount, count) + require.Equal(t, realSize, size, "real %d, actual %d", realSize, size) } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go index 8b262288..801fc4a2 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go @@ -78,14 +78,14 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error { } if w.fileCounterEnabled { - w.fileCounter.Inc() + w.fileCounter.Inc(uint64(len(data))) var targetFileExists bool if _, e := os.Stat(p); e == nil { targetFileExists = true } err = os.Rename(tmpPath, p) if err == nil && targetFileExists { - w.fileCounter.Dec() + w.fileCounter.Dec(uint64(len(data))) } } else { err = os.Rename(tmpPath, p) @@ -110,12 +110,7 @@ func (w *genericWriter) writeFile(p string, data []byte) error { func (w *genericWriter) removeFile(p string) error { var err error if w.fileCounterEnabled { - w.fileGuard.Lock(p) - err = os.Remove(p) - w.fileGuard.Unlock(p) - if err == nil { - w.fileCounter.Dec() - } + err = w.removeWithCounter(p) } else { err = os.Remove(p) } @@ -125,3 +120,19 @@ func (w *genericWriter) removeFile(p string) error { } return err } + +func (w *genericWriter) removeWithCounter(p string) error { + w.fileGuard.Lock(p) + defer w.fileGuard.Unlock(p) + + stat, err := os.Stat(p) + if err != nil { + return err + } + + if err := os.Remove(p); err != nil { + return err + } + w.fileCounter.Dec(uint64(stat.Size())) + return nil +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go index efc5a3d3..3127579a 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go @@ -9,6 +9,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "golang.org/x/sys/unix" ) @@ -18,7 +19,9 @@ type linuxWriter struct { perm uint32 flags int - counter FileCounter + fileGuard keyLock + fileCounter FileCounter + fileCounterEnabled bool } func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer { @@ -33,11 +36,18 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b return nil } _ = unix.Close(fd) // Don't care about error. + var fileGuard keyLock = &noopKeyLock{} + fileCounterEnabled := counterEnabled(c) + if fileCounterEnabled { + fileGuard = utilSync.NewKeyLocker[string]() + } w := &linuxWriter{ - root: root, - perm: uint32(perm), - flags: flags, - counter: c, + root: root, + perm: uint32(perm), + flags: flags, + fileGuard: fileGuard, + fileCounter: c, + fileCounterEnabled: fileCounterEnabled, } return w } @@ -51,6 +61,10 @@ func (w *linuxWriter) writeData(p string, data []byte) error { } func (w *linuxWriter) writeFile(p string, data []byte) error { + if w.fileCounterEnabled { + w.fileGuard.Lock(p) + defer w.fileGuard.Unlock(p) + } fd, err := unix.Open(w.root, w.flags, w.perm) if err != nil { return err @@ -61,7 +75,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { if n == len(data) { err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW) if err == nil { - w.counter.Inc() + w.fileCounter.Inc(uint64(len(data))) } if errors.Is(err, unix.EEXIST) { err = nil @@ -78,12 +92,24 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { } func (w *linuxWriter) removeFile(p string) error { - err := unix.Unlink(p) + if w.fileCounterEnabled { + w.fileGuard.Lock(p) + defer w.fileGuard.Unlock(p) + } + var stat unix.Stat_t + err := unix.Stat(p, &stat) + if err != nil { + if err == unix.ENOENT { + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + return err + } + err = unix.Unlink(p) if err != nil && err == unix.ENOENT { return logicerr.Wrap(new(apistatus.ObjectNotFound)) } if err == nil { - w.counter.Dec() + w.fileCounter.Dec(uint64(stat.Size)) } return err } diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index ff38de40..f2280f2f 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -29,6 +29,8 @@ type cache struct { wg sync.WaitGroup // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree + // counter contains atomic counters for the number of objects stored in cache. + counter *fstree.SimpleCounter } // wcStorageType is used for write-cache operations logging. @@ -56,6 +58,7 @@ func New(opts ...Option) Cache { c := &cache{ flushCh: make(chan objectInfo), mode: mode.Disabled, + counter: fstree.NewSimpleCounter(), options: options{ log: &logger.Logger{Logger: zap.NewNop()}, diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 7845c5da..0643faac 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -30,8 +30,6 @@ type options struct { // maxCacheCount is the maximum total count of all object saved in cache. // 0 (no limit) by default. maxCacheCount uint64 - // objCounters contains atomic counters for the number of objects stored in cache. - objCounters counters // maxBatchSize is the maximum batch size for the small object database. maxBatchSize int // maxBatchDelay is the maximum batch wait time for the small object database. diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index e4e22f40..748c78bc 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,18 +1,10 @@ package writecache -import ( - "math" - "sync/atomic" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" -) - func (c *cache) estimateCacheSize() (uint64, uint64) { - fsCount := c.objCounters.FS() - fsSize := fsCount * c.maxObjectSize - c.metrics.SetEstimateSize(0, fsSize) - c.metrics.SetActualCounters(0, fsCount) - return fsCount, fsSize + count, size := c.counter.CountSize() + c.metrics.SetEstimateSize(0, size) + c.metrics.SetActualCounters(0, count) + return count, size } func (c *cache) hasEnoughSpaceFS() bool { @@ -27,31 +19,6 @@ func (c *cache) hasEnoughSpace(objectSize uint64) bool { return c.maxCacheSize >= size+objectSize } -var _ fstree.FileCounter = &counters{} - -type counters struct { - cFS atomic.Uint64 -} - -func (x *counters) FS() uint64 { - return x.cFS.Load() -} - -// Set implements fstree.ObjectCounter. -func (x *counters) Set(v uint64) { - x.cFS.Store(v) -} - -// Inc implements fstree.ObjectCounter. -func (x *counters) Inc() { - x.cFS.Add(1) -} - -// Dec implements fstree.ObjectCounter. -func (x *counters) Dec() { - x.cFS.Add(math.MaxUint64) -} - func (c *cache) initCounters() error { c.estimateCacheSize() return nil diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 309bd2a6..e708a529 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -30,7 +30,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error { fstree.WithDepth(1), fstree.WithDirNameLen(1), fstree.WithNoSync(c.noSync), - fstree.WithFileCounter(&c.objCounters), + fstree.WithFileCounter(c.counter), ) if err := c.fsTree.Open(mod); err != nil { return fmt.Errorf("could not open FSTree: %w", err)