WIP: Revert writecache improvements #1609

Draft
dstepanov-yadro wants to merge 10 commits from dstepanov-yadro/frostfs-node:fix/revert_writecache_improvements into support/v0.44
36 changed files with 738 additions and 670 deletions

View file

@@ -25,7 +25,7 @@ func init() {
func inspectFunc(cmd *cobra.Command, _ []string) { func inspectFunc(cmd *cobra.Command, _ []string) {
var data []byte var data []byte
db, err := writecache.OpenDB(vPath, true, os.OpenFile) db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
defer db.Close() defer db.Close()

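Both call sites above gain a trailing `0` because the revert restores a page-size parameter to `writecache.OpenDB`; `0` keeps bbolt's default page size. A minimal sketch of such a wrapper, assuming bbolt and a `dbName` constant like the one referenced later in this diff (the exact file name, permissions, and timeout are assumptions):

```go
package writecache

import (
	"os"
	"path/filepath"
	"time"

	"go.etcd.io/bbolt"
)

const dbName = "small.bolt" // assumed; the package defines its own dbName

// OpenDB opens the write-cache bolt database. pageSize == 0 keeps
// bbolt's default page size (the OS page size).
func OpenDB(p string, ro bool, openFile func(string, int, os.FileMode) (*os.File, error), pageSize int) (*bbolt.DB, error) {
	return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
		ReadOnly: ro,
		Timeout:  100 * time.Millisecond, // fail fast if the file is locked
		OpenFile: openFile,
		PageSize: pageSize,
	})
}
```
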
View file

@@ -31,7 +31,7 @@ func listFunc(cmd *cobra.Command, _ []string) {
return err return err
} }
db, err := writecache.OpenDB(vPath, true, os.OpenFile) db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
defer db.Close() defer db.Close()

View file

@@ -154,12 +154,15 @@ type shardCfg struct {
writecacheCfg struct { writecacheCfg struct {
enabled bool enabled bool
path string path string
maxBatchSize int
maxBatchDelay time.Duration
smallObjectSize uint64
maxObjSize uint64 maxObjSize uint64
flushWorkerCount int flushWorkerCount int
sizeLimit uint64 sizeLimit uint64
countLimit uint64 countLimit uint64
noSync bool noSync bool
flushSizeLimit uint64 pageSize int
} }
piloramaCfg struct { piloramaCfg struct {
@@ -289,12 +292,15 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
wc.enabled = true wc.enabled = true
wc.path = writeCacheCfg.Path() wc.path = writeCacheCfg.Path()
wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
wc.pageSize = writeCacheCfg.BoltDB().PageSize()
wc.maxObjSize = writeCacheCfg.MaxObjectSize() wc.maxObjSize = writeCacheCfg.MaxObjectSize()
wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
wc.flushWorkerCount = writeCacheCfg.WorkerCount() wc.flushWorkerCount = writeCacheCfg.WorkerCount()
wc.sizeLimit = writeCacheCfg.SizeLimit() wc.sizeLimit = writeCacheCfg.SizeLimit()
wc.countLimit = writeCacheCfg.CountLimit() wc.countLimit = writeCacheCfg.CountLimit()
wc.noSync = writeCacheCfg.NoSync() wc.noSync = writeCacheCfg.NoSync()
wc.flushSizeLimit = writeCacheCfg.MaxFlushingObjectsSize()
} }
} }
@@ -910,8 +916,11 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
if wcRead := shCfg.writecacheCfg; wcRead.enabled { if wcRead := shCfg.writecacheCfg; wcRead.enabled {
writeCacheOpts = append(writeCacheOpts, writeCacheOpts = append(writeCacheOpts,
writecache.WithPath(wcRead.path), writecache.WithPath(wcRead.path),
writecache.WithFlushSizeLimit(wcRead.flushSizeLimit), writecache.WithMaxBatchSize(wcRead.maxBatchSize),
writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
writecache.WithPageSize(wcRead.pageSize),
writecache.WithMaxObjectSize(wcRead.maxObjSize), writecache.WithMaxObjectSize(wcRead.maxObjSize),
writecache.WithSmallObjectSize(wcRead.smallObjectSize),
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
writecache.WithMaxCacheSize(wcRead.sizeLimit), writecache.WithMaxCacheSize(wcRead.sizeLimit),
writecache.WithMaxCacheCount(wcRead.countLimit), writecache.WithMaxCacheCount(wcRead.countLimit),

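The option constructors restored in this hunk (`WithMaxBatchSize`, `WithMaxBatchDelay`, `WithPageSize`, `WithSmallObjectSize`) follow the usual Go functional-option pattern; a condensed sketch, assuming they simply fill fields on the package's internal `options` struct:

```go
package writecache

import "time"

// Assumed shape of the restored options; field names mirror the diff.
type options struct {
	maxBatchSize    int
	maxBatchDelay   time.Duration
	pageSize        int
	smallObjectSize uint64
}

type Option func(*options)

func WithMaxBatchSize(n int) Option            { return func(o *options) { o.maxBatchSize = n } }
func WithMaxBatchDelay(d time.Duration) Option { return func(o *options) { o.maxBatchDelay = d } }
func WithPageSize(s int) Option                { return func(o *options) { o.pageSize = s } }
func WithSmallObjectSize(s uint64) Option      { return func(o *options) { o.smallObjectSize = s } }
```
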
View file

@@ -89,11 +89,12 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, true, wc.NoSync()) require.Equal(t, true, wc.NoSync())
require.Equal(t, "tmp/0/cache", wc.Path()) require.Equal(t, "tmp/0/cache", wc.Path())
require.EqualValues(t, 16384, wc.SmallObjectSize())
require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 3221225472, wc.SizeLimit()) require.EqualValues(t, 3221225472, wc.SizeLimit())
require.EqualValues(t, 4096, wc.BoltDB().PageSize())
require.EqualValues(t, 49, wc.CountLimit()) require.EqualValues(t, 49, wc.CountLimit())
require.EqualValues(t, uint64(100), wc.MaxFlushingObjectsSize())
require.Equal(t, "tmp/0/meta", meta.Path()) require.Equal(t, "tmp/0/meta", meta.Path())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
@@ -145,11 +146,12 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, false, wc.NoSync()) require.Equal(t, false, wc.NoSync())
require.Equal(t, "tmp/1/cache", wc.Path()) require.Equal(t, "tmp/1/cache", wc.Path())
require.EqualValues(t, 16384, wc.SmallObjectSize())
require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 4294967296, wc.SizeLimit()) require.EqualValues(t, 4294967296, wc.SizeLimit())
require.EqualValues(t, 0, wc.BoltDB().PageSize())
require.EqualValues(t, writecacheconfig.CountLimitDefault, wc.CountLimit()) require.EqualValues(t, writecacheconfig.CountLimitDefault, wc.CountLimit())
require.EqualValues(t, writecacheconfig.MaxFlushingObjectsSizeDefault, wc.MaxFlushingObjectsSize())
require.Equal(t, "tmp/1/meta", meta.Path()) require.Equal(t, "tmp/1/meta", meta.Path())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())

View file

@@ -2,6 +2,7 @@ package writecacheconfig
import ( import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
boltdbconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/boltdb"
) )
// Config is a wrapper over the config section // Config is a wrapper over the config section
@@ -9,6 +10,9 @@ import (
type Config config.Config type Config config.Config
const ( const (
// SmallSizeDefault is a default size of small objects.
SmallSizeDefault = 32 << 10
// MaxSizeDefault is a default value of the object payload size limit. // MaxSizeDefault is a default value of the object payload size limit.
MaxSizeDefault = 64 << 20 MaxSizeDefault = 64 << 20
@@ -20,8 +24,6 @@ const (
// CountLimitDefault is a default write-cache count limit. // CountLimitDefault is a default write-cache count limit.
CountLimitDefault = 0 CountLimitDefault = 0
MaxFlushingObjectsSizeDefault = 128 << 20
) )
// From wraps config section into Config. // From wraps config section into Config.
@@ -52,6 +54,22 @@ func (x *Config) Path() string {
return p return p
} }
// SmallObjectSize returns the value of "small_object_size" config parameter.
//
// Returns SmallSizeDefault if the value is not a positive number.
func (x *Config) SmallObjectSize() uint64 {
s := config.SizeInBytesSafe(
(*config.Config)(x),
"small_object_size",
)
if s > 0 {
return s
}
return SmallSizeDefault
}
// MaxObjectSize returns the value of "max_object_size" config parameter. // MaxObjectSize returns the value of "max_object_size" config parameter.
// //
// Returns MaxSizeDefault if the value is not a positive number. // Returns MaxSizeDefault if the value is not a positive number.
@@ -123,18 +141,7 @@ func (x *Config) NoSync() bool {
return config.BoolSafe((*config.Config)(x), "no_sync") return config.BoolSafe((*config.Config)(x), "no_sync")
} }
// MaxFlushingObjectsSize returns the value of "max_flushing_objects_size" config parameter. // BoltDB returns config instance for querying bolt db specific parameters.
// func (x *Config) BoltDB() *boltdbconfig.Config {
// Returns MaxFlushingObjectsSizeDefault if the value is not a positive number. return (*boltdbconfig.Config)(x)
func (x *Config) MaxFlushingObjectsSize() uint64 {
s := config.SizeInBytesSafe(
(*config.Config)(x),
"max_flushing_objects_size",
)
if s > 0 {
return s
}
return MaxFlushingObjectsSizeDefault
} }

View file

@@ -109,7 +109,6 @@ FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
FROSTFS_STORAGE_SHARD_0_WRITECACHE_PAGE_SIZE=4096 FROSTFS_STORAGE_SHARD_0_WRITECACHE_PAGE_SIZE=4096
FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_COUNT=49 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_COUNT=49
FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_FLUSHING_OBJECTS_SIZE=100
### Metabase config ### Metabase config
FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta
FROSTFS_STORAGE_SHARD_0_METABASE_PERM=0644 FROSTFS_STORAGE_SHARD_0_METABASE_PERM=0644

View file

@@ -154,8 +154,7 @@
"flush_worker_count": 30, "flush_worker_count": 30,
"capacity": 3221225472, "capacity": 3221225472,
"page_size": 4096, "page_size": 4096,
"max_object_count": 49, "max_object_count": 49
"max_flushing_objects_size": 100
}, },
"metabase": { "metabase": {
"path": "tmp/0/meta", "path": "tmp/0/meta",

View file

@@ -179,7 +179,6 @@ storage:
capacity: 3221225472 # approximate write-cache total size, bytes capacity: 3221225472 # approximate write-cache total size, bytes
max_object_count: 49 max_object_count: 49
page_size: 4k page_size: 4k
max_flushing_objects_size: 100b
metabase: metabase:
path: tmp/0/meta # metabase path path: tmp/0/meta # metabase path

View file

@@ -287,18 +287,23 @@ writecache:
enabled: true enabled: true
path: /path/to/writecache path: /path/to/writecache
capacity: 4294967296 capacity: 4294967296
small_object_size: 16384
max_object_size: 134217728 max_object_size: 134217728
flush_worker_count: 30 flush_worker_count: 30
page_size: '4k'
``` ```
| Parameter | Type | Default value | Description | | Parameter | Type | Default value | Description |
| --------------------------- | ---------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------- | |----------------------|------------|---------------|-------------------------------------------------------------------------------------------------------------------------------|
| `path` | `string` | | Path to the metabase file. | | `path` | `string` | | Path to the metabase file. |
| `capacity` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | | `capacity` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. |
| `max_object_count` | `int` | unrestricted | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. | | `max_object_count` | `int` | unrestricted | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. |
| `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. These objects are stored in a key-value database instead of the file system. |
| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. | | `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. |
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. | | `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
| `max_flushing_objects_size` | `size` | `512M` | Max total size of background flushing objects. | | `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. |
| `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. |
| `page_size` | `size` | `0` | Overrides the default OS page size for the small-object database. Does not affect existing storage. |
# `node` section # `node` section

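Putting the restored keys together, a full shard writecache section might look like this (values are illustrative, combined from the defaults and the example above):

```yaml
writecache:
  enabled: true
  path: /path/to/writecache
  capacity: 4294967296
  small_object_size: 16384
  max_object_size: 134217728
  flush_worker_count: 30
  max_batch_size: 1000
  max_batch_delay: 10ms
  page_size: '4k'
```
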
View file

@@ -516,8 +516,10 @@ const (
StartedWritecacheSealAsync = "started writecache seal async" StartedWritecacheSealAsync = "started writecache seal async"
WritecacheSealCompletedAsync = "writecache seal completed successfully" WritecacheSealCompletedAsync = "writecache seal completed successfully"
FailedToSealWritecacheAsync = "failed to seal writecache async" FailedToSealWritecacheAsync = "failed to seal writecache async"
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty" WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty"
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file" BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
WritecacheCantGetObject = "can't get an object from fstree"
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration" FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database"
WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache"
FSTreeCantDecodeDBObjectAddress = "can't decode object address from the DB"
) )

View file

@@ -8,7 +8,6 @@ import (
type DeletePrm struct { type DeletePrm struct {
Address oid.Address Address oid.Address
StorageID []byte StorageID []byte
Size uint64
} }
// DeleteRes groups the resulting values of Delete operation. // DeleteRes groups the resulting values of Delete operation.

View file

@@ -1,21 +1,22 @@
package fstree package fstree
import ( import (
"sync" "math"
"sync/atomic"
) )
// FileCounter used to count files in FSTree. The implementation must be thread-safe. // FileCounter used to count files in FSTree. The implementation must be thread-safe.
type FileCounter interface { type FileCounter interface {
Set(count, size uint64) Set(v uint64)
Inc(size uint64) Inc()
Dec(size uint64) Dec()
} }
type noopCounter struct{} type noopCounter struct{}
func (c *noopCounter) Set(uint64, uint64) {} func (c *noopCounter) Set(uint64) {}
func (c *noopCounter) Inc(uint64) {} func (c *noopCounter) Inc() {}
func (c *noopCounter) Dec(uint64) {} func (c *noopCounter) Dec() {}
func counterEnabled(c FileCounter) bool { func counterEnabled(c FileCounter) bool {
_, noop := c.(*noopCounter) _, noop := c.(*noopCounter)
@@ -23,50 +24,14 @@ func counterEnabled(c FileCounter) bool {
} }
type SimpleCounter struct { type SimpleCounter struct {
mtx sync.RWMutex v atomic.Uint64
count uint64
size uint64
} }
func NewSimpleCounter() *SimpleCounter { func NewSimpleCounter() *SimpleCounter {
return &SimpleCounter{} return &SimpleCounter{}
} }
func (c *SimpleCounter) Set(count, size uint64) { func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) }
c.mtx.Lock() func (c *SimpleCounter) Inc() { c.v.Add(1) }
defer c.mtx.Unlock() func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) }
func (c *SimpleCounter) Value() uint64 { return c.v.Load() }
c.count = count
c.size = size
}
func (c *SimpleCounter) Inc(size uint64) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.count++
c.size += size
}
func (c *SimpleCounter) Dec(size uint64) {
c.mtx.Lock()
defer c.mtx.Unlock()
if c.count > 0 {
c.count--
} else {
panic("fstree.SimpleCounter: invalid count")
}
if c.size >= size {
c.size -= size
} else {
panic("fstree.SimpleCounter: invalid size")
}
}
func (c *SimpleCounter) CountSize() (uint64, uint64) {
c.mtx.RLock()
defer c.mtx.RUnlock()
return c.count, c.size
}

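One detail of the restored `SimpleCounter` worth spelling out: `Dec()` decrements by adding `math.MaxUint64`, relying on unsigned wraparound (x + 2^64 − 1 ≡ x − 1 mod 2^64). A self-contained demonstration:

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

func main() {
	var v atomic.Uint64
	v.Add(3)
	v.Add(math.MaxUint64) // wraps around: equivalent to subtracting 1
	fmt.Println(v.Load()) // prints 2
}
```

Note that, unlike the removed mutex-based counter, this version no longer panics on underflow: decrementing past zero silently wraps to a huge value.
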
View file

@@ -222,81 +222,6 @@ func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, pr
return nil return nil
} }
type ObjectInfo struct {
Address oid.Address
DataSize uint64
}
type IterateInfoHandler func(ObjectInfo) error
func (t *FSTree) IterateInfo(ctx context.Context, handler IterateInfoHandler) error {
var (
err error
startedAt = time.Now()
)
defer func() {
t.metrics.IterateInfo(time.Since(startedAt), err == nil)
}()
_, span := tracing.StartSpanFromContext(ctx, "FSTree.IterateInfo")
defer span.End()
return t.iterateInfo(ctx, 0, []string{t.RootPath}, handler)
}
func (t *FSTree) iterateInfo(ctx context.Context, depth uint64, curPath []string, handler IterateInfoHandler) error {
curName := strings.Join(curPath[1:], "")
dirPath := filepath.Join(curPath...)
entries, err := os.ReadDir(dirPath)
if err != nil {
return fmt.Errorf("read fstree dir '%s': %w", dirPath, err)
}
isLast := depth >= t.Depth
l := len(curPath)
curPath = append(curPath, "")
for i := range entries {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
curPath[l] = entries[i].Name()
if !isLast && entries[i].IsDir() {
err := t.iterateInfo(ctx, depth+1, curPath, handler)
if err != nil {
return err
}
}
if depth != t.Depth {
continue
}
addr, err := addressFromString(curName + entries[i].Name())
if err != nil {
continue
}
info, err := entries[i].Info()
if err != nil {
if os.IsNotExist(err) {
continue
}
return err
}
err = handler(ObjectInfo{
Address: addr,
DataSize: uint64(info.Size()),
})
if err != nil {
return err
}
}
return nil
}
func (t *FSTree) treePath(addr oid.Address) string { func (t *FSTree) treePath(addr oid.Address) string {
sAddr := stringifyAddress(addr) sAddr := stringifyAddress(addr)
@@ -338,7 +263,7 @@ func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.Delet
} }
p := t.treePath(prm.Address) p := t.treePath(prm.Address)
err = t.writer.removeFile(p, prm.Size) err = t.writer.removeFile(p)
return common.DeleteRes{}, err return common.DeleteRes{}, err
} }
@@ -510,38 +435,32 @@ func (t *FSTree) initFileCounter() error {
return nil return nil
} }
count, size, err := t.countFiles() counter, err := t.countFiles()
if err != nil { if err != nil {
return err return err
} }
t.fileCounter.Set(count, size) t.fileCounter.Set(counter)
return nil return nil
} }
func (t *FSTree) countFiles() (uint64, uint64, error) { func (t *FSTree) countFiles() (uint64, error) {
var count, size uint64 var counter uint64
// it is simpler to just consider every file // it is simpler to just consider every file
// that is not directory as an object // that is not directory as an object
err := filepath.WalkDir(t.RootPath, err := filepath.WalkDir(t.RootPath,
func(_ string, d fs.DirEntry, _ error) error { func(_ string, d fs.DirEntry, _ error) error {
if d.IsDir() { if !d.IsDir() {
return nil counter++
} }
count++
info, err := d.Info()
if err != nil {
return err
}
size += uint64(info.Size())
return nil return nil
}, },
) )
if err != nil { if err != nil {
return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
} }
return count, size, nil return counter, nil
} }
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {

View file

@@ -47,9 +47,8 @@ func TestObjectCounter(t *testing.T) {
require.NoError(t, fst.Open(mode.ComponentReadWrite)) require.NoError(t, fst.Open(mode.ComponentReadWrite))
require.NoError(t, fst.Init()) require.NoError(t, fst.Init())
count, size := counter.CountSize() counterValue := counter.Value()
require.Equal(t, uint64(0), count) require.Equal(t, uint64(0), counterValue)
require.Equal(t, uint64(0), size)
defer func() { defer func() {
require.NoError(t, fst.Close(context.Background())) require.NoError(t, fst.Close(context.Background()))
@@ -65,10 +64,12 @@
putPrm.Address = addr putPrm.Address = addr
putPrm.RawData, _ = obj.Marshal() putPrm.RawData, _ = obj.Marshal()
var getPrm common.GetPrm
getPrm.Address = putPrm.Address
var delPrm common.DeletePrm var delPrm common.DeletePrm
delPrm.Address = addr delPrm.Address = addr
t.Run("without size hint", func(t *testing.T) {
eg, egCtx := errgroup.WithContext(context.Background()) eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error { eg.Go(func() error {
@@ -94,44 +95,8 @@
require.NoError(t, eg.Wait()) require.NoError(t, eg.Wait())
count, size = counter.CountSize() counterValue = counter.Value()
realCount, realSize, err := fst.countFiles() realCount, err := fst.countFiles()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, realCount, count, "real %d, actual %d", realCount, count) require.Equal(t, realCount, counterValue)
require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
})
t.Run("with size hint", func(t *testing.T) {
delPrm.Size = uint64(len(putPrm.RawData))
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
for range 1_000 {
_, err := fst.Put(egCtx, putPrm)
if err != nil {
return err
}
}
return nil
})
eg.Go(func() error {
var le logicerr.Logical
for range 1_000 {
_, err := fst.Delete(egCtx, delPrm)
if err != nil && !errors.As(err, &le) {
return err
}
}
return nil
})
require.NoError(t, eg.Wait())
count, size = counter.CountSize()
realCount, realSize, err := fst.countFiles()
require.NoError(t, err)
require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
})
} }

View file

@@ -16,7 +16,7 @@ import (
type writer interface { type writer interface {
writeData(string, []byte) error writeData(string, []byte) error
removeFile(string, uint64) error removeFile(string) error
} }
type genericWriter struct { type genericWriter struct {
@@ -78,14 +78,14 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error {
} }
if w.fileCounterEnabled { if w.fileCounterEnabled {
w.fileCounter.Inc(uint64(len(data))) w.fileCounter.Inc()
var targetFileExists bool var targetFileExists bool
if _, e := os.Stat(p); e == nil { if _, e := os.Stat(p); e == nil {
targetFileExists = true targetFileExists = true
} }
err = os.Rename(tmpPath, p) err = os.Rename(tmpPath, p)
if err == nil && targetFileExists { if err == nil && targetFileExists {
w.fileCounter.Dec(uint64(len(data))) w.fileCounter.Dec()
} }
} else { } else {
err = os.Rename(tmpPath, p) err = os.Rename(tmpPath, p)
@@ -107,10 +107,15 @@ func (w *genericWriter) writeFile(p string, data []byte) error {
return err return err
} }
func (w *genericWriter) removeFile(p string, size uint64) error { func (w *genericWriter) removeFile(p string) error {
var err error var err error
if w.fileCounterEnabled { if w.fileCounterEnabled {
err = w.removeWithCounter(p, size) w.fileGuard.Lock(p)
err = os.Remove(p)
w.fileGuard.Unlock(p)
if err == nil {
w.fileCounter.Dec()
}
} else { } else {
err = os.Remove(p) err = os.Remove(p)
} }
@@ -120,22 +125,3 @@ func (w *genericWriter) removeFile(p string, size uint64) error {
} }
return err return err
} }
func (w *genericWriter) removeWithCounter(p string, size uint64) error {
w.fileGuard.Lock(p)
defer w.fileGuard.Unlock(p)
if size == 0 {
stat, err := os.Stat(p)
if err != nil {
return err
}
size = uint64(stat.Size())
}
if err := os.Remove(p); err != nil {
return err
}
w.fileCounter.Dec(uint64(size))
return nil
}

View file

@@ -9,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
@@ -19,9 +18,7 @@ type linuxWriter struct {
perm uint32 perm uint32
flags int flags int
fileGuard keyLock counter FileCounter
fileCounter FileCounter
fileCounterEnabled bool
} }
func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer { func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer {
@@ -36,18 +33,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b
return nil return nil
} }
_ = unix.Close(fd) // Don't care about error. _ = unix.Close(fd) // Don't care about error.
var fileGuard keyLock = &noopKeyLock{}
fileCounterEnabled := counterEnabled(c)
if fileCounterEnabled {
fileGuard = utilSync.NewKeyLocker[string]()
}
w := &linuxWriter{ w := &linuxWriter{
root: root, root: root,
perm: uint32(perm), perm: uint32(perm),
flags: flags, flags: flags,
fileGuard: fileGuard, counter: c,
fileCounter: c,
fileCounterEnabled: fileCounterEnabled,
} }
return w return w
} }
@@ -61,10 +51,6 @@ func (w *linuxWriter) writeData(p string, data []byte) error {
} }
func (w *linuxWriter) writeFile(p string, data []byte) error { func (w *linuxWriter) writeFile(p string, data []byte) error {
if w.fileCounterEnabled {
w.fileGuard.Lock(p)
defer w.fileGuard.Unlock(p)
}
fd, err := unix.Open(w.root, w.flags, w.perm) fd, err := unix.Open(w.root, w.flags, w.perm)
if err != nil { if err != nil {
return err return err
@@ -75,7 +61,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
if n == len(data) { if n == len(data) {
err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW) err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
if err == nil { if err == nil {
w.fileCounter.Inc(uint64(len(data))) w.counter.Inc()
} }
if errors.Is(err, unix.EEXIST) { if errors.Is(err, unix.EEXIST) {
err = nil err = nil
@@ -91,30 +77,13 @@
return errClose return errClose
} }
func (w *linuxWriter) removeFile(p string, size uint64) error { func (w *linuxWriter) removeFile(p string) error {
if w.fileCounterEnabled {
w.fileGuard.Lock(p)
defer w.fileGuard.Unlock(p)
if size == 0 {
var stat unix.Stat_t
err := unix.Stat(p, &stat)
if err != nil {
if err == unix.ENOENT {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return err
}
size = uint64(stat.Size)
}
}
err := unix.Unlink(p) err := unix.Unlink(p)
if err != nil && err == unix.ENOENT { if err != nil && err == unix.ENOENT {
return logicerr.Wrap(new(apistatus.ObjectNotFound)) return logicerr.Wrap(new(apistatus.ObjectNotFound))
} }
if err == nil { if err == nil {
w.fileCounter.Dec(uint64(size)) w.counter.Dec()
} }
return err return err
} }

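The `linuxWriter.writeFile` path above relies on `O_TMPFILE`: data is written to an anonymous file in the target directory and only linked into place once complete, so readers can never observe a partially written object (`tmpPath` in the hunk is the `/proc/self/fd/N` alias of that anonymous file). The core of that pattern, reduced to a sketch with assumed flags and no counter logic:

```go
package fstree

import (
	"strconv"

	"golang.org/x/sys/unix"
)

// writeAtomic writes data to an unnamed O_TMPFILE file in dir, then links
// it into place at path, so the file appears atomically.
func writeAtomic(dir, path string, data []byte, perm uint32) error {
	fd, err := unix.Open(dir, unix.O_WRONLY|unix.O_TMPFILE, perm)
	if err != nil {
		return err
	}
	defer unix.Close(fd)
	if _, err := unix.Write(fd, data); err != nil {
		return err
	}
	tmp := "/proc/self/fd/" + strconv.Itoa(fd)
	return unix.Linkat(unix.AT_FDCWD, tmp, unix.AT_FDCWD, path, unix.AT_SYMLINK_FOLLOW)
}
```
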
View file

@@ -13,7 +13,6 @@ type Metrics interface {
Close() Close()
Iterate(d time.Duration, success bool) Iterate(d time.Duration, success bool)
IterateInfo(d time.Duration, success bool)
Delete(d time.Duration, success bool) Delete(d time.Duration, success bool)
Exists(d time.Duration, success bool) Exists(d time.Duration, success bool)
Put(d time.Duration, size int, success bool) Put(d time.Duration, size int, success bool)
@@ -28,7 +27,6 @@ func (m *noopMetrics) SetParentID(string) {}
func (m *noopMetrics) SetMode(mode.ComponentMode) {} func (m *noopMetrics) SetMode(mode.ComponentMode) {}
func (m *noopMetrics) Close() {} func (m *noopMetrics) Close() {}
func (m *noopMetrics) Iterate(time.Duration, bool) {} func (m *noopMetrics) Iterate(time.Duration, bool) {}
func (m *noopMetrics) IterateInfo(time.Duration, bool) {}
func (m *noopMetrics) Delete(time.Duration, bool) {} func (m *noopMetrics) Delete(time.Duration, bool) {}
func (m *noopMetrics) Exists(time.Duration, bool) {} func (m *noopMetrics) Exists(time.Duration, bool) {}
func (m *noopMetrics) Put(time.Duration, int, bool) {} func (m *noopMetrics) Put(time.Duration, int, bool) {}

View file

@@ -169,16 +169,18 @@ func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.Sto
m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Put", success, d) m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Put", success, d)
} }
func (m *writeCacheMetrics) SetEstimateSize(size uint64) { func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) {
m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), size) m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
} }
func (m *writeCacheMetrics) SetMode(mod mode.ComponentMode) { func (m *writeCacheMetrics) SetMode(mod mode.ComponentMode) {
m.metrics.SetMode(m.shardID, mod.String()) m.metrics.SetMode(m.shardID, mod.String())
} }
func (m *writeCacheMetrics) SetActualCounters(count uint64) { func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) {
m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), count) m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
} }
func (m *writeCacheMetrics) Flush(success bool, st writecache.StorageType) { func (m *writeCacheMetrics) Flush(success bool, st writecache.StorageType) {

View file

@@ -38,10 +38,6 @@ func (m *fstreeMetrics) Iterate(d time.Duration, success bool) {
m.m.MethodDuration(m.shardID, m.path, "Iterate", d, success) m.m.MethodDuration(m.shardID, m.path, "Iterate", d, success)
} }
func (m *fstreeMetrics) IterateInfo(d time.Duration, success bool) {
m.m.MethodDuration(m.shardID, m.path, "IterateInfo", d, success)
}
func (m *fstreeMetrics) Delete(d time.Duration, success bool) { func (m *fstreeMetrics) Delete(d time.Duration, success bool) {
m.m.MethodDuration(m.shardID, m.path, "Delete", d, success) m.m.MethodDuration(m.shardID, m.path, "Delete", d, success)
} }

View file

@@ -118,5 +118,6 @@ func newCache(b *testing.B) writecache.Cache {
writecache.WithBlobstor(bs), writecache.WithBlobstor(bs),
writecache.WithMetabase(testMetabase{}), writecache.WithMetabase(testMetabase{}),
writecache.WithMaxCacheSize(256<<30), writecache.WithMaxCacheSize(256<<30),
writecache.WithSmallObjectSize(128<<10),
) )
} }

View file

@@ -2,7 +2,7 @@ package writecache
import ( import (
"context" "context"
"fmt" "os"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -10,7 +10,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -26,41 +27,48 @@ type cache struct {
cancel atomic.Value cancel atomic.Value
// wg is a wait group for flush workers. // wg is a wait group for flush workers.
wg sync.WaitGroup wg sync.WaitGroup
// store contains underlying database.
store
// fsTree contains big files stored directly on file-system. // fsTree contains big files stored directly on file-system.
fsTree *fstree.FSTree fsTree *fstree.FSTree
// counter contains atomic counters for the number of objects stored in cache.
counter *fstree.SimpleCounter
} }
// wcStorageType is used for write-cache operations logging. // wcStorageType is used for write-cache operations logging.
const wcStorageType = "write-cache" const wcStorageType = "write-cache"
type objectInfo struct { type objectInfo struct {
addr oid.Address addr string
size uint64 data []byte
obj *objectSDK.Object
} }
const ( const (
defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
defaultSmallObjectSize = 32 * 1024 // 32 KiB
defaultMaxCacheSize = 1 << 30 // 1 GiB defaultMaxCacheSize = 1 << 30 // 1 GiB
) )
var dummyCanceler context.CancelFunc = func() {} var (
defaultBucket = []byte{0}
dummyCanceler context.CancelFunc = func() {}
)
// New creates new writecache instance. // New creates new writecache instance.
func New(opts ...Option) Cache { func New(opts ...Option) Cache {
c := &cache{ c := &cache{
flushCh: make(chan objectInfo), flushCh: make(chan objectInfo),
mode: mode.Disabled, mode: mode.Disabled,
counter: fstree.NewSimpleCounter(),
options: options{ options: options{
log: logger.NewLoggerWrapper(zap.NewNop()), log: logger.NewLoggerWrapper(zap.NewNop()),
maxObjectSize: defaultMaxObjectSize, maxObjectSize: defaultMaxObjectSize,
smallObjectSize: defaultSmallObjectSize,
workersCount: defaultFlushWorkersCount, workersCount: defaultFlushWorkersCount,
maxCacheSize: defaultMaxCacheSize, maxCacheSize: defaultMaxCacheSize,
maxBatchSize: bbolt.DefaultMaxBatchSize,
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
openFile: os.OpenFile,
metrics: DefaultMetrics(), metrics: DefaultMetrics(),
flushSizeLimit: defaultFlushWorkersCount * defaultMaxObjectSize,
}, },
} }
@@ -94,23 +102,21 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
if err != nil { if err != nil {
return metaerr.Wrap(err) return metaerr.Wrap(err)
} }
return metaerr.Wrap(c.initCounters()) return metaerr.Wrap(c.initCounters())
} }
// Init runs necessary services. // Init runs necessary services.
func (c *cache) Init(ctx context.Context) error { func (c *cache) Init(ctx context.Context) error {
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode)) c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode))
if err := c.flushAndDropBBoltDB(ctx); err != nil { ctx, cancel := context.WithCancel(ctx)
return fmt.Errorf("flush previous version write-cache database: %w", err)
}
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) // canceling performed by cache
c.cancel.Store(cancel) c.cancel.Store(cancel)
c.runFlushLoop(ctx) c.runFlushLoop(ctx)
return nil return nil
} }
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close(ctx context.Context) error { func (c *cache) Close(_ context.Context) error {
if cancelValue := c.cancel.Swap(dummyCanceler); cancelValue != nil { if cancelValue := c.cancel.Swap(dummyCanceler); cancelValue != nil {
cancelValue.(context.CancelFunc)() cancelValue.(context.CancelFunc)()
} }
@@ -126,10 +132,10 @@
defer c.modeMtx.Unlock() defer c.modeMtx.Unlock()
var err error var err error
if c.fsTree != nil { if c.db != nil {
err = c.fsTree.Close(ctx) err = c.db.Close()
if err != nil { if err != nil {
c.fsTree = nil c.db = nil
} }
} }
c.metrics.Close() c.metrics.Close()

View file

@@ -2,6 +2,7 @@ package writecache
import ( import (
"context" "context"
"math"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -9,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
@@ -43,11 +45,46 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
return ErrDegraded return ErrDegraded
} }
saddr := addr.EncodeToString()
var dataSize int
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
dataSize = len(b.Get([]byte(saddr)))
return nil
})
if dataSize > 0 {
storageType = StorageTypeDB
var recordDeleted bool
err := c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(saddr)
recordDeleted = b.Get(key) != nil
err := b.Delete(key)
return err
})
if err != nil {
return err
}
storagelog.Write(ctx, c.log,
storagelog.AddressField(saddr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
if recordDeleted {
c.objCounters.cDB.Add(math.MaxUint64)
c.estimateCacheSize()
}
deleted = true
return nil
}
storageType = StorageTypeFSTree storageType = StorageTypeFSTree
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err == nil { if err == nil {
storagelog.Write(ctx, c.log, storagelog.Write(ctx, c.log,
storagelog.AddressField(addr.EncodeToString()), storagelog.AddressField(saddr),
storagelog.StorageTypeField(wcStorageType), storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"), storagelog.OpField("fstree DELETE"),
) )

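The DB branch of `Delete` above uses two transactions: a `View` to learn whether (and how large) the record is, then an `Update` to remove it, which means the size it reads can in principle change between the two. The essential read-then-delete pattern in a single read-write transaction looks like this (a sketch; `defaultBucket` as defined earlier in this diff):

```go
package writecache

import "go.etcd.io/bbolt"

// deleteIfPresent removes key from the bucket and reports whether it
// existed, all inside one read-write transaction.
func deleteIfPresent(db *bbolt.DB, key []byte) (bool, error) {
	var existed bool
	err := db.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket(defaultBucket)
		if b == nil {
			return nil
		}
		existed = b.Get(key) != nil // Get returns nil for a missing key
		return b.Delete(key)
	})
	return existed, err
}
```
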
View file

@@ -1,6 +1,7 @@
package writecache package writecache
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"time" "time"
@@ -9,23 +10,28 @@
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/mr-tron/base58"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap" "go.uber.org/zap"
) )
const ( const (
// flushBatchSize is amount of keys which will be read from cache to be flushed
// to the main storage. It is used to reduce contention between cache put
// and cache persist.
flushBatchSize = 512
// defaultFlushWorkersCount is number of workers for putting objects in main storage. // defaultFlushWorkersCount is number of workers for putting objects in main storage.
defaultFlushWorkersCount = 20 defaultFlushWorkersCount = 20
// defaultFlushInterval is default time interval between successive flushes. // defaultFlushInterval is default time interval between successive flushes.
defaultFlushInterval = 10 * time.Second defaultFlushInterval = time.Second
) )
var errIterationCompleted = errors.New("iteration completed") var errIterationCompleted = errors.New("iteration completed")
@@ -35,53 +41,126 @@ func (c *cache) runFlushLoop(ctx context.Context) {
if c.disableBackgroundFlush { if c.disableBackgroundFlush {
return return
} }
fl := newFlushLimiter(c.flushSizeLimit) for range c.workersCount {
c.wg.Add(1)
go c.workerFlushSmall(ctx)
}
c.wg.Add(1)
go func() {
c.workerFlushBig(ctx)
c.wg.Done()
}()
c.wg.Add(1) c.wg.Add(1)
go func() { go func() {
defer c.wg.Done() defer c.wg.Done()
c.pushToFlushQueue(ctx, fl)
tt := time.NewTimer(defaultFlushInterval)
defer tt.Stop()
for {
select {
case <-tt.C:
c.flushSmallObjects(ctx)
tt.Reset(defaultFlushInterval)
c.estimateCacheSize()
case <-ctx.Done():
return
}
}
}() }()
}
for range c.workersCount { func (c *cache) flushSmallObjects(ctx context.Context) {
c.wg.Add(1) var lastKey []byte
go c.workerFlush(ctx, fl) for {
select {
case <-ctx.Done():
return
default:
}
var m []objectInfo
c.modeMtx.RLock()
if c.readOnly() {
c.modeMtx.RUnlock()
time.Sleep(time.Second)
continue
}
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
var k, v []byte
if len(lastKey) == 0 {
k, v = cs.First()
} else {
k, v = cs.Seek(lastKey)
if bytes.Equal(k, lastKey) {
k, v = cs.Next()
} }
} }
func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) { for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() {
stopf := context.AfterFunc(ctx, func() { if len(lastKey) == len(k) {
fl.close() copy(lastKey, k)
} else {
lastKey = bytes.Clone(k)
}
m = append(m, objectInfo{
addr: string(k),
data: bytes.Clone(v),
})
}
return nil
}) })
defer stopf()
tick := time.NewTicker(defaultFlushInterval) var count int
for i := range m {
obj := objectSDK.New()
if err := obj.Unmarshal(m[i].data); err != nil {
continue
}
m[i].obj = obj
count++
select {
case c.flushCh <- m[i]:
case <-ctx.Done():
c.modeMtx.RUnlock()
return
}
}
c.modeMtx.RUnlock()
if count == 0 {
break
}
c.log.Debug(ctx, logs.WritecacheTriedToFlushItemsFromWritecache,
zap.Int("count", count),
zap.String("start", base58.Encode(lastKey)))
}
}
func (c *cache) workerFlushBig(ctx context.Context) {
tick := time.NewTicker(defaultFlushInterval * 10)
for { for {
select { select {
case <-tick.C: case <-tick.C:
c.modeMtx.RLock() c.modeMtx.RLock()
if c.readOnly() || c.noMetabase() { if c.readOnly() || c.noMetabase() {
c.modeMtx.RUnlock() c.modeMtx.RUnlock()
continue break
} }
err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error { _ = c.flushFSTree(ctx, true)
if err := fl.acquire(oi.DataSize); err != nil {
return err
}
select {
case c.flushCh <- objectInfo{
addr: oi.Address,
size: oi.DataSize,
}:
return nil
case <-ctx.Done():
fl.release(oi.DataSize)
return ctx.Err()
}
})
if err != nil {
c.log.Warn(ctx, logs.BlobstorErrorOccurredDuringTheIteration, zap.Error(err))
}
c.modeMtx.RUnlock() c.modeMtx.RUnlock()
case <-ctx.Done(): case <-ctx.Done():
@@ -90,42 +169,6 @@ } }
} }
} }
func (c *cache) workerFlush(ctx context.Context, fl *flushLimiter) {
defer c.wg.Done()
var objInfo objectInfo
for {
select {
case objInfo = <-c.flushCh:
c.flushIfAnObjectExistsWorker(ctx, objInfo, fl)
case <-ctx.Done():
return
}
}
}
func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectInfo, fl *flushLimiter) {
defer fl.release(objInfo.size)
res, err := c.fsTree.Get(ctx, common.GetPrm{
Address: objInfo.addr,
})
if err != nil {
if !client.IsErrObjectNotFound(err) {
c.reportFlushError(ctx, logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err))
}
return
}
err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree)
if err != nil {
// Error is handled in flushObject.
return
}
c.deleteFromDisk(ctx, objInfo.addr, uint64(len(res.RawData)))
}
func (c *cache) reportFlushError(ctx context.Context, msg string, addr string, err error) { func (c *cache) reportFlushError(ctx context.Context, msg string, addr string, err error) {
if c.reportError != nil { if c.reportError != nil {
c.reportError(ctx, msg, err) c.reportError(ctx, msg, err)
@@ -154,10 +197,13 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree) err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree)
if err != nil { if err != nil {
if ignoreErrors {
return nil
}
return err return err
} }
c.deleteFromDisk(ctx, e.Address, uint64(len(e.ObjectData))) c.deleteFromDisk(ctx, e.Address)
return nil return nil
} }
@@ -165,6 +211,29 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err return err
} }
// workerFlushSmall writes small objects to the main storage.
func (c *cache) workerFlushSmall(ctx context.Context) {
defer c.wg.Done()
var objInfo objectInfo
for {
// Give priority to direct put.
select {
case objInfo = <-c.flushCh:
case <-ctx.Done():
return
}
err := c.flushObject(ctx, objInfo.obj, objInfo.data, StorageTypeDB)
if err != nil {
// Error is handled in flushObject.
continue
}
c.deleteFromDB(ctx, objInfo.addr, true)
}
}
// flushObject is used to write object directly to the main storage. // flushObject is used to write object directly to the main storage.
func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error {
var err error var err error
@@ -231,5 +300,74 @@ }
} }
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
return c.flushFSTree(ctx, ignoreErrors) if err := c.flushFSTree(ctx, ignoreErrors); err != nil {
return err
}
var last string
for {
batch, err := c.readNextDBBatch(ctx, ignoreErrors, last)
if err != nil {
return err
}
if len(batch) == 0 {
break
}
for _, item := range batch {
var obj objectSDK.Object
if err := obj.Unmarshal(item.data); err != nil {
c.reportFlushError(ctx, logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
if ignoreErrors {
continue
}
return err
}
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
return err
}
c.deleteFromDB(ctx, item.address, false)
}
last = batch[len(batch)-1].address
}
return nil
}
type batchItem struct {
data []byte
address string
}
func (c *cache) readNextDBBatch(ctx context.Context, ignoreErrors bool, last string) ([]batchItem, error) {
const batchSize = 100
var batch []batchItem
err := c.db.View(func(tx *bbolt.Tx) error {
var addr oid.Address
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
sa := string(k)
if sa == last {
continue
}
if err := addr.DecodeString(sa); err != nil {
c.reportFlushError(ctx, logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
if ignoreErrors {
continue
}
return err
}
batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
if len(batch) == batchSize {
return errIterationCompleted
}
}
return nil
})
if err == nil || errors.Is(err, errIterationCompleted) {
return batch, nil
}
return nil, err
} }

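`readNextDBBatch` pages through the bucket by seeking to the last processed key and skipping it, so each call resumes where the previous one stopped. The bare cursor pattern it builds on, reduced to a sketch:

```go
package writecache

import "go.etcd.io/bbolt"

// nextKeys returns up to n keys strictly after last (empty last = from the start).
func nextKeys(db *bbolt.DB, bucket []byte, last string, n int) ([]string, error) {
	var keys []string
	err := db.View(func(tx *bbolt.Tx) error {
		b := tx.Bucket(bucket)
		if b == nil {
			return nil
		}
		cs := b.Cursor()
		for k, _ := cs.Seek([]byte(last)); k != nil && len(keys) < n; k, _ = cs.Next() {
			if string(k) == last {
				continue // Seek lands on last itself; skip it
			}
			keys = append(keys, string(k))
		}
		return nil
	})
	return keys, err
}
```
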
View file

@@ -19,17 +19,19 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
) )
func TestFlush(t *testing.T) { func TestFlush(t *testing.T) {
testlogger := test.NewLogger(t) testlogger := test.NewLogger(t)
createCacheFn := func(t *testing.T, mb *meta.DB, bs MainStorage, opts ...Option) Cache { createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs MainStorage, opts ...Option) Cache {
return New( return New(
append([]Option{ append([]Option{
WithLogger(testlogger), WithLogger(testlogger),
WithPath(filepath.Join(t.TempDir(), "writecache")), WithPath(filepath.Join(t.TempDir(), "writecache")),
WithSmallObjectSize(smallSize),
WithMetabase(mb), WithMetabase(mb),
WithBlobstor(bs), WithBlobstor(bs),
WithDisableBackgroundFlush(), WithDisableBackgroundFlush(),
@@ -45,6 +47,31 @@
} }
failures := []TestFailureInjector[Option]{ failures := []TestFailureInjector[Option]{
{
Desc: "db, invalid address",
InjectFn: func(t *testing.T, wc Cache) {
c := wc.(*cache)
obj := testutil.GenerateObject()
data, err := obj.Marshal()
require.NoError(t, err)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Put([]byte{1, 2, 3}, data)
}))
},
},
{
Desc: "db, invalid object",
InjectFn: func(t *testing.T, wc Cache) {
c := wc.(*cache)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
k := []byte(oidtest.Address().EncodeToString())
v := []byte{1, 2, 3}
return b.Put(k, v)
}))
},
},
{ {
Desc: "fs, read error", Desc: "fs, read error",
InjectFn: func(t *testing.T, wc Cache) { InjectFn: func(t *testing.T, wc Cache) {
@@ -91,6 +118,7 @@ const (
type CreateCacheFunc[Option any] func( type CreateCacheFunc[Option any] func(
t *testing.T, t *testing.T,
smallSize uint64,
meta *meta.DB, meta *meta.DB,
bs MainStorage, bs MainStorage,
opts ...Option, opts ...Option,
@@ -113,7 +141,7 @@ func runFlushTest[Option any](
failures ...TestFailureInjector[Option], failures ...TestFailureInjector[Option],
) { ) {
t.Run("no errors", func(t *testing.T) { t.Run("no errors", func(t *testing.T) {
wc, bs, mb := newCache(t, createCacheFn) wc, bs, mb := newCache(t, createCacheFn, smallSize)
defer func() { require.NoError(t, wc.Close(context.Background())) }() defer func() { require.NoError(t, wc.Close(context.Background())) }()
objects := putObjects(t, wc) objects := putObjects(t, wc)
@@ -126,7 +154,7 @@
}) })
t.Run("flush on moving to degraded mode", func(t *testing.T) { t.Run("flush on moving to degraded mode", func(t *testing.T) {
wc, bs, mb := newCache(t, createCacheFn) wc, bs, mb := newCache(t, createCacheFn, smallSize)
defer func() { require.NoError(t, wc.Close(context.Background())) }() defer func() { require.NoError(t, wc.Close(context.Background())) }()
objects := putObjects(t, wc) objects := putObjects(t, wc)
@ -144,7 +172,7 @@ func runFlushTest[Option any](
for _, f := range failures { for _, f := range failures {
t.Run(f.Desc, func(t *testing.T) { t.Run(f.Desc, func(t *testing.T) {
errCountOpt, errCount := errCountOption() errCountOpt, errCount := errCountOption()
wc, bs, mb := newCache(t, createCacheFn, errCountOpt) wc, bs, mb := newCache(t, createCacheFn, smallSize, errCountOpt)
defer func() { require.NoError(t, wc.Close(context.Background())) }() defer func() { require.NoError(t, wc.Close(context.Background())) }()
objects := putObjects(t, wc) objects := putObjects(t, wc)
f.InjectFn(t, wc) f.InjectFn(t, wc)
@@ -166,6 +194,7 @@
func newCache[Option any]( func newCache[Option any](
t *testing.T, t *testing.T,
createCacheFn CreateCacheFunc[Option], createCacheFn CreateCacheFunc[Option],
smallSize uint64,
opts ...Option, opts ...Option,
) (Cache, *blobstor.BlobStor, *meta.DB) { ) (Cache, *blobstor.BlobStor, *meta.DB) {
dir := t.TempDir() dir := t.TempDir()
@ -186,7 +215,7 @@ func newCache[Option any](
require.NoError(t, bs.Open(context.Background(), mode.ReadWrite)) require.NoError(t, bs.Open(context.Background(), mode.ReadWrite))
require.NoError(t, bs.Init(context.Background())) require.NoError(t, bs.Init(context.Background()))
wc := createCacheFn(t, mb, bs, opts...) wc := createCacheFn(t, smallSize, mb, bs, opts...)
require.NoError(t, wc.Open(context.Background(), mode.ReadWrite)) require.NoError(t, wc.Open(context.Background(), mode.ReadWrite))
require.NoError(t, wc.Init(context.Background())) require.NoError(t, wc.Init(context.Background()))
@ -234,7 +263,7 @@ func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPai
prm.StorageID = mRes.StorageID() prm.StorageID = mRes.StorageID()
res, err := bs.Get(context.Background(), prm) res, err := bs.Get(context.Background(), prm)
require.NoError(t, err, objects[i].addr) require.NoError(t, err)
require.Equal(t, objects[i].obj, res.Object) require.Equal(t, objects[i].obj, res.Object)
} }
} }

View file

@@ -37,11 +37,11 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, ErrDegraded return nil, ErrDegraded
} }
obj, err := c.getInternal(ctx, addr) obj, err := c.getInternal(ctx, saddr, addr)
return obj, metaerr.Wrap(err) return obj, metaerr.Wrap(err)
} }
func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) {
found := false found := false
storageType := StorageTypeUndefined storageType := StorageTypeUndefined
startedAt := time.Now() startedAt := time.Now()
@@ -49,6 +49,14 @@
c.metrics.Get(time.Since(startedAt), found, storageType) c.metrics.Get(time.Since(startedAt), found, storageType)
}() }()
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
found = true
storageType = StorageTypeDB
return obj, obj.Unmarshal(value)
}
res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr}) res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
if err != nil { if err != nil {
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
@@ -79,7 +87,7 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
return nil, ErrDegraded return nil, ErrDegraded
} }
obj, err := c.getInternal(ctx, addr) obj, err := c.getInternal(ctx, saddr, addr)
if err != nil { if err != nil {
return nil, metaerr.Wrap(err) return nil, metaerr.Wrap(err)
} }

View file

@@ -1,70 +0,0 @@
package writecache
import (
"errors"
"sync"
)
var errLimiterClosed = errors.New("acquire failed: limiter closed")
// flushLimiter is used to limit the total size of objects
// being flushed to blobstore at the same time. This is a necessary
// limitation so that the flushing process does not have
// a strong impact on user requests.
type flushLimiter struct {
count, size uint64
maxSize uint64
cond *sync.Cond
closed bool
}
func newFlushLimiter(maxSize uint64) *flushLimiter {
return &flushLimiter{
maxSize: maxSize,
cond: sync.NewCond(&sync.Mutex{}),
}
}
func (l *flushLimiter) acquire(size uint64) error {
l.cond.L.Lock()
defer l.cond.L.Unlock()
// it is allowed to overflow maxSize to allow flushing objects with size > maxSize
for l.count > 0 && l.size+size > l.maxSize && !l.closed {
l.cond.Wait()
if l.closed {
return errLimiterClosed
}
}
l.count++
l.size += size
return nil
}
func (l *flushLimiter) release(size uint64) {
l.cond.L.Lock()
defer l.cond.L.Unlock()
if l.size >= size {
l.size -= size
} else {
panic("flushLimiter: invalid size")
}
if l.count > 0 {
l.count--
} else {
panic("flushLimiter: invalid count")
}
l.cond.Broadcast()
}
func (l *flushLimiter) close() {
l.cond.L.Lock()
defer l.cond.L.Unlock()
l.closed = true
l.cond.Broadcast()
}

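For context on what this file removal takes away: callers acquired from the limiter before handing an object to a flush worker and released once the flush finished, which is exactly the pattern visible in `pushToFlushQueue` and `workerFlush` earlier in this diff. Condensed into one function (a sketch against the removed types):

```go
// enqueueWithLimit gates sends into the flush channel on the limiter.
// The receiving worker is responsible for fl.release(oi.size) after flushing.
func enqueueWithLimit(ctx context.Context, fl *flushLimiter, ch chan<- objectInfo, oi objectInfo) error {
	if err := fl.acquire(oi.size); err != nil {
		return err // errLimiterClosed: the cache is shutting down
	}
	select {
	case ch <- oi:
		return nil
	case <-ctx.Done():
		fl.release(oi.size) // nothing consumed it; give the capacity back
		return ctx.Err()
	}
}
```
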
View file

@@ -1,27 +0,0 @@
package writecache
import (
"sync/atomic"
"testing"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestLimiter(t *testing.T) {
var maxSize uint64 = 10
var single uint64 = 3
l := newFlushLimiter(uint64(maxSize))
var currSize atomic.Int64
var eg errgroup.Group
for range 10_000 {
eg.Go(func() error {
defer l.release(single)
defer currSize.Add(-1)
l.acquire(single)
require.True(t, currSize.Add(1) <= 3)
return nil
})
}
require.NoError(t, eg.Wait())
}

View file

@@ -26,9 +26,9 @@ type Metrics interface {
Flush(success bool, st StorageType) Flush(success bool, st StorageType)
Evict(st StorageType) Evict(st StorageType)
SetEstimateSize(uint64) SetEstimateSize(db, fstree uint64)
SetMode(m mode.ComponentMode) SetMode(m mode.ComponentMode)
SetActualCounters(uint64) SetActualCounters(db, fstree uint64)
SetPath(path string) SetPath(path string)
Close() Close()
} }
@@ -47,11 +47,11 @@ func (metricsStub) Delete(time.Duration, bool, StorageType) {} func (metricsStub) Delete(time.Duration, bool, StorageType) {}
func (metricsStub) Put(time.Duration, bool, StorageType) {} func (metricsStub) Put(time.Duration, bool, StorageType) {}
func (metricsStub) SetEstimateSize(uint64) {} func (metricsStub) SetEstimateSize(uint64, uint64) {}
func (metricsStub) SetMode(mode.ComponentMode) {} func (metricsStub) SetMode(mode.ComponentMode) {}
func (metricsStub) SetActualCounters(uint64) {} func (metricsStub) SetActualCounters(uint64, uint64) {}
func (metricsStub) Flush(bool, StorageType) {} func (metricsStub) Flush(bool, StorageType) {}

View file

@@ -5,12 +5,13 @@ import (
"errors" "errors"
"fmt" "fmt"
"os" "os"
"path/filepath"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
@ -52,7 +53,7 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error
} }
} }
if err := c.closeStorage(ctx, prm.shrink); err != nil { if err := c.closeDB(ctx, prm.shrink); err != nil {
return err return err
} }
@ -77,37 +78,33 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error
return nil return nil
} }
func (c *cache) closeStorage(ctx context.Context, shrink bool) error { func (c *cache) closeDB(ctx context.Context, shrink bool) error {
if c.fsTree == nil { if c.db == nil {
return nil return nil
} }
if !shrink { if !shrink {
if err := c.fsTree.Close(ctx); err != nil { if err := c.db.Close(); err != nil {
return fmt.Errorf("can't close write-cache storage: %w", err) return fmt.Errorf("can't close write-cache database: %w", err)
} }
return nil return nil
} }
empty := true var empty bool
_, err := c.fsTree.Iterate(ctx, common.IteratePrm{ err := c.db.View(func(tx *bbolt.Tx) error {
Handler: func(common.IterationElement) error { b := tx.Bucket(defaultBucket)
return errIterationCompleted empty = b == nil || b.Stats().KeyN == 0
}, return nil
}) })
if err != nil { if err != nil && !errors.Is(err, bbolt.ErrDatabaseNotOpen) {
if errors.Is(err, errIterationCompleted) { return fmt.Errorf("failed to check DB items: %w", err)
empty = false
} else {
return fmt.Errorf("failed to check write-cache items: %w", err)
} }
} if err := c.db.Close(); err != nil {
if err := c.fsTree.Close(ctx); err != nil { return fmt.Errorf("can't close write-cache database: %w", err)
return fmt.Errorf("can't close write-cache storage: %w", err)
} }
if empty { if empty {
err := os.RemoveAll(c.path) err := os.Remove(filepath.Join(c.path, dbName))
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove write-cache files: %w", err) return fmt.Errorf("failed to remove DB file: %w", err)
} }
} else { } else {
c.log.Info(ctx, logs.WritecacheShrinkSkippedNotEmpty) c.log.Info(ctx, logs.WritecacheShrinkSkippedNotEmpty)

View file

@ -17,14 +17,14 @@ func TestMode(t *testing.T) {
WithPath(t.TempDir())) WithPath(t.TempDir()))
require.NoError(t, wc.Open(context.Background(), mode.DegradedReadOnly)) require.NoError(t, wc.Open(context.Background(), mode.DegradedReadOnly))
require.Nil(t, wc.(*cache).fsTree) require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Init(context.Background())) require.NoError(t, wc.Init(context.Background()))
require.Nil(t, wc.(*cache).fsTree) require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Close(context.Background())) require.NoError(t, wc.Close(context.Background()))
require.NoError(t, wc.Open(context.Background(), mode.Degraded)) require.NoError(t, wc.Open(context.Background(), mode.Degraded))
require.Nil(t, wc.(*cache).fsTree) require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Init(context.Background())) require.NoError(t, wc.Init(context.Background()))
require.Nil(t, wc.(*cache).fsTree) require.Nil(t, wc.(*cache).db)
require.NoError(t, wc.Close(context.Background())) require.NoError(t, wc.Close(context.Background()))
} }

View file

@ -2,6 +2,9 @@ package writecache
import ( import (
"context" "context"
"io/fs"
"os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap" "go.uber.org/zap"
@ -20,6 +23,8 @@ type options struct {
metabase Metabase metabase Metabase
// maxObjectSize is the maximum size of the object stored in the write-cache. // maxObjectSize is the maximum size of the object stored in the write-cache.
maxObjectSize uint64 maxObjectSize uint64
// smallObjectSize is the maximum size of the object stored in the database.
smallObjectSize uint64
// workersCount is the number of workers flushing objects in parallel. // workersCount is the number of workers flushing objects in parallel.
workersCount int workersCount int
// maxCacheSize is the maximum total size of all objects saved in cache (DB + FS). // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS).
@ -28,16 +33,24 @@ type options struct {
// maxCacheCount is the maximum total count of all object saved in cache. // maxCacheCount is the maximum total count of all object saved in cache.
// 0 (no limit) by default. // 0 (no limit) by default.
maxCacheCount uint64 maxCacheCount uint64
// objCounters contains atomic counters for the number of objects stored in cache.
objCounters counters
// maxBatchSize is the maximum batch size for the small object database.
maxBatchSize int
// maxBatchDelay is the maximum batch wait time for the small object database.
maxBatchDelay time.Duration
// noSync is true iff FSTree allows unsynchronized writes. // noSync is true iff FSTree allows unsynchronized writes.
noSync bool noSync bool
// reportError is the function called when encountering disk errors in background workers. // reportError is the function called when encountering disk errors in background workers.
reportError func(context.Context, string, error) reportError func(context.Context, string, error)
// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
openFile func(string, int, fs.FileMode) (*os.File, error)
// metrics is metrics implementation // metrics is metrics implementation
metrics Metrics metrics Metrics
// disableBackgroundFlush is for testing purposes only. // disableBackgroundFlush is for testing purposes only.
disableBackgroundFlush bool disableBackgroundFlush bool
// flushSizeLimit is total size of flushing objects. // pageSize is bbolt's page size config value
flushSizeLimit uint64 pageSize int
} }
// WithLogger sets logger. // WithLogger sets logger.
@ -77,6 +90,15 @@ func WithMaxObjectSize(sz uint64) Option {
} }
} }
// WithSmallObjectSize sets maximum object size to be stored in write-cache.
func WithSmallObjectSize(sz uint64) Option {
return func(o *options) {
if sz > 0 {
o.smallObjectSize = sz
}
}
}
func WithFlushWorkersCount(c int) Option { func WithFlushWorkersCount(c int) Option {
return func(o *options) { return func(o *options) {
if c > 0 { if c > 0 {
@ -99,6 +121,24 @@ func WithMaxCacheCount(v uint64) Option {
} }
} }
// WithMaxBatchSize sets max batch size for the small object database.
func WithMaxBatchSize(sz int) Option {
return func(o *options) {
if sz > 0 {
o.maxBatchSize = sz
}
}
}
// WithMaxBatchDelay sets max batch delay for the small object database.
func WithMaxBatchDelay(d time.Duration) Option {
return func(o *options) {
if d > 0 {
o.maxBatchDelay = d
}
}
}
// WithNoSync sets an option to allow returning to caller on PUT before write is persisted. // WithNoSync sets an option to allow returning to caller on PUT before write is persisted.
// Note, that we use this flag for FSTree only and DO NOT use it for a bolt DB because // Note, that we use this flag for FSTree only and DO NOT use it for a bolt DB because
// we cannot yet properly handle the corrupted database during the startup. This SHOULD NOT // we cannot yet properly handle the corrupted database during the startup. This SHOULD NOT
@ -116,6 +156,13 @@ func WithReportErrorFunc(f func(context.Context, string, error)) Option {
} }
} }
// WithOpenFile sets the OpenFile function to use internally by bolt. Useful for hermetic testing.
func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option {
return func(o *options) {
o.openFile = f
}
}
// WithMetrics sets metrics implementation. // WithMetrics sets metrics implementation.
func WithMetrics(metrics Metrics) Option { func WithMetrics(metrics Metrics) Option {
return func(o *options) { return func(o *options) {
@ -130,9 +177,9 @@ func WithDisableBackgroundFlush() Option {
} }
} }
// WithFlushSizeLimit sets flush size limit. // WithPageSize sets bbolt's page size.
func WithFlushSizeLimit(v uint64) Option { func WithPageSize(s int) Option {
return func(o *options) { return func(o *options) {
o.flushSizeLimit = v o.pageSize = s
} }
} }
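
Taken together, a shard would wire these options up roughly as below. This is a sketch: the New constructor call and all literal values are illustrative assumptions, only the option names come from this diff.

	wc := writecache.New(
		writecache.WithPath("/srv/frostfs/shard0/writecache"), // illustrative path
		writecache.WithSmallObjectSize(128<<10),               // objects <= 128 KiB go to bbolt
		writecache.WithMaxObjectSize(64<<20),                  // larger objects are rejected
		writecache.WithMaxBatchSize(256),                      // bbolt MaxBatchSize
		writecache.WithMaxBatchDelay(10*time.Millisecond),     // bbolt MaxBatchDelay
		writecache.WithPageSize(4096),                         // bbolt page size
	)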

View file

@ -8,6 +8,7 @@ import (
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
@ -49,16 +50,62 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
return common.PutRes{}, ErrBigObject return common.PutRes{}, ErrBigObject
} }
oi := objectInfo{
addr: prm.Address.EncodeToString(),
obj: prm.Object,
data: prm.RawData,
}
if sz <= c.smallObjectSize {
storageType = StorageTypeDB
err := c.putSmall(ctx, oi)
if err == nil {
added = true
}
return common.PutRes{}, err
}
storageType = StorageTypeFSTree storageType = StorageTypeFSTree
err := c.putBig(ctx, prm) err := c.putBig(ctx, oi.addr, prm)
if err == nil { if err == nil {
added = true added = true
} }
return common.PutRes{}, metaerr.Wrap(err) return common.PutRes{}, metaerr.Wrap(err)
} }
// putSmall persists small objects to the write-cache database and
// pushes them to the flush workers queue.
func (c *cache) putSmall(ctx context.Context, obj objectInfo) error {
if !c.hasEnoughSpaceDB() {
return ErrOutOfSpace
}
var newRecord bool
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(obj.addr)
newRecord = b.Get(key) == nil
if newRecord {
return b.Put(key, obj.data)
}
return nil
})
if err == nil {
storagelog.Write(ctx, c.log,
storagelog.AddressField(obj.addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db PUT"),
)
if newRecord {
c.objCounters.cDB.Add(1)
c.estimateCacheSize()
}
}
return err
}
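
putSmall leans on bbolt's Batch, which coalesces concurrent calls into a single transaction so one fsync is amortized across many writers; bbolt may retry the closure, so it must stay idempotent (a Put with a fixed key is). The same pattern in isolation, as a standalone sketch that assumes the bucket already exists:

	func putCoalesced(db *bbolt.DB, bucket, key, value []byte) error {
		// Concurrent Batch calls share one commit; the closure may run twice.
		return db.Batch(func(tx *bbolt.Tx) error {
			return tx.Bucket(bucket).Put(key, value)
		})
	}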
// putBig writes object to FSTree and pushes it to the flush workers queue. // putBig writes object to FSTree and pushes it to the flush workers queue.
func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
if !c.hasEnoughSpaceFS() { if !c.hasEnoughSpaceFS() {
return ErrOutOfSpace return ErrOutOfSpace
} }
@ -69,7 +116,7 @@ func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error {
} }
storagelog.Write(ctx, c.log, storagelog.Write(ctx, c.log,
storagelog.AddressField(prm.Address.EncodeToString()), storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType), storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree PUT"), storagelog.OpField("fstree PUT"),
) )

View file

@ -1,10 +1,29 @@
package writecache package writecache
import (
"fmt"
"math"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"go.etcd.io/bbolt"
)
func (c *cache) estimateCacheSize() (uint64, uint64) { func (c *cache) estimateCacheSize() (uint64, uint64) {
count, size := c.counter.CountSize() dbCount := c.objCounters.DB()
c.metrics.SetEstimateSize(size) fsCount := c.objCounters.FS()
c.metrics.SetActualCounters(count) if fsCount > 0 {
return count, size fsCount-- // db file
}
dbSize := dbCount * c.smallObjectSize
fsSize := fsCount * c.maxObjectSize
c.metrics.SetEstimateSize(dbSize, fsSize)
c.metrics.SetActualCounters(dbCount, fsCount)
return dbCount + fsCount, dbSize + fsSize
}
func (c *cache) hasEnoughSpaceDB() bool {
return c.hasEnoughSpace(c.smallObjectSize)
} }
func (c *cache) hasEnoughSpaceFS() bool { func (c *cache) hasEnoughSpaceFS() bool {
@ -19,7 +38,48 @@ func (c *cache) hasEnoughSpace(objectSize uint64) bool {
return c.maxCacheSize >= size+objectSize return c.maxCacheSize >= size+objectSize
} }
var _ fstree.FileCounter = &counters{}
type counters struct {
cDB, cFS atomic.Uint64
}
func (x *counters) DB() uint64 {
return x.cDB.Load()
}
func (x *counters) FS() uint64 {
return x.cFS.Load()
}
// Set implements fstree.ObjectCounter.
func (x *counters) Set(v uint64) {
x.cFS.Store(v)
}
// Inc implements fstree.ObjectCounter.
func (x *counters) Inc() {
x.cFS.Add(1)
}
// Dec implements fstree.ObjectCounter.
func (x *counters) Dec() {
x.cFS.Add(math.MaxUint64)
}
func (c *cache) initCounters() error { func (c *cache) initCounters() error {
var inDB uint64
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b != nil {
inDB = uint64(b.Stats().KeyN)
}
return nil
})
if err != nil {
return fmt.Errorf("could not read write-cache DB counter: %w", err)
}
c.objCounters.cDB.Store(inDB)
c.estimateCacheSize() c.estimateCacheSize()
return nil return nil
} }
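
The estimate is deliberately a worst case: every DB record is priced at smallObjectSize and every FSTree file at maxObjectSize. A worked example with illustrative values, not output from the code:

	const (
		smallObjectSize = 128 << 10 // 128 KiB
		maxObjectSize   = 64 << 20  // 64 MiB
	)
	dbCount, fsCount := uint64(1000), uint64(11)
	fsCount--                           // small.bolt itself is counted by the FSTree file counter
	dbSize := dbCount * smallObjectSize // 1000 * 128 KiB = 125 MiB
	fsSize := fsCount * maxObjectSize   // 10 * 64 MiB = 640 MiB
	_ = dbSize + fsSize                 // estimate: 765 MiB for 1010 objects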

View file

@ -3,6 +3,7 @@ package writecache
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"os" "os"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -13,22 +14,49 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
) )
// store wraps the bbolt database that persists small objects.
type store struct {
db *bbolt.DB
}
const dbName = "small.bolt"
func (c *cache) openStore(mod mode.ComponentMode) error { func (c *cache) openStore(mod mode.ComponentMode) error {
err := util.MkdirAllX(c.path, os.ModePerm) err := util.MkdirAllX(c.path, os.ModePerm)
if err != nil { if err != nil {
return err return err
} }
c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile, c.pageSize)
if err != nil {
return fmt.Errorf("could not open database: %w", err)
}
c.db.MaxBatchSize = c.maxBatchSize
c.db.MaxBatchDelay = c.maxBatchDelay
if !mod.ReadOnly() {
err = c.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(defaultBucket)
return err
})
if err != nil {
return fmt.Errorf("could not create default bucket: %w", err)
}
}
c.fsTree = fstree.New( c.fsTree = fstree.New(
fstree.WithPath(c.path), fstree.WithPath(c.path),
fstree.WithPerm(os.ModePerm), fstree.WithPerm(os.ModePerm),
fstree.WithDepth(1), fstree.WithDepth(1),
fstree.WithDirNameLen(1), fstree.WithDirNameLen(1),
fstree.WithNoSync(c.noSync), fstree.WithNoSync(c.noSync),
fstree.WithFileCounter(c.counter), fstree.WithFileCounter(&c.objCounters),
) )
if err := c.fsTree.Open(mod); err != nil { if err := c.fsTree.Open(mod); err != nil {
return fmt.Errorf("could not open FSTree: %w", err) return fmt.Errorf("could not open FSTree: %w", err)
@ -40,8 +68,43 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
return nil return nil
} }
func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address, size uint64) { func (c *cache) deleteFromDB(ctx context.Context, key string, batched bool) {
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr, Size: size}) var recordDeleted bool
var err error
if batched {
err = c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
} else {
err = c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
}
if err == nil {
c.metrics.Evict(StorageTypeDB)
storagelog.Write(ctx, c.log,
storagelog.AddressField(key),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
if recordDeleted {
c.objCounters.cDB.Add(math.MaxUint64)
c.estimateCacheSize()
}
} else {
c.log.Error(ctx, logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
}
}
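
The batched flag trades latency for throughput: db.Batch coalesces the delete with other concurrent writers (cheap when many flush workers delete at once), while db.Update commits immediately in its own transaction. Since both branches run identical logic, they could be folded into one closure, e.g.:

	del := func(tx *bbolt.Tx) error {
		b := tx.Bucket(defaultBucket)
		recordDeleted = b.Get([]byte(key)) != nil
		return b.Delete([]byte(key))
	}
	if batched {
		err = c.db.Batch(del)
	} else {
		err = c.db.Update(del)
	}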
func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) {
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err != nil && !client.IsErrObjectNotFound(err) { if err != nil && !client.IsErrObjectNotFound(err) {
c.log.Error(ctx, logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err)) c.log.Error(ctx, logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
} else if err == nil { } else if err == nil {

View file

@ -1,110 +0,0 @@
package writecache
import (
"bytes"
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"time"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
const dbName = "small.bolt"
var defaultBucket = []byte{0}
func (c *cache) flushAndDropBBoltDB(ctx context.Context) error {
_, err := os.Stat(filepath.Join(c.path, dbName))
if err != nil && os.IsNotExist(err) {
return nil
}
if err != nil {
return fmt.Errorf("could not check write-cache database existence: %w", err)
}
db, err := OpenDB(c.path, true, os.OpenFile)
if err != nil {
return fmt.Errorf("could not open write-cache database: %w", err)
}
defer func() {
_ = db.Close()
}()
var last string
for {
batch, err := c.readNextDBBatch(db, last)
if err != nil {
return err
}
if len(batch) == 0 {
break
}
for _, item := range batch {
var obj objectSDK.Object
if err := obj.Unmarshal(item.data); err != nil {
return fmt.Errorf("unmarshal object from database: %w", err)
}
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
return fmt.Errorf("flush object from database: %w", err)
}
}
last = batch[len(batch)-1].address
}
if err := db.Close(); err != nil {
return fmt.Errorf("close write-cache database: %w", err)
}
if err := os.Remove(filepath.Join(c.path, dbName)); err != nil {
return fmt.Errorf("remove write-cache database: %w", err)
}
return nil
}
type batchItem struct {
data []byte
address string
}
func (c *cache) readNextDBBatch(db *bbolt.DB, last string) ([]batchItem, error) {
const batchSize = 100
var batch []batchItem
err := db.View(func(tx *bbolt.Tx) error {
var addr oid.Address
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
sa := string(k)
if sa == last {
continue
}
if err := addr.DecodeString(sa); err != nil {
return fmt.Errorf("decode address from database: %w", err)
}
batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
if len(batch) == batchSize {
return errIterationCompleted
}
}
return nil
})
if err == nil || errors.Is(err, errIterationCompleted) {
return batch, nil
}
return nil, err
}
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) {
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
NoFreelistSync: true,
ReadOnly: ro,
Timeout: 100 * time.Millisecond,
OpenFile: openFile,
})
}

View file

@ -0,0 +1,21 @@
package writecache
import (
"io/fs"
"os"
"path/filepath"
"time"
"go.etcd.io/bbolt"
)
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error), pageSize int) (*bbolt.DB, error) {
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
NoFreelistSync: true,
ReadOnly: ro,
Timeout: 100 * time.Millisecond,
OpenFile: openFile,
PageSize: pageSize,
})
}
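
Callers that only inspect the cache open it read-only, and passing 0 as the page size keeps bbolt's default. An illustrative usage sketch (the path is hypothetical):

	db, err := writecache.OpenDB("/srv/frostfs/shard0/writecache", true, os.OpenFile, 0)
	if err != nil {
		return fmt.Errorf("open write-cache db: %w", err)
	}
	defer db.Close()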