WIP: FSTree only writecache #1273

Closed
dstepanov-yadro wants to merge 4 commits from dstepanov-yadro/frostfs-node:feat/fstree_only_writecache into master
10 changed files with 114 additions and 93 deletions
Showing only changes of commit a7536afbf5 - Show all commits

View file

@ -1,22 +1,23 @@
package fstree package fstree
import ( import (
"math"
"sync/atomic" "sync/atomic"
) )
// FileCounter used to count files in FSTree. The implementation must be thread-safe. // FileCounter used to count files in FSTree. The implementation must be thread-safe.
type FileCounter interface { type FileCounter interface {
Set(v uint64) Set(count, size int64)
Inc() Inc(size int64)
Dec() Dec(size int64)
Value() (int64, int64)
} }
type noopCounter struct{} type noopCounter struct{}
func (c *noopCounter) Set(uint64) {} func (c *noopCounter) Set(int64, int64) {}
func (c *noopCounter) Inc() {} func (c *noopCounter) Inc(int64) {}
func (c *noopCounter) Dec() {} func (c *noopCounter) Dec(int64) {}
func (c *noopCounter) Value() (int64, int64) { return 0, 0 }
func counterEnabled(c FileCounter) bool { func counterEnabled(c FileCounter) bool {
_, noop := c.(*noopCounter) _, noop := c.(*noopCounter)
@ -24,14 +25,29 @@ func counterEnabled(c FileCounter) bool {
} }
type SimpleCounter struct { type SimpleCounter struct {
v atomic.Uint64 count atomic.Int64
size atomic.Int64
} }
func NewSimpleCounter() *SimpleCounter { func NewSimpleCounter() *SimpleCounter {
return &SimpleCounter{} return &SimpleCounter{}
} }
func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) } func (c *SimpleCounter) Set(count, size int64) {
func (c *SimpleCounter) Inc() { c.v.Add(1) } c.count.Store(count)
func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) } c.size.Store(size)
func (c *SimpleCounter) Value() uint64 { return c.v.Load() } }
func (c *SimpleCounter) Inc(size int64) {
c.count.Add(1)
c.size.Add(size)
}
func (c *SimpleCounter) Dec(size int64) {
c.count.Add(-1)
c.size.Add(-size)
}
func (c *SimpleCounter) Value() (int64, int64) {
return c.count.Load(), c.size.Load()
}

View file

@ -435,32 +435,38 @@ func (t *FSTree) initFileCounter() error {
return nil return nil
} }
counter, err := t.countFiles() count, size, err := t.countFiles()
if err != nil { if err != nil {
return err return err
} }
t.fileCounter.Set(counter) t.fileCounter.Set(count, size)
return nil return nil
} }
func (t *FSTree) countFiles() (uint64, error) { func (t *FSTree) countFiles() (int64, int64, error) {
var counter uint64 var count int64
var size int64
// it is simpler to just consider every file // it is simpler to just consider every file
// that is not directory as an object // that is not directory as an object
err := filepath.WalkDir(t.RootPath, err := filepath.WalkDir(t.RootPath,
func(_ string, d fs.DirEntry, _ error) error { func(_ string, d fs.DirEntry, _ error) error {
if !d.IsDir() { if !d.IsDir() {
counter++ count++
fi, err := d.Info()
if err != nil {
return err
}
size += fi.Size()
} }
return nil return nil
}, },
) )
if err != nil { if err != nil {
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
} }
return counter, nil return count, size, nil
} }
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {

View file

@ -47,8 +47,9 @@ func TestObjectCounter(t *testing.T) {
require.NoError(t, fst.Open(mode.ComponentReadWrite)) require.NoError(t, fst.Open(mode.ComponentReadWrite))
require.NoError(t, fst.Init()) require.NoError(t, fst.Init())
counterValue := counter.Value() counterValue, sizeValue := counter.Value()
require.Equal(t, uint64(0), counterValue) require.Equal(t, int64(0), counterValue)
require.Equal(t, int64(0), sizeValue)
defer func() { defer func() {
require.NoError(t, fst.Close()) require.NoError(t, fst.Close())
@ -64,9 +65,6 @@ func TestObjectCounter(t *testing.T) {
putPrm.Address = addr putPrm.Address = addr
putPrm.RawData, _ = obj.Marshal() putPrm.RawData, _ = obj.Marshal()
var getPrm common.GetPrm
getPrm.Address = putPrm.Address
var delPrm common.DeletePrm var delPrm common.DeletePrm
delPrm.Address = addr delPrm.Address = addr
@ -95,8 +93,9 @@ func TestObjectCounter(t *testing.T) {
require.NoError(t, eg.Wait()) require.NoError(t, eg.Wait())
counterValue = counter.Value() counterValue, sizeValue = counter.Value()
realCount, err := fst.countFiles() realCount, realSize, err := fst.countFiles()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, realCount, counterValue) require.Equal(t, realCount, counterValue)
require.Equal(t, realSize, sizeValue)
} }

View file

@ -78,14 +78,15 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error {
} }
if w.fileCounterEnabled { if w.fileCounterEnabled {
w.fileCounter.Inc() w.fileCounter.Inc(int64(len(data)))
var targetFileExists bool var targetFileExists bool
if _, e := os.Stat(p); e == nil { s, e := os.Stat(p)
if e == nil {
targetFileExists = true targetFileExists = true
} }
err = os.Rename(tmpPath, p) err = os.Rename(tmpPath, p)
if err == nil && targetFileExists { if err == nil && targetFileExists {
w.fileCounter.Dec() w.fileCounter.Dec(int64(s.Size()))
} }
} else { } else {
err = os.Rename(tmpPath, p) err = os.Rename(tmpPath, p)
@ -108,20 +109,31 @@ func (w *genericWriter) writeFile(p string, data []byte) error {
} }
func (w *genericWriter) removeFile(p string) error { func (w *genericWriter) removeFile(p string) error {
var err error
if w.fileCounterEnabled { if w.fileCounterEnabled {
w.fileGuard.Lock(p) return w.removeFileWithCounter(p)
err = os.Remove(p)
w.fileGuard.Unlock(p)
if err == nil {
w.fileCounter.Dec()
}
} else {
err = os.Remove(p)
} }
err := os.Remove(p)
if err != nil && os.IsNotExist(err) { if err != nil && os.IsNotExist(err) {
err = logicerr.Wrap(new(apistatus.ObjectNotFound)) err = logicerr.Wrap(new(apistatus.ObjectNotFound))
} }
return err return err
} }
func (w *genericWriter) removeFileWithCounter(p string) error {
w.fileGuard.Lock(p)
defer w.fileGuard.Unlock(p)
s, err := os.Stat(p)
if err != nil && os.IsNotExist(err) {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if err != nil {
return err
}
err = os.Remove(p)
if err == nil {
w.fileCounter.Dec(s.Size())
}
return err
}

View file

@ -19,6 +19,7 @@ type linuxWriter struct {
flags int flags int
counter FileCounter counter FileCounter
counterEnabled bool
} }
func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer { func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer {
@ -38,6 +39,7 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b
perm: uint32(perm), perm: uint32(perm),
flags: flags, flags: flags,
counter: c, counter: c,
counterEnabled: counterEnabled(c),
} }
return w return w
} }
@ -61,7 +63,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
if n == len(data) { if n == len(data) {
err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW) err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
if err == nil { if err == nil {
w.counter.Inc() w.counter.Inc(int64(len(data)))
} }
if errors.Is(err, unix.EEXIST) { if errors.Is(err, unix.EEXIST) {
err = nil err = nil
@ -78,12 +80,22 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
} }
func (w *linuxWriter) removeFile(p string) error { func (w *linuxWriter) removeFile(p string) error {
var s unix.Stat_t
if w.counterEnabled {
err := unix.Stat(p, &s)
if err != nil && err == unix.ENOENT {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if err != nil {
return err
}
}
err := unix.Unlink(p) err := unix.Unlink(p)
if err != nil && err == unix.ENOENT { if err != nil && err == unix.ENOENT {
return logicerr.Wrap(new(apistatus.ObjectNotFound)) return logicerr.Wrap(new(apistatus.ObjectNotFound))
} }
if err == nil { if err == nil {
w.counter.Dec() w.counter.Dec(s.Size)
} }
return err return err
} }

View file

@ -57,6 +57,7 @@ func New(opts ...Option) Cache {
maxCacheSize: defaultMaxCacheSize, maxCacheSize: defaultMaxCacheSize,
openFile: os.OpenFile, openFile: os.OpenFile,
metrics: DefaultMetrics(), metrics: DefaultMetrics(),
counter: fstree.NewSimpleCounter(),
}, },
} }
@ -91,7 +92,7 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
return metaerr.Wrap(err) return metaerr.Wrap(err)
} }
_ = c.estimateCacheSize() c.estimateCacheSize()
return nil return nil
} }

View file

@ -4,6 +4,7 @@ import (
"io/fs" "io/fs"
"os" "os"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -26,8 +27,8 @@ type options struct {
// maxCacheSize is the maximum total size of all objects saved in cache. // maxCacheSize is the maximum total size of all objects saved in cache.
// 1 GiB by default. // 1 GiB by default.
maxCacheSize uint64 maxCacheSize uint64
// objCounters contains atomic counters for the number of objects stored in cache. // counter contains atomic counters for the number of objects stored in cache.
objCounters counters counter *fstree.SimpleCounter
// noSync is true iff FSTree allows unsynchronized writes. // noSync is true iff FSTree allows unsynchronized writes.
noSync bool noSync bool
// reportError is the function called when encountering disk errors in background workers. // reportError is the function called when encountering disk errors in background workers.

View file

@ -60,8 +60,11 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
// putBig writes object to FSTree and pushes it to the flush workers queue. // putBig writes object to FSTree and pushes it to the flush workers queue.
func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error {
addr := prm.Address.EncodeToString() addr := prm.Address.EncodeToString()
cacheSz := c.estimateCacheSize() estimatedObjSize := uint64(len(prm.RawData))
if c.maxCacheSize < c.incSizeFS(cacheSz) { if estimatedObjSize == 0 {
estimatedObjSize = prm.Object.PayloadSize()
}
if !c.hasFreeSpace(estimatedObjSize) {
return ErrOutOfSpace return ErrOutOfSpace
} }

View file

@ -1,48 +1,19 @@
package writecache package writecache
import ( func (c *cache) estimateCacheSize() {
"math" count, size := c.counter.Value()
"sync/atomic" var ucount, usize uint64
if count > 0 {
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" ucount = uint64(count)
)
func (c *cache) estimateCacheSize() uint64 {
fsCount := c.objCounters.FS()
if fsCount > 0 {
fsCount-- // db file
} }
fsSize := fsCount * c.maxObjectSize if size > 0 {
c.metrics.SetEstimateSize(fsSize) usize = uint64(size)
c.metrics.SetActualCounters(fsCount) }
return fsSize c.metrics.SetEstimateSize(ucount)
c.metrics.SetActualCounters(usize)
} }
func (c *cache) incSizeFS(sz uint64) uint64 { func (c *cache) hasFreeSpace(sz uint64) bool {
return sz + c.maxObjectSize _, size := c.counter.Value()
} return size+int64(sz) <= int64(c.maxCacheSize)
var _ fstree.FileCounter = &counters{}
type counters struct {
cFS atomic.Uint64
}
func (x *counters) FS() uint64 {
return x.cFS.Load()
}
// Set implements fstree.ObjectCounter.
func (x *counters) Set(v uint64) {
x.cFS.Store(v)
}
// Inc implements fstree.ObjectCounter.
func (x *counters) Inc() {
x.cFS.Add(1)
}
// Dec implements fstree.ObjectCounter.
func (x *counters) Dec() {
x.cFS.Add(math.MaxUint64)
} }

View file

@ -28,7 +28,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
fstree.WithDepth(1), fstree.WithDepth(1),
fstree.WithDirNameLen(1), fstree.WithDirNameLen(1),
fstree.WithNoSync(c.noSync), fstree.WithNoSync(c.noSync),
fstree.WithFileCounter(&c.objCounters), fstree.WithFileCounter(c.counter),
) )
if err := c.fsTree.Open(mod); err != nil { if err := c.fsTree.Open(mod); err != nil {
return fmt.Errorf("could not open FSTree: %w", err) return fmt.Errorf("could not open FSTree: %w", err)