WIP: Rever writecache improvements #1609
9 changed files with 85 additions and 130 deletions
|
@ -1,21 +1,22 @@
|
||||||
package fstree
|
package fstree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"math"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FileCounter used to count files in FSTree. The implementation must be thread-safe.
|
// FileCounter used to count files in FSTree. The implementation must be thread-safe.
|
||||||
type FileCounter interface {
|
type FileCounter interface {
|
||||||
Set(count, size uint64)
|
Set(v uint64)
|
||||||
Inc(size uint64)
|
Inc()
|
||||||
Dec(size uint64)
|
Dec()
|
||||||
}
|
}
|
||||||
|
|
||||||
type noopCounter struct{}
|
type noopCounter struct{}
|
||||||
|
|
||||||
func (c *noopCounter) Set(uint64, uint64) {}
|
func (c *noopCounter) Set(uint64) {}
|
||||||
func (c *noopCounter) Inc(uint64) {}
|
func (c *noopCounter) Inc() {}
|
||||||
func (c *noopCounter) Dec(uint64) {}
|
func (c *noopCounter) Dec() {}
|
||||||
|
|
||||||
func counterEnabled(c FileCounter) bool {
|
func counterEnabled(c FileCounter) bool {
|
||||||
_, noop := c.(*noopCounter)
|
_, noop := c.(*noopCounter)
|
||||||
|
@ -23,50 +24,14 @@ func counterEnabled(c FileCounter) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
type SimpleCounter struct {
|
type SimpleCounter struct {
|
||||||
mtx sync.RWMutex
|
v atomic.Uint64
|
||||||
count uint64
|
|
||||||
size uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSimpleCounter() *SimpleCounter {
|
func NewSimpleCounter() *SimpleCounter {
|
||||||
return &SimpleCounter{}
|
return &SimpleCounter{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SimpleCounter) Set(count, size uint64) {
|
func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) }
|
||||||
c.mtx.Lock()
|
func (c *SimpleCounter) Inc() { c.v.Add(1) }
|
||||||
defer c.mtx.Unlock()
|
func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) }
|
||||||
|
func (c *SimpleCounter) Value() uint64 { return c.v.Load() }
|
||||||
c.count = count
|
|
||||||
c.size = size
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SimpleCounter) Inc(size uint64) {
|
|
||||||
c.mtx.Lock()
|
|
||||||
defer c.mtx.Unlock()
|
|
||||||
|
|
||||||
c.count++
|
|
||||||
c.size += size
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SimpleCounter) Dec(size uint64) {
|
|
||||||
c.mtx.Lock()
|
|
||||||
defer c.mtx.Unlock()
|
|
||||||
|
|
||||||
if c.count > 0 {
|
|
||||||
c.count--
|
|
||||||
} else {
|
|
||||||
panic("fstree.SimpleCounter: invalid count")
|
|
||||||
}
|
|
||||||
if c.size >= size {
|
|
||||||
c.size -= size
|
|
||||||
} else {
|
|
||||||
panic("fstree.SimpleCounter: invalid size")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SimpleCounter) CountSize() (uint64, uint64) {
|
|
||||||
c.mtx.RLock()
|
|
||||||
defer c.mtx.RUnlock()
|
|
||||||
|
|
||||||
return c.count, c.size
|
|
||||||
}
|
|
||||||
|
|
|
@ -435,38 +435,32 @@ func (t *FSTree) initFileCounter() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
count, size, err := t.countFiles()
|
counter, err := t.countFiles()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
t.fileCounter.Set(count, size)
|
t.fileCounter.Set(counter)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *FSTree) countFiles() (uint64, uint64, error) {
|
func (t *FSTree) countFiles() (uint64, error) {
|
||||||
var count, size uint64
|
var counter uint64
|
||||||
// it is simpler to just consider every file
|
// it is simpler to just consider every file
|
||||||
// that is not directory as an object
|
// that is not directory as an object
|
||||||
err := filepath.WalkDir(t.RootPath,
|
err := filepath.WalkDir(t.RootPath,
|
||||||
func(_ string, d fs.DirEntry, _ error) error {
|
func(_ string, d fs.DirEntry, _ error) error {
|
||||||
if d.IsDir() {
|
if !d.IsDir() {
|
||||||
return nil
|
counter++
|
||||||
}
|
}
|
||||||
count++
|
|
||||||
info, err := d.Info()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
size += uint64(info.Size())
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
|
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return count, size, nil
|
return counter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
|
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||||
|
|
|
@ -47,9 +47,8 @@ func TestObjectCounter(t *testing.T) {
|
||||||
require.NoError(t, fst.Open(mode.ComponentReadWrite))
|
require.NoError(t, fst.Open(mode.ComponentReadWrite))
|
||||||
require.NoError(t, fst.Init())
|
require.NoError(t, fst.Init())
|
||||||
|
|
||||||
count, size := counter.CountSize()
|
counterValue := counter.Value()
|
||||||
require.Equal(t, uint64(0), count)
|
require.Equal(t, uint64(0), counterValue)
|
||||||
require.Equal(t, uint64(0), size)
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, fst.Close(context.Background()))
|
require.NoError(t, fst.Close(context.Background()))
|
||||||
|
@ -65,6 +64,9 @@ func TestObjectCounter(t *testing.T) {
|
||||||
putPrm.Address = addr
|
putPrm.Address = addr
|
||||||
putPrm.RawData, _ = obj.Marshal()
|
putPrm.RawData, _ = obj.Marshal()
|
||||||
|
|
||||||
|
var getPrm common.GetPrm
|
||||||
|
getPrm.Address = putPrm.Address
|
||||||
|
|
||||||
var delPrm common.DeletePrm
|
var delPrm common.DeletePrm
|
||||||
delPrm.Address = addr
|
delPrm.Address = addr
|
||||||
|
|
||||||
|
@ -93,9 +95,8 @@ func TestObjectCounter(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, eg.Wait())
|
require.NoError(t, eg.Wait())
|
||||||
|
|
||||||
count, size = counter.CountSize()
|
counterValue = counter.Value()
|
||||||
realCount, realSize, err := fst.countFiles()
|
realCount, err := fst.countFiles()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
|
require.Equal(t, realCount, counterValue)
|
||||||
require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,14 +78,14 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if w.fileCounterEnabled {
|
if w.fileCounterEnabled {
|
||||||
w.fileCounter.Inc(uint64(len(data)))
|
w.fileCounter.Inc()
|
||||||
var targetFileExists bool
|
var targetFileExists bool
|
||||||
if _, e := os.Stat(p); e == nil {
|
if _, e := os.Stat(p); e == nil {
|
||||||
targetFileExists = true
|
targetFileExists = true
|
||||||
}
|
}
|
||||||
err = os.Rename(tmpPath, p)
|
err = os.Rename(tmpPath, p)
|
||||||
if err == nil && targetFileExists {
|
if err == nil && targetFileExists {
|
||||||
w.fileCounter.Dec(uint64(len(data)))
|
w.fileCounter.Dec()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err = os.Rename(tmpPath, p)
|
err = os.Rename(tmpPath, p)
|
||||||
|
@ -110,7 +110,12 @@ func (w *genericWriter) writeFile(p string, data []byte) error {
|
||||||
func (w *genericWriter) removeFile(p string) error {
|
func (w *genericWriter) removeFile(p string) error {
|
||||||
var err error
|
var err error
|
||||||
if w.fileCounterEnabled {
|
if w.fileCounterEnabled {
|
||||||
err = w.removeWithCounter(p)
|
w.fileGuard.Lock(p)
|
||||||
|
err = os.Remove(p)
|
||||||
|
w.fileGuard.Unlock(p)
|
||||||
|
if err == nil {
|
||||||
|
w.fileCounter.Dec()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
err = os.Remove(p)
|
err = os.Remove(p)
|
||||||
}
|
}
|
||||||
|
@ -120,19 +125,3 @@ func (w *genericWriter) removeFile(p string) error {
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *genericWriter) removeWithCounter(p string) error {
|
|
||||||
w.fileGuard.Lock(p)
|
|
||||||
defer w.fileGuard.Unlock(p)
|
|
||||||
|
|
||||||
stat, err := os.Stat(p)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := os.Remove(p); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
w.fileCounter.Dec(uint64(stat.Size()))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
"golang.org/x/sys/unix"
|
"golang.org/x/sys/unix"
|
||||||
)
|
)
|
||||||
|
@ -19,9 +18,7 @@ type linuxWriter struct {
|
||||||
perm uint32
|
perm uint32
|
||||||
flags int
|
flags int
|
||||||
|
|
||||||
fileGuard keyLock
|
counter FileCounter
|
||||||
fileCounter FileCounter
|
|
||||||
fileCounterEnabled bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer {
|
func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer {
|
||||||
|
@ -36,18 +33,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_ = unix.Close(fd) // Don't care about error.
|
_ = unix.Close(fd) // Don't care about error.
|
||||||
var fileGuard keyLock = &noopKeyLock{}
|
|
||||||
fileCounterEnabled := counterEnabled(c)
|
|
||||||
if fileCounterEnabled {
|
|
||||||
fileGuard = utilSync.NewKeyLocker[string]()
|
|
||||||
}
|
|
||||||
w := &linuxWriter{
|
w := &linuxWriter{
|
||||||
root: root,
|
root: root,
|
||||||
perm: uint32(perm),
|
perm: uint32(perm),
|
||||||
flags: flags,
|
flags: flags,
|
||||||
fileGuard: fileGuard,
|
counter: c,
|
||||||
fileCounter: c,
|
|
||||||
fileCounterEnabled: fileCounterEnabled,
|
|
||||||
}
|
}
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
|
@ -61,10 +51,6 @@ func (w *linuxWriter) writeData(p string, data []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *linuxWriter) writeFile(p string, data []byte) error {
|
func (w *linuxWriter) writeFile(p string, data []byte) error {
|
||||||
if w.fileCounterEnabled {
|
|
||||||
w.fileGuard.Lock(p)
|
|
||||||
defer w.fileGuard.Unlock(p)
|
|
||||||
}
|
|
||||||
fd, err := unix.Open(w.root, w.flags, w.perm)
|
fd, err := unix.Open(w.root, w.flags, w.perm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -75,7 +61,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
|
||||||
if n == len(data) {
|
if n == len(data) {
|
||||||
err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
|
err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
w.fileCounter.Inc(uint64(len(data)))
|
w.counter.Inc()
|
||||||
}
|
}
|
||||||
if errors.Is(err, unix.EEXIST) {
|
if errors.Is(err, unix.EEXIST) {
|
||||||
err = nil
|
err = nil
|
||||||
|
@ -92,24 +78,12 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *linuxWriter) removeFile(p string) error {
|
func (w *linuxWriter) removeFile(p string) error {
|
||||||
if w.fileCounterEnabled {
|
err := unix.Unlink(p)
|
||||||
w.fileGuard.Lock(p)
|
|
||||||
defer w.fileGuard.Unlock(p)
|
|
||||||
}
|
|
||||||
var stat unix.Stat_t
|
|
||||||
err := unix.Stat(p, &stat)
|
|
||||||
if err != nil {
|
|
||||||
if err == unix.ENOENT {
|
|
||||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = unix.Unlink(p)
|
|
||||||
if err != nil && err == unix.ENOENT {
|
if err != nil && err == unix.ENOENT {
|
||||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
w.fileCounter.Dec(uint64(stat.Size))
|
w.counter.Dec()
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,8 +29,6 @@ type cache struct {
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
// fsTree contains big files stored directly on file-system.
|
// fsTree contains big files stored directly on file-system.
|
||||||
fsTree *fstree.FSTree
|
fsTree *fstree.FSTree
|
||||||
// counter contains atomic counters for the number of objects stored in cache.
|
|
||||||
counter *fstree.SimpleCounter
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// wcStorageType is used for write-cache operations logging.
|
// wcStorageType is used for write-cache operations logging.
|
||||||
|
@ -58,7 +56,6 @@ func New(opts ...Option) Cache {
|
||||||
c := &cache{
|
c := &cache{
|
||||||
flushCh: make(chan objectInfo),
|
flushCh: make(chan objectInfo),
|
||||||
mode: mode.Disabled,
|
mode: mode.Disabled,
|
||||||
counter: fstree.NewSimpleCounter(),
|
|
||||||
|
|
||||||
options: options{
|
options: options{
|
||||||
log: logger.NewLoggerWrapper(zap.NewNop()),
|
log: logger.NewLoggerWrapper(zap.NewNop()),
|
||||||
|
|
|
@ -31,6 +31,8 @@ type options struct {
|
||||||
// maxCacheCount is the maximum total count of all object saved in cache.
|
// maxCacheCount is the maximum total count of all object saved in cache.
|
||||||
// 0 (no limit) by default.
|
// 0 (no limit) by default.
|
||||||
maxCacheCount uint64
|
maxCacheCount uint64
|
||||||
|
// objCounters contains atomic counters for the number of objects stored in cache.
|
||||||
|
objCounters counters
|
||||||
// maxBatchSize is the maximum batch size for the small object database.
|
// maxBatchSize is the maximum batch size for the small object database.
|
||||||
maxBatchSize int
|
maxBatchSize int
|
||||||
// maxBatchDelay is the maximum batch wait time for the small object database.
|
// maxBatchDelay is the maximum batch wait time for the small object database.
|
||||||
|
|
|
@ -1,10 +1,18 @@
|
||||||
package writecache
|
package writecache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
|
)
|
||||||
|
|
||||||
func (c *cache) estimateCacheSize() (uint64, uint64) {
|
func (c *cache) estimateCacheSize() (uint64, uint64) {
|
||||||
count, size := c.counter.CountSize()
|
fsCount := c.objCounters.FS()
|
||||||
c.metrics.SetEstimateSize(0, size)
|
fsSize := fsCount * c.maxObjectSize
|
||||||
c.metrics.SetActualCounters(0, count)
|
c.metrics.SetEstimateSize(0, fsSize)
|
||||||
return count, size
|
c.metrics.SetActualCounters(0, fsCount)
|
||||||
|
return fsCount, fsSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) hasEnoughSpaceFS() bool {
|
func (c *cache) hasEnoughSpaceFS() bool {
|
||||||
|
@ -19,6 +27,31 @@ func (c *cache) hasEnoughSpace(objectSize uint64) bool {
|
||||||
return c.maxCacheSize >= size+objectSize
|
return c.maxCacheSize >= size+objectSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ fstree.FileCounter = &counters{}
|
||||||
|
|
||||||
|
type counters struct {
|
||||||
|
cFS atomic.Uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *counters) FS() uint64 {
|
||||||
|
return x.cFS.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set implements fstree.ObjectCounter.
|
||||||
|
func (x *counters) Set(v uint64) {
|
||||||
|
x.cFS.Store(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inc implements fstree.ObjectCounter.
|
||||||
|
func (x *counters) Inc() {
|
||||||
|
x.cFS.Add(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dec implements fstree.ObjectCounter.
|
||||||
|
func (x *counters) Dec() {
|
||||||
|
x.cFS.Add(math.MaxUint64)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *cache) initCounters() error {
|
func (c *cache) initCounters() error {
|
||||||
c.estimateCacheSize()
|
c.estimateCacheSize()
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -30,7 +30,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
|
||||||
fstree.WithDepth(1),
|
fstree.WithDepth(1),
|
||||||
fstree.WithDirNameLen(1),
|
fstree.WithDirNameLen(1),
|
||||||
fstree.WithNoSync(c.noSync),
|
fstree.WithNoSync(c.noSync),
|
||||||
fstree.WithFileCounter(c.counter),
|
fstree.WithFileCounter(&c.objCounters),
|
||||||
)
|
)
|
||||||
if err := c.fsTree.Open(mod); err != nil {
|
if err := c.fsTree.Open(mod); err != nil {
|
||||||
return fmt.Errorf("could not open FSTree: %w", err)
|
return fmt.Errorf("could not open FSTree: %w", err)
|
||||||
|
|
Loading…
Add table
Reference in a new issue