diff --git a/pkg/local_object_storage/blobstor/fstree/counter.go b/pkg/local_object_storage/blobstor/fstree/counter.go index 718104e2e..c1c6ff6fa 100644 --- a/pkg/local_object_storage/blobstor/fstree/counter.go +++ b/pkg/local_object_storage/blobstor/fstree/counter.go @@ -1,22 +1,23 @@ package fstree import ( - "math" "sync/atomic" ) // FileCounter used to count files in FSTree. The implementation must be thread-safe. type FileCounter interface { - Set(v uint64) - Inc() - Dec() + Set(count, size int64) + Inc(size int64) + Dec(size int64) + Value() (int64, int64) } type noopCounter struct{} -func (c *noopCounter) Set(uint64) {} -func (c *noopCounter) Inc() {} -func (c *noopCounter) Dec() {} +func (c *noopCounter) Set(int64, int64) {} +func (c *noopCounter) Inc(int64) {} +func (c *noopCounter) Dec(int64) {} +func (c *noopCounter) Value() (int64, int64) { return 0, 0 } func counterEnabled(c FileCounter) bool { _, noop := c.(*noopCounter) @@ -24,14 +25,29 @@ func counterEnabled(c FileCounter) bool { } type SimpleCounter struct { - v atomic.Uint64 + count atomic.Int64 + size atomic.Int64 } func NewSimpleCounter() *SimpleCounter { return &SimpleCounter{} } -func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) } -func (c *SimpleCounter) Inc() { c.v.Add(1) } -func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) } -func (c *SimpleCounter) Value() uint64 { return c.v.Load() } +func (c *SimpleCounter) Set(count, size int64) { + c.count.Store(count) + c.size.Store(size) +} + +func (c *SimpleCounter) Inc(size int64) { + c.count.Add(1) + c.size.Add(size) +} + +func (c *SimpleCounter) Dec(size int64) { + c.count.Add(-1) + c.size.Add(-size) +} + +func (c *SimpleCounter) Value() (int64, int64) { + return c.count.Load(), c.size.Load() +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 02580dbfa..cf0affdb6 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -435,32 +435,38 @@ func (t *FSTree) initFileCounter() error { return nil } - counter, err := t.countFiles() + count, size, err := t.countFiles() if err != nil { return err } - t.fileCounter.Set(counter) + t.fileCounter.Set(count, size) return nil } -func (t *FSTree) countFiles() (uint64, error) { - var counter uint64 +func (t *FSTree) countFiles() (int64, int64, error) { + var count int64 + var size int64 // it is simpler to just consider every file // that is not directory as an object err := filepath.WalkDir(t.RootPath, func(_ string, d fs.DirEntry, _ error) error { if !d.IsDir() { - counter++ + count++ + fi, err := d.Info() + if err != nil { + return err + } + size += fi.Size() } return nil }, ) if err != nil { - return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) + return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) } - return counter, nil + return count, size, nil } func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index d633cbac3..f89df7405 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -47,8 +47,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, fst.Open(mode.ComponentReadWrite)) require.NoError(t, fst.Init()) - counterValue := counter.Value() - require.Equal(t, uint64(0), counterValue) + counterValue, sizeValue := counter.Value() + require.Equal(t, int64(0), counterValue) + require.Equal(t, int64(0), sizeValue) defer func() { require.NoError(t, fst.Close()) @@ -64,9 +65,6 @@ func TestObjectCounter(t *testing.T) { putPrm.Address = addr putPrm.RawData, _ = obj.Marshal() - var getPrm common.GetPrm - getPrm.Address = putPrm.Address - var delPrm common.DeletePrm delPrm.Address = addr @@ -95,8 +93,9 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, eg.Wait()) - counterValue = counter.Value() - realCount, err := fst.countFiles() + counterValue, sizeValue = counter.Value() + realCount, realSize, err := fst.countFiles() require.NoError(t, err) require.Equal(t, realCount, counterValue) + require.Equal(t, realSize, sizeValue) } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go index 8b2622885..cf9941ec8 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go @@ -78,14 +78,15 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error { } if w.fileCounterEnabled { - w.fileCounter.Inc() + w.fileCounter.Inc(int64(len(data))) var targetFileExists bool - if _, e := os.Stat(p); e == nil { + s, e := os.Stat(p) + if e == nil { targetFileExists = true } err = os.Rename(tmpPath, p) if err == nil && targetFileExists { - w.fileCounter.Dec() + w.fileCounter.Dec(int64(s.Size())) } } else { err = os.Rename(tmpPath, p) @@ -108,20 +109,31 @@ func (w *genericWriter) writeFile(p string, data []byte) error { } func (w *genericWriter) removeFile(p string) error { - var err error if w.fileCounterEnabled { - w.fileGuard.Lock(p) - err = os.Remove(p) - w.fileGuard.Unlock(p) - if err == nil { - w.fileCounter.Dec() - } - } else { - err = os.Remove(p) + return w.removeFileWithCounter(p) } + err := os.Remove(p) if err != nil && os.IsNotExist(err) { err = logicerr.Wrap(new(apistatus.ObjectNotFound)) } return err } + +func (w *genericWriter) removeFileWithCounter(p string) error { + w.fileGuard.Lock(p) + defer w.fileGuard.Unlock(p) + + s, err := os.Stat(p) + if err != nil && os.IsNotExist(err) { + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + if err != nil { + return err + } + err = os.Remove(p) + if err == nil { + w.fileCounter.Dec(s.Size()) + } + return err +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go index efc5a3d3d..774e996f9 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go @@ -18,7 +18,8 @@ type linuxWriter struct { perm uint32 flags int - counter FileCounter + counter FileCounter + counterEnabled bool } func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer { @@ -34,10 +35,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b } _ = unix.Close(fd) // Don't care about error. w := &linuxWriter{ - root: root, - perm: uint32(perm), - flags: flags, - counter: c, + root: root, + perm: uint32(perm), + flags: flags, + counter: c, + counterEnabled: counterEnabled(c), } return w } @@ -61,7 +63,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { if n == len(data) { err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW) if err == nil { - w.counter.Inc() + w.counter.Inc(int64(len(data))) } if errors.Is(err, unix.EEXIST) { err = nil @@ -78,12 +80,22 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { } func (w *linuxWriter) removeFile(p string) error { + var s unix.Stat_t + if w.counterEnabled { + err := unix.Stat(p, &s) + if err != nil && err == unix.ENOENT { + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + if err != nil { + return err + } + } err := unix.Unlink(p) if err != nil && err == unix.ENOENT { return logicerr.Wrap(new(apistatus.ObjectNotFound)) } if err == nil { - w.counter.Dec() + w.counter.Dec(s.Size) } return err } diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index 3faef8838..e57f4fc5e 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -57,6 +57,7 @@ func New(opts ...Option) Cache { maxCacheSize: defaultMaxCacheSize, openFile: os.OpenFile, metrics: DefaultMetrics(), + counter: fstree.NewSimpleCounter(), }, } @@ -91,7 +92,7 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error { return metaerr.Wrap(err) } - _ = c.estimateCacheSize() + c.estimateCacheSize() return nil } diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 9620aab94..ada098fd4 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -4,6 +4,7 @@ import ( "io/fs" "os" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -26,8 +27,8 @@ type options struct { // maxCacheSize is the maximum total size of all objects saved in cache. // 1 GiB by default. maxCacheSize uint64 - // objCounters contains atomic counters for the number of objects stored in cache. - objCounters counters + // counter contains atomic counters for the number of objects stored in cache. + counter *fstree.SimpleCounter // noSync is true iff FSTree allows unsynchronized writes. noSync bool // reportError is the function called when encountering disk errors in background workers. diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 6641d3b70..9f60972d3 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -60,8 +60,11 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro // putBig writes object to FSTree and pushes it to the flush workers queue. func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { addr := prm.Address.EncodeToString() - cacheSz := c.estimateCacheSize() - if c.maxCacheSize < c.incSizeFS(cacheSz) { + estimatedObjSize := uint64(len(prm.RawData)) + if estimatedObjSize == 0 { + estimatedObjSize = prm.Object.PayloadSize() + } + if !c.hasFreeSpace(estimatedObjSize) { return ErrOutOfSpace } diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index c2198cc09..6d02df245 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,48 +1,19 @@ package writecache -import ( - "math" - "sync/atomic" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" -) - -func (c *cache) estimateCacheSize() uint64 { - fsCount := c.objCounters.FS() - if fsCount > 0 { - fsCount-- // db file +func (c *cache) estimateCacheSize() { + count, size := c.counter.Value() + var ucount, usize uint64 + if count > 0 { + ucount = uint64(count) } - fsSize := fsCount * c.maxObjectSize - c.metrics.SetEstimateSize(fsSize) - c.metrics.SetActualCounters(fsCount) - return fsSize + if size > 0 { + usize = uint64(size) + } + c.metrics.SetEstimateSize(ucount) + c.metrics.SetActualCounters(usize) } -func (c *cache) incSizeFS(sz uint64) uint64 { - return sz + c.maxObjectSize -} - -var _ fstree.FileCounter = &counters{} - -type counters struct { - cFS atomic.Uint64 -} - -func (x *counters) FS() uint64 { - return x.cFS.Load() -} - -// Set implements fstree.ObjectCounter. -func (x *counters) Set(v uint64) { - x.cFS.Store(v) -} - -// Inc implements fstree.ObjectCounter. -func (x *counters) Inc() { - x.cFS.Add(1) -} - -// Dec implements fstree.ObjectCounter. -func (x *counters) Dec() { - x.cFS.Add(math.MaxUint64) +func (c *cache) hasFreeSpace(sz uint64) bool { + _, size := c.counter.Value() + return size+int64(sz) <= int64(c.maxCacheSize) } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 2dc922032..6aface7a5 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -28,7 +28,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error { fstree.WithDepth(1), fstree.WithDirNameLen(1), fstree.WithNoSync(c.noSync), - fstree.WithFileCounter(&c.objCounters), + fstree.WithFileCounter(c.counter), ) if err := c.fsTree.Open(mod); err != nil { return fmt.Errorf("could not open FSTree: %w", err)