Compare commits

...

10 commits

Author SHA1 Message Date
0a93738042 [#1266] .forgejo: Make 'fumpt' job fail on changed files
`gofumpt` always returns an exit code of 0, even when it finds
misformatted files. To make the `fumpt` action behave as expected,
we need to check whether `gofumpt` changed any files.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-09-13 15:52:22 +03:00
546d09660f [#1283] Clear systemd-notify status on exit
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-09-12 14:45:06 +00:00
e3764c51df [#1347] metabase: Fix EC search
For EC chunks, search must return the EC parent object ID, since
EC chunks have no attributes of their own but inherit the parent's.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-12 13:23:29 +00:00
b33559754d [#1367] fstree: Add size hint for Delete
This allows skipping the `os.Stat` call if the caller already knows the data size.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-12 15:06:33 +03:00
f345fe9a58 [#1367] writecache: Move DB related code to upgrade.go
This is done so that this file can be dropped in the future.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-12 15:06:33 +03:00
3b236160a6 [#1367] writecache: Drop DB label from metrics
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-12 15:06:33 +03:00
25d2ae8aaf [#1367] writecache: Drop BBolt related config variables
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-12 15:06:33 +03:00
e39378b1c3 [#1367] writecache: Add background flushing objects limiter
To limit memory usage by the background flush.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-12 15:06:33 +03:00
8a6e3025a0 [#1367] writecache: Flush from FSTree concurrently
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-12 15:06:33 +03:00
2dd3a6f7a8 [#1367] fstree: Add IterateInfo method
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-12 15:06:33 +03:00
35 changed files with 577 additions and 307 deletions

View file

@@ -106,4 +106,6 @@ jobs:
       run: make fumpt-install
     - name: Run gofumpt
-      run: make fumpt
+      run: |
+        make fumpt
+        git diff --exit-code --quiet

View file

@@ -13,6 +13,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/misc"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
     "github.com/spf13/viper"
     "go.uber.org/zap"
 )
@@ -124,4 +125,8 @@ func shutdown() {
             zap.String("error", err.Error()),
         )
     }
+
+    if err := sdnotify.ClearStatus(); err != nil {
+        log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
+    }
 }

View file

@@ -25,7 +25,7 @@ func init() {
 func inspectFunc(cmd *cobra.Command, _ []string) {
     var data []byte

-    db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
+    db, err := writecache.OpenDB(vPath, true, os.OpenFile)
     common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
     defer db.Close()

View file

@@ -31,7 +31,7 @@ func listFunc(cmd *cobra.Command, _ []string) {
         return err
     }

-    db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
+    db, err := writecache.OpenDB(vPath, true, os.OpenFile)
     common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
     defer db.Close()

View file

@@ -145,15 +145,12 @@ type shardCfg struct {
     writecacheCfg struct {
         enabled          bool
         path             string
-        maxBatchSize     int
-        maxBatchDelay    time.Duration
-        smallObjectSize  uint64
         maxObjSize       uint64
         flushWorkerCount int
         sizeLimit        uint64
         countLimit       uint64
         noSync           bool
-        pageSize         int
+        flushSizeLimit   uint64
     }

     piloramaCfg struct {
@@ -269,15 +266,12 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
     wc.enabled = true
     wc.path = writeCacheCfg.Path()
-    wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
-    wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
-    wc.pageSize = writeCacheCfg.BoltDB().PageSize()
     wc.maxObjSize = writeCacheCfg.MaxObjectSize()
-    wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
     wc.flushWorkerCount = writeCacheCfg.WorkerCount()
     wc.sizeLimit = writeCacheCfg.SizeLimit()
     wc.countLimit = writeCacheCfg.CountLimit()
     wc.noSync = writeCacheCfg.NoSync()
+    wc.flushSizeLimit = writeCacheCfg.MaxFlushingObjectsSize()
     }
 }
@@ -862,11 +856,8 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
     if wcRead := shCfg.writecacheCfg; wcRead.enabled {
         writeCacheOpts = append(writeCacheOpts,
             writecache.WithPath(wcRead.path),
-            writecache.WithMaxBatchSize(wcRead.maxBatchSize),
-            writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
-            writecache.WithPageSize(wcRead.pageSize),
+            writecache.WithFlushSizeLimit(wcRead.flushSizeLimit),
             writecache.WithMaxObjectSize(wcRead.maxObjSize),
-            writecache.WithSmallObjectSize(wcRead.smallObjectSize),
             writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
             writecache.WithMaxCacheSize(wcRead.sizeLimit),
             writecache.WithMaxCacheCount(wcRead.countLimit),
@@ -1407,4 +1398,8 @@ func (c *cfg) shutdown() {
     for i := range c.closers {
         c.closers[len(c.closers)-1-i].fn()
     }
+
+    if err := sdnotify.ClearStatus(); err != nil {
+        c.log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
+    }
 }

View file

@@ -73,12 +73,11 @@ func TestEngineSection(t *testing.T) {
     require.Equal(t, true, wc.NoSync())

     require.Equal(t, "tmp/0/cache", wc.Path())
-    require.EqualValues(t, 16384, wc.SmallObjectSize())
     require.EqualValues(t, 134217728, wc.MaxObjectSize())
     require.EqualValues(t, 30, wc.WorkerCount())
     require.EqualValues(t, 3221225472, wc.SizeLimit())
-    require.EqualValues(t, 4096, wc.BoltDB().PageSize())
     require.EqualValues(t, 49, wc.CountLimit())
+    require.EqualValues(t, uint64(100), wc.MaxFlushingObjectsSize())

     require.Equal(t, "tmp/0/meta", meta.Path())
     require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
@@ -130,12 +129,11 @@ func TestEngineSection(t *testing.T) {
     require.Equal(t, false, wc.NoSync())

     require.Equal(t, "tmp/1/cache", wc.Path())
-    require.EqualValues(t, 16384, wc.SmallObjectSize())
     require.EqualValues(t, 134217728, wc.MaxObjectSize())
     require.EqualValues(t, 30, wc.WorkerCount())
     require.EqualValues(t, 4294967296, wc.SizeLimit())
-    require.EqualValues(t, 0, wc.BoltDB().PageSize())
     require.EqualValues(t, writecacheconfig.CountLimitDefault, wc.CountLimit())
+    require.EqualValues(t, writecacheconfig.MaxFlushingObjectsSizeDefault, wc.MaxFlushingObjectsSize())

     require.Equal(t, "tmp/1/meta", meta.Path())
     require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())

View file

@@ -2,7 +2,6 @@ package writecacheconfig

 import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
-    boltdbconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/boltdb"
 )

 // Config is a wrapper over the config section
@@ -10,9 +9,6 @@ import (
 type Config config.Config

 const (
-    // SmallSizeDefault is a default size of small objects.
-    SmallSizeDefault = 32 << 10
-
     // MaxSizeDefault is a default value of the object payload size limit.
     MaxSizeDefault = 64 << 20
@@ -24,6 +20,8 @@ const (
     // CountLimitDefault is a default write-cache count limit.
     CountLimitDefault = 0
+
+    MaxFlushingObjectsSizeDefault = 128 << 20
 )

 // From wraps config section into Config.
@@ -54,22 +52,6 @@ func (x *Config) Path() string {
     return p
 }

-// SmallObjectSize returns the value of "small_object_size" config parameter.
-//
-// Returns SmallSizeDefault if the value is not a positive number.
-func (x *Config) SmallObjectSize() uint64 {
-    s := config.SizeInBytesSafe(
-        (*config.Config)(x),
-        "small_object_size",
-    )
-
-    if s > 0 {
-        return s
-    }
-
-    return SmallSizeDefault
-}
-
 // MaxObjectSize returns the value of "max_object_size" config parameter.
 //
 // Returns MaxSizeDefault if the value is not a positive number.
@@ -141,7 +123,18 @@ func (x *Config) NoSync() bool {
     return config.BoolSafe((*config.Config)(x), "no_sync")
 }

-// BoltDB returns config instance for querying bolt db specific parameters.
-func (x *Config) BoltDB() *boltdbconfig.Config {
-    return (*boltdbconfig.Config)(x)
+// MaxFlushingObjectsSize returns the value of "max_flushing_objects_size" config parameter.
+//
+// Returns MaxFlushingObjectsSizeDefault if the value is not a positive number.
+func (x *Config) MaxFlushingObjectsSize() uint64 {
+    s := config.SizeInBytesSafe(
+        (*config.Config)(x),
+        "max_flushing_objects_size",
+    )
+
+    if s > 0 {
+        return s
+    }
+
+    return MaxFlushingObjectsSizeDefault
 }

View file

@@ -106,6 +106,7 @@ FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_PAGE_SIZE=4096
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_COUNT=49
+FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_FLUSHING_OBJECTS_SIZE=100
 ### Metabase config
 FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta
 FROSTFS_STORAGE_SHARD_0_METABASE_PERM=0644

View file

@@ -149,7 +149,8 @@
         "flush_worker_count": 30,
         "capacity": 3221225472,
         "page_size": 4096,
-        "max_object_count": 49
+        "max_object_count": 49,
+        "max_flushing_objects_size": 100
       },
       "metabase": {
         "path": "tmp/0/meta",

View file

@@ -172,6 +172,7 @@ storage:
       capacity: 3221225472 # approximate write-cache total size, bytes
       max_object_count: 49
       page_size: 4k
+      max_flushing_objects_size: 100b
     metabase:
       path: tmp/0/meta # metabase path

View file

@@ -287,23 +287,18 @@ writecache:
   enabled: true
   path: /path/to/writecache
   capacity: 4294967296
-  small_object_size: 16384
   max_object_size: 134217728
   flush_worker_count: 30
-  page_size: '4k'
 ```

 | Parameter                   | Type       | Default value | Description |
 | --------------------------- | ---------- | ------------- | ----------- |
 | `path`                      | `string`   |               | Path to the metabase file. |
 | `capacity`                  | `size`     | `1G`          | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. |
 | `max_object_count`          | `int`      | unrestricted  | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. |
-| `small_object_size`         | `size`     | `32K`         | Maximum object size for "small" objects. These objects are stored in a key-value database instead of a file-system. |
 | `max_object_size`           | `size`     | `64M`         | Maximum object size allowed to be stored in the writecache. |
 | `flush_worker_count`        | `int`      | `20`          | Amount of background workers that move data from the writecache to the blobstor. |
-| `max_batch_size`            | `int`      | `1000`        | Maximum amount of small object `PUT` operations to perform in a single transaction. |
-| `max_batch_delay`           | `duration` | `10ms`        | Maximum delay before a batch starts. |
-| `page_size`                 | `size`     | `0`           | Page size overrides the default OS page size for small objects storage. Does not affect the existing storage. |
+| `max_flushing_objects_size` | `size`     | `512M`        | Max total size of background flushing objects. |

 # `node` section

View file

@@ -544,4 +544,5 @@ const (
     FailedToSealWritecacheAsync                  = "failed to seal writecache async"
     WritecacheShrinkSkippedNotEmpty              = "writecache shrink skipped: not empty"
     BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
+    WritecacheCantGetObject                      = "can't get an object from fstree"
 )

View file

@@ -8,6 +8,7 @@ import (
 type DeletePrm struct {
     Address   oid.Address
     StorageID []byte
+    Size      uint64
 }

 // DeleteRes groups the resulting values of Delete operation.

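A minimal usage sketch of the new hint (the helper below is ours, not part of the change; `DeletePrm`, `Delete` and the import paths are taken from the diffs in this set). A non-zero Size lets the FSTree writer decrement its file counter without an extra os.Stat; zero means "size unknown, stat the file":

package example

import (
    "context"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// deleteWithHint removes addr, passing the known payload size so the
// file counter can be updated without an os.Stat call.
func deleteWithHint(ctx context.Context, fst *fstree.FSTree, addr oid.Address, size uint64) error {
    _, err := fst.Delete(ctx, common.DeletePrm{Address: addr, Size: size})
    return err
}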
View file

@@ -222,6 +222,81 @@ func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, pr
     return nil
 }

+type ObjectInfo struct {
+    Address  oid.Address
+    DataSize uint64
+}
+
+type IterateInfoHandler func(ObjectInfo) error
+
+func (t *FSTree) IterateInfo(ctx context.Context, handler IterateInfoHandler) error {
+    var (
+        err       error
+        startedAt = time.Now()
+    )
+    defer func() {
+        t.metrics.IterateInfo(time.Since(startedAt), err == nil)
+    }()
+
+    _, span := tracing.StartSpanFromContext(ctx, "FSTree.IterateInfo")
+    defer span.End()
+
+    return t.iterateInfo(ctx, 0, []string{t.RootPath}, handler)
+}
+
+func (t *FSTree) iterateInfo(ctx context.Context, depth uint64, curPath []string, handler IterateInfoHandler) error {
+    curName := strings.Join(curPath[1:], "")
+    dirPath := filepath.Join(curPath...)
+    entries, err := os.ReadDir(dirPath)
+    if err != nil {
+        return fmt.Errorf("read fstree dir '%s': %w", dirPath, err)
+    }
+
+    isLast := depth >= t.Depth
+    l := len(curPath)
+    curPath = append(curPath, "")
+
+    for i := range entries {
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        default:
+        }
+
+        curPath[l] = entries[i].Name()
+
+        if !isLast && entries[i].IsDir() {
+            err := t.iterateInfo(ctx, depth+1, curPath, handler)
+            if err != nil {
+                return err
+            }
+        }
+
+        if depth != t.Depth {
+            continue
+        }
+
+        addr, err := addressFromString(curName + entries[i].Name())
+        if err != nil {
+            continue
+        }
+        info, err := entries[i].Info()
+        if err != nil {
+            if os.IsNotExist(err) {
+                continue
+            }
+            return err
+        }
+        err = handler(ObjectInfo{
+            Address:  addr,
+            DataSize: uint64(info.Size()),
+        })
+        if err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
 func (t *FSTree) treePath(addr oid.Address) string {
     sAddr := stringifyAddress(addr)
@@ -263,7 +338,7 @@ func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
     }

     p := t.treePath(prm.Address)

-    err = t.writer.removeFile(p)
+    err = t.writer.removeFile(p, prm.Size)
     return common.DeleteRes{}, err
 }

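As a usage sketch: IterateInfo visits only directory metadata, so it can enumerate addresses and sizes without loading payloads into memory. The helper below is hypothetical; the IterateInfo signature and ObjectInfo fields come from the diff above.

package example

import (
    "context"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
)

// totalSize sums the sizes of all stored objects without reading their data.
func totalSize(ctx context.Context, fst *fstree.FSTree) (uint64, error) {
    var total uint64
    err := fst.IterateInfo(ctx, func(oi fstree.ObjectInfo) error {
        total += oi.DataSize
        return nil
    })
    return total, err
}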
View file

@@ -68,6 +68,7 @@ func TestObjectCounter(t *testing.T) {
     var delPrm common.DeletePrm
     delPrm.Address = addr

+    t.Run("without size hint", func(t *testing.T) {
         eg, egCtx := errgroup.WithContext(context.Background())
         eg.Go(func() error {
@@ -98,4 +99,39 @@ func TestObjectCounter(t *testing.T) {
         require.NoError(t, err)
         require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
         require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
+    })
+
+    t.Run("with size hint", func(t *testing.T) {
+        delPrm.Size = uint64(len(putPrm.RawData))
+        eg, egCtx := errgroup.WithContext(context.Background())
+        eg.Go(func() error {
+            for range 1_000 {
+                _, err := fst.Put(egCtx, putPrm)
+                if err != nil {
+                    return err
+                }
+            }
+            return nil
+        })
+        eg.Go(func() error {
+            var le logicerr.Logical
+            for range 1_000 {
+                _, err := fst.Delete(egCtx, delPrm)
+                if err != nil && !errors.As(err, &le) {
+                    return err
+                }
+            }
+            return nil
+        })
+        require.NoError(t, eg.Wait())
+
+        count, size = counter.CountSize()
+        realCount, realSize, err := fst.countFiles()
+        require.NoError(t, err)
+        require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
+        require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
+    })
 }

View file

@@ -16,7 +16,7 @@ import (
 type writer interface {
     writeData(string, []byte) error
-    removeFile(string) error
+    removeFile(string, uint64) error
 }

 type genericWriter struct {
@@ -107,10 +107,10 @@ func (w *genericWriter) writeFile(p string, data []byte) error {
     return err
 }

-func (w *genericWriter) removeFile(p string) error {
+func (w *genericWriter) removeFile(p string, size uint64) error {
     var err error
     if w.fileCounterEnabled {
-        err = w.removeWithCounter(p)
+        err = w.removeWithCounter(p, size)
     } else {
         err = os.Remove(p)
     }
@@ -121,18 +121,21 @@ func (w *genericWriter) removeFile(p string) error {
     return err
 }

-func (w *genericWriter) removeWithCounter(p string) error {
+func (w *genericWriter) removeWithCounter(p string, size uint64) error {
     w.fileGuard.Lock(p)
     defer w.fileGuard.Unlock(p)

-    stat, err := os.Stat(p)
-    if err != nil {
-        return err
+    if size == 0 {
+        stat, err := os.Stat(p)
+        if err != nil {
+            return err
+        }
+        size = uint64(stat.Size())
     }

     if err := os.Remove(p); err != nil {
         return err
     }
-    w.fileCounter.Dec(uint64(stat.Size()))
+    w.fileCounter.Dec(uint64(size))
     return nil
 }

View file

@@ -91,11 +91,12 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
     return errClose
 }

-func (w *linuxWriter) removeFile(p string) error {
+func (w *linuxWriter) removeFile(p string, size uint64) error {
     if w.fileCounterEnabled {
         w.fileGuard.Lock(p)
         defer w.fileGuard.Unlock(p)
-    }
+
+        if size == 0 {
             var stat unix.Stat_t
             err := unix.Stat(p, &stat)
             if err != nil {
@@ -104,12 +105,16 @@ func (w *linuxWriter) removeFile(p string, size uint64) error {
                 }
                 return err
             }
-    err = unix.Unlink(p)
+            size = uint64(stat.Size)
+        }
+    }
+
+    err := unix.Unlink(p)
     if err != nil && err == unix.ENOENT {
         return logicerr.Wrap(new(apistatus.ObjectNotFound))
     }
     if err == nil {
-        w.fileCounter.Dec(uint64(stat.Size))
+        w.fileCounter.Dec(uint64(size))
     }
     return err
 }

View file

@@ -13,6 +13,7 @@ type Metrics interface {
     Close()

     Iterate(d time.Duration, success bool)
+    IterateInfo(d time.Duration, success bool)
     Delete(d time.Duration, success bool)
     Exists(d time.Duration, success bool)
     Put(d time.Duration, size int, success bool)
@@ -27,6 +28,7 @@ func (m *noopMetrics) SetParentID(string) {}
 func (m *noopMetrics) SetMode(mode.ComponentMode)        {}
 func (m *noopMetrics) Close()                            {}
 func (m *noopMetrics) Iterate(time.Duration, bool)       {}
+func (m *noopMetrics) IterateInfo(time.Duration, bool)   {}
 func (m *noopMetrics) Delete(time.Duration, bool)        {}
 func (m *noopMetrics) Exists(time.Duration, bool)        {}
 func (m *noopMetrics) Put(time.Duration, int, bool)      {}

View file

@@ -169,18 +169,16 @@ func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.Sto
     m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Put", success, d)
 }

-func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) {
-    m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
-    m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
+func (m *writeCacheMetrics) SetEstimateSize(size uint64) {
+    m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), size)
 }

 func (m *writeCacheMetrics) SetMode(mod mode.ComponentMode) {
     m.metrics.SetMode(m.shardID, mod.String())
 }

-func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) {
-    m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
-    m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
+func (m *writeCacheMetrics) SetActualCounters(count uint64) {
+    m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), count)
 }

 func (m *writeCacheMetrics) Flush(success bool, st writecache.StorageType) {

View file

@@ -150,7 +150,8 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters
             continue // ignore removed objects
         }

-        if !db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch) {
+        addr, match := db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch)
+        if !match {
             continue // ignore objects with unmatched slow filters
         }
@@ -382,15 +383,16 @@ func (db *DB) selectObjectID(
 }

 // matchSlowFilters return true if object header is matched by all slow filters.
-func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) bool {
+func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) (oid.Address, bool) {
+    result := addr
     if len(f) == 0 {
-        return true
+        return result, true
     }

     buf := make([]byte, addressKeySize)
     obj, err := db.get(tx, addr, buf, true, false, currEpoch)
     if err != nil {
-        return false
+        return result, false
     }

     for i := range f {
@@ -415,23 +417,26 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) bool {
         default: // user attribute
             v, ok := attributeValue(obj, f[i].Header())
             if ok {
+                if ech := obj.ECHeader(); ech != nil {
+                    result.SetObject(ech.Parent())
+                }
                 data = []byte(v)
             } else {
-                return f[i].Operation() == objectSDK.MatchNotPresent
+                return result, f[i].Operation() == objectSDK.MatchNotPresent
             }
         }

         matchFunc, ok := db.matchers[f[i].Operation()]
         if !ok {
-            return false
+            return result, false
         }

         if !matchFunc.matchSlow(f[i].Header(), data, f[i].Value()) {
-            return false
+            return result, false
         }
     }

-    return true
+    return result, true
 }

 func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {

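A sketch of the fix's effect from the caller's side (the helper is ours; it assumes the metabase Select API — SelectPrm with SetContainerID/SetFilters and SelectRes.AddressList — as used elsewhere in this codebase): a filter that matches an EC chunk through inherited attributes now yields the EC parent's address, not the chunk's own.

package example

import (
    "context"

    meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// selectByPath returns addresses matching the "path" attribute; for EC
// chunks the returned addresses now carry the EC parent object ID.
func selectByPath(ctx context.Context, db *meta.DB, cnr cid.ID) ([]oid.Address, error) {
    fs := objectSDK.SearchFilters{}
    fs.AddFilter("path", "test/3/4", objectSDK.MatchStringEqual)

    var prm meta.SelectPrm
    prm.SetContainerID(cnr)
    prm.SetFilters(fs)

    res, err := db.Select(ctx, prm)
    if err != nil {
        return nil, err
    }
    return res.AddressList(), nil
}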
View file

@@ -70,6 +70,22 @@ func TestDB_SelectUserAttributes(t *testing.T) {
     err = putBig(db, raw6)
     require.NoError(t, err)

+    raw7 := testutil.GenerateObjectWithCID(cnr)
+    var attr objectSDK.Attribute
+    attr.SetKey("path")
+    attr.SetValue("test/3/4")
+    attrs := raw7.Attributes()
+    attrs = append(attrs, attr)
+    ech := objectSDK.NewECHeader(objectSDK.ECParentInfo{
+        ID:         oidtest.ID(),
+        Attributes: attrs,
+    }, 0, 3, []byte{}, 0)
+    raw7.SetECHeader(ech)
+    require.NoError(t, putBig(db, raw7))
+    var raw7Parent oid.Address
+    raw7Parent.SetContainer(cnr)
+    raw7Parent.SetObject(ech.Parent())
+
     fs := objectSDK.SearchFilters{}
     fs.AddFilter("foo", "bar", objectSDK.MatchStringEqual)
     testSelect(t, db, cnr, fs,
@@ -100,6 +116,7 @@ func TestDB_SelectUserAttributes(t *testing.T) {
         object.AddressOf(raw4),
         object.AddressOf(raw5),
         object.AddressOf(raw6),
+        object.AddressOf(raw7),
     )

     fs = objectSDK.SearchFilters{}
@@ -110,6 +127,7 @@ func TestDB_SelectUserAttributes(t *testing.T) {
         object.AddressOf(raw4),
         object.AddressOf(raw5),
         object.AddressOf(raw6),
+        object.AddressOf(raw7),
     )

     fs = objectSDK.SearchFilters{}
@@ -120,6 +138,7 @@ func TestDB_SelectUserAttributes(t *testing.T) {
         object.AddressOf(raw4),
         object.AddressOf(raw5),
         object.AddressOf(raw6),
+        object.AddressOf(raw7),
     )

     fs = objectSDK.SearchFilters{}
@@ -131,6 +150,7 @@ func TestDB_SelectUserAttributes(t *testing.T) {
         object.AddressOf(raw4),
         object.AddressOf(raw5),
         object.AddressOf(raw6),
+        object.AddressOf(raw7),
     )

     fs = objectSDK.SearchFilters{}
@@ -139,6 +159,7 @@ func TestDB_SelectUserAttributes(t *testing.T) {
         object.AddressOf(raw4),
         object.AddressOf(raw5),
         object.AddressOf(raw6),
+        raw7Parent,
     )

     fs = objectSDK.SearchFilters{}
@@ -147,6 +168,12 @@ func TestDB_SelectUserAttributes(t *testing.T) {
         object.AddressOf(raw4),
         object.AddressOf(raw5),
     )
+
+    fs = objectSDK.SearchFilters{}
+    fs.AddFilter("path", "test/3/4", objectSDK.MatchStringEqual)
+    testSelect(t, db, cnr, fs,
+        raw7Parent,
+    )
 }

 func TestDB_SelectRootPhyParent(t *testing.T) {

View file

@@ -38,6 +38,10 @@ func (m *fstreeMetrics) Iterate(d time.Duration, success bool) {
     m.m.MethodDuration(m.shardID, m.path, "Iterate", d, success)
 }

+func (m *fstreeMetrics) IterateInfo(d time.Duration, success bool) {
+    m.m.MethodDuration(m.shardID, m.path, "IterateInfo", d, success)
+}
+
 func (m *fstreeMetrics) Delete(d time.Duration, success bool) {
     m.m.MethodDuration(m.shardID, m.path, "Delete", d, success)
 }

View file

@@ -118,6 +118,5 @@ func newCache(b *testing.B) writecache.Cache {
         writecache.WithBlobstor(bs),
         writecache.WithMetabase(testMetabase{}),
         writecache.WithMaxCacheSize(256<<30),
-        writecache.WithSmallObjectSize(128<<10),
     )
 }

View file

@@ -10,8 +10,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
-    objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
-    "go.etcd.io/bbolt"
+    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     "go.uber.org/zap"
 )
@@ -37,21 +36,16 @@ type cache struct {
 const wcStorageType = "write-cache"

 type objectInfo struct {
-    addr string
-    data []byte
-    obj  *objectSDK.Object
+    addr oid.Address
+    size uint64
 }

 const (
     defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
-    defaultSmallObjectSize = 32 * 1024 // 32 KiB
     defaultMaxCacheSize = 1 << 30 // 1 GiB
 )

-var (
-    defaultBucket = []byte{0}
-    dummyCanceler context.CancelFunc = func() {}
-)
+var dummyCanceler context.CancelFunc = func() {}

 // New creates new writecache instance.
 func New(opts ...Option) Cache {
@@ -63,12 +57,10 @@ func New(opts ...Option) Cache {
         options: options{
             log:            &logger.Logger{Logger: zap.NewNop()},
             maxObjectSize:  defaultMaxObjectSize,
-            smallObjectSize: defaultSmallObjectSize,
             workersCount:   defaultFlushWorkersCount,
             maxCacheSize:   defaultMaxCacheSize,
-            maxBatchSize:   bbolt.DefaultMaxBatchSize,
-            maxBatchDelay:  bbolt.DefaultMaxBatchDelay,
             metrics:        DefaultMetrics(),
+            flushSizeLimit: defaultFlushWorkersCount * defaultMaxObjectSize,
         },
     }

View file

@@ -1,25 +1,21 @@
 package writecache

 import (
-    "bytes"
     "context"
     "errors"
-    "fmt"
-    "os"
-    "path/filepath"
     "time"

     "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
     objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
     meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
-    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-    "go.etcd.io/bbolt"
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/trace"
     "go.uber.org/zap"
@@ -29,7 +25,7 @@ const (
     // defaultFlushWorkersCount is number of workers for putting objects in main storage.
     defaultFlushWorkersCount = 20
     // defaultFlushInterval is default time interval between successive flushes.
-    defaultFlushInterval = time.Second
+    defaultFlushInterval = 10 * time.Second
 )

 var errIterationCompleted = errors.New("iteration completed")
@@ -39,25 +35,53 @@ func (c *cache) runFlushLoop(ctx context.Context) {
     if c.disableBackgroundFlush {
         return
     }
+    fl := newFlushLimiter(c.flushSizeLimit)
     c.wg.Add(1)
     go func() {
-        c.workerFlushBig(ctx)
-        c.wg.Done()
+        defer c.wg.Done()
+        c.pushToFlushQueue(ctx, fl)
     }()
+
+    for range c.workersCount {
+        c.wg.Add(1)
+        go c.workerFlush(ctx, fl)
+    }
 }

-func (c *cache) workerFlushBig(ctx context.Context) {
-    tick := time.NewTicker(defaultFlushInterval * 10)
+func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) {
+    stopf := context.AfterFunc(ctx, func() {
+        fl.close()
+    })
+    defer stopf()
+
+    tick := time.NewTicker(defaultFlushInterval)
     for {
         select {
         case <-tick.C:
             c.modeMtx.RLock()
             if c.readOnly() || c.noMetabase() {
                 c.modeMtx.RUnlock()
-                break
+                continue
             }

-            _ = c.flushFSTree(ctx, true)
+            err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error {
+                if err := fl.acquire(oi.DataSize); err != nil {
+                    return err
+                }
+                select {
+                case c.flushCh <- objectInfo{
+                    addr: oi.Address,
+                    size: oi.DataSize,
+                }:
+                    return nil
+                case <-ctx.Done():
+                    fl.release(oi.DataSize)
+                    return ctx.Err()
+                }
+            })
+            if err != nil {
+                c.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration, zap.Error(err))
+            }
+
             c.modeMtx.RUnlock()

         case <-ctx.Done():
@@ -66,6 +90,42 @@ func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) {
         }
 }

+func (c *cache) workerFlush(ctx context.Context, fl *flushLimiter) {
+    defer c.wg.Done()
+
+    var objInfo objectInfo
+    for {
+        select {
+        case objInfo = <-c.flushCh:
+            c.flushIfAnObjectExistsWorker(ctx, objInfo, fl)
+        case <-ctx.Done():
+            return
+        }
+    }
+}
+
+func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectInfo, fl *flushLimiter) {
+    defer fl.release(objInfo.size)
+
+    res, err := c.fsTree.Get(ctx, common.GetPrm{
+        Address: objInfo.addr,
+    })
+    if err != nil {
+        if !client.IsErrObjectNotFound(err) {
+            c.reportFlushError(logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err))
+        }
+        return
+    }
+
+    err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree)
+    if err != nil {
+        // Error is handled in flushObject.
+        return
+    }
+
+    c.deleteFromDisk(ctx, objInfo.addr, uint64(len(res.RawData)))
+}
+
 func (c *cache) reportFlushError(msg string, addr string, err error) {
     if c.reportError != nil {
         c.reportError(msg, err)
@@ -97,7 +157,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
             return err
         }

-        c.deleteFromDisk(ctx, e.Address)
+        c.deleteFromDisk(ctx, e.Address, uint64(len(e.ObjectData)))
         return nil
     }
@@ -173,83 +233,3 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
 func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
     return c.flushFSTree(ctx, ignoreErrors)
 }
-
-type batchItem struct {
-    data    []byte
-    address string
-}
-
-func (c *cache) flushAndDropBBoltDB(ctx context.Context) error {
-    _, err := os.Stat(filepath.Join(c.path, dbName))
-    if err != nil && os.IsNotExist(err) {
-        return nil
-    }
-    if err != nil {
-        return fmt.Errorf("could not check write-cache database existence: %w", err)
-    }
-    db, err := OpenDB(c.path, true, os.OpenFile, c.pageSize)
-    if err != nil {
-        return fmt.Errorf("could not open write-cache database: %w", err)
-    }
-    defer func() {
-        _ = db.Close()
-    }()
-
-    var last string
-    for {
-        batch, err := c.readNextDBBatch(db, last)
-        if err != nil {
-            return err
-        }
-        if len(batch) == 0 {
-            break
-        }
-        for _, item := range batch {
-            var obj objectSDK.Object
-            if err := obj.Unmarshal(item.data); err != nil {
-                return fmt.Errorf("unmarshal object from database: %w", err)
-            }
-            if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
-                return fmt.Errorf("flush object from database: %w", err)
-            }
-        }
-        last = batch[len(batch)-1].address
-    }
-    if err := db.Close(); err != nil {
-        return fmt.Errorf("close write-cache database: %w", err)
-    }
-    if err := os.Remove(filepath.Join(c.path, dbName)); err != nil {
-        return fmt.Errorf("remove write-cache database: %w", err)
-    }
-    return nil
-}
-
-func (c *cache) readNextDBBatch(db *bbolt.DB, last string) ([]batchItem, error) {
-    const batchSize = 100
-    var batch []batchItem
-    err := db.View(func(tx *bbolt.Tx) error {
-        var addr oid.Address
-
-        b := tx.Bucket(defaultBucket)
-        cs := b.Cursor()
-        for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
-            sa := string(k)
-            if sa == last {
-                continue
-            }
-            if err := addr.DecodeString(sa); err != nil {
-                return fmt.Errorf("decode address from database: %w", err)
-            }
-            batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
-            if len(batch) == batchSize {
-                return errIterationCompleted
-            }
-        }
-        return nil
-    })
-    if err == nil || errors.Is(err, errIterationCompleted) {
-        return batch, nil
-    }
-    return nil, err
-}

View file

@@ -25,12 +25,11 @@
 func TestFlush(t *testing.T) {
     testlogger := test.NewLogger(t)

-    createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs MainStorage, opts ...Option) Cache {
+    createCacheFn := func(t *testing.T, mb *meta.DB, bs MainStorage, opts ...Option) Cache {
         return New(
             append([]Option{
                 WithLogger(testlogger),
                 WithPath(filepath.Join(t.TempDir(), "writecache")),
-                WithSmallObjectSize(smallSize),
                 WithMetabase(mb),
                 WithBlobstor(bs),
                 WithDisableBackgroundFlush(),
@@ -92,7 +91,6 @@ const (
 type CreateCacheFunc[Option any] func(
     t *testing.T,
-    smallSize uint64,
     meta *meta.DB,
     bs MainStorage,
     opts ...Option,
@@ -115,7 +113,7 @@ func runFlushTest[Option any](
     failures ...TestFailureInjector[Option],
 ) {
     t.Run("no errors", func(t *testing.T) {
-        wc, bs, mb := newCache(t, createCacheFn, smallSize)
+        wc, bs, mb := newCache(t, createCacheFn)
         defer func() { require.NoError(t, wc.Close()) }()
         objects := putObjects(t, wc)
@@ -128,7 +126,7 @@ func runFlushTest[Option any](
     })

     t.Run("flush on moving to degraded mode", func(t *testing.T) {
-        wc, bs, mb := newCache(t, createCacheFn, smallSize)
+        wc, bs, mb := newCache(t, createCacheFn)
         defer func() { require.NoError(t, wc.Close()) }()
         objects := putObjects(t, wc)
@@ -146,7 +144,7 @@ func runFlushTest[Option any](
     for _, f := range failures {
         t.Run(f.Desc, func(t *testing.T) {
             errCountOpt, errCount := errCountOption()
-            wc, bs, mb := newCache(t, createCacheFn, smallSize, errCountOpt)
+            wc, bs, mb := newCache(t, createCacheFn, errCountOpt)
             defer func() { require.NoError(t, wc.Close()) }()
             objects := putObjects(t, wc)
             f.InjectFn(t, wc)
@@ -168,7 +166,6 @@ func runFlushTest[Option any](
 func newCache[Option any](
     t *testing.T,
     createCacheFn CreateCacheFunc[Option],
-    smallSize uint64,
     opts ...Option,
 ) (Cache, *blobstor.BlobStor, *meta.DB) {
     dir := t.TempDir()
@@ -189,7 +186,7 @@ func newCache[Option any](
     require.NoError(t, bs.Open(context.Background(), mode.ReadWrite))
     require.NoError(t, bs.Init())

-    wc := createCacheFn(t, smallSize, mb, bs, opts...)
+    wc := createCacheFn(t, mb, bs, opts...)
     require.NoError(t, wc.Open(context.Background(), mode.ReadWrite))
     require.NoError(t, wc.Init())

View file

@@ -0,0 +1,70 @@
+package writecache
+
+import (
+    "errors"
+    "sync"
+)
+
+var errLimiterClosed = errors.New("acquire failed: limiter closed")
+
+// flushLimiter is used to limit the total size of objects
+// being flushed to blobstore at the same time. This is a necessary
+// limitation so that the flushing process does not have
+// a strong impact on user requests.
+type flushLimiter struct {
+    count, size uint64
+    maxSize     uint64
+    cond        *sync.Cond
+    closed      bool
+}
+
+func newFlushLimiter(maxSize uint64) *flushLimiter {
+    return &flushLimiter{
+        maxSize: maxSize,
+        cond:    sync.NewCond(&sync.Mutex{}),
+    }
+}
+
+func (l *flushLimiter) acquire(size uint64) error {
+    l.cond.L.Lock()
+    defer l.cond.L.Unlock()
+
+    // it is allowed to overflow maxSize to allow flushing objects with size > maxSize
+    for l.count > 0 && l.size+size > l.maxSize && !l.closed {
+        l.cond.Wait()
+        if l.closed {
+            return errLimiterClosed
+        }
+    }
+    l.count++
+    l.size += size
+    return nil
+}
+
+func (l *flushLimiter) release(size uint64) {
+    l.cond.L.Lock()
+    defer l.cond.L.Unlock()
+
+    if l.size >= size {
+        l.size -= size
+    } else {
+        panic("flushLimiter: invalid size")
+    }
+
+    if l.count > 0 {
+        l.count--
+    } else {
+        panic("flushLimiter: invalid count")
+    }
+
+    l.cond.Broadcast()
+}
+
+func (l *flushLimiter) close() {
+    l.cond.L.Lock()
+    defer l.cond.L.Unlock()
+
+    l.closed = true
+
+    l.cond.Broadcast()
+}

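A worked example of the admission rule (same package, since flushLimiter is unexported; illustrative only, not part of the change):

package writecache

func limiterExample() {
    l := newFlushLimiter(10) // maxSize = 10
    _ = l.acquire(3)         // admitted: count was 0
    _ = l.acquire(3)         // admitted: 3+3 = 6 <= 10
    _ = l.acquire(3)         // admitted: 6+3 = 9 <= 10
    // A fourth acquire(3) would block: 9+3 = 12 > 10 while count > 0.
    // An acquire(15) on an idle limiter is still admitted (count == 0),
    // so one object larger than maxSize cannot deadlock the flush.
    l.release(3)
    l.release(3)
    l.release(3)
}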
View file

@@ -0,0 +1,27 @@
+package writecache
+
+import (
+    "sync/atomic"
+    "testing"
+
+    "github.com/stretchr/testify/require"
+    "golang.org/x/sync/errgroup"
+)
+
+func TestLimiter(t *testing.T) {
+    var maxSize uint64 = 10
+    var single uint64 = 3
+    l := newFlushLimiter(uint64(maxSize))
+    var currSize atomic.Int64
+    var eg errgroup.Group
+    for i := 0; i < 10_000; i++ {
+        eg.Go(func() error {
+            defer l.release(single)
+            defer currSize.Add(-1)
+            l.acquire(single)
+            require.True(t, currSize.Add(1) <= 3)
+            return nil
+        })
+    }
+    require.NoError(t, eg.Wait())
+}

View file

@@ -26,9 +26,9 @@ type Metrics interface {
     Flush(success bool, st StorageType)
     Evict(st StorageType)

-    SetEstimateSize(db, fstree uint64)
+    SetEstimateSize(uint64)
     SetMode(m mode.ComponentMode)
-    SetActualCounters(db, fstree uint64)
+    SetActualCounters(uint64)
     SetPath(path string)
     Close()
 }
@@ -47,11 +47,11 @@ func (metricsStub) Delete(time.Duration, bool, StorageType) {}
 func (metricsStub) Put(time.Duration, bool, StorageType) {}

-func (metricsStub) SetEstimateSize(uint64, uint64) {}
+func (metricsStub) SetEstimateSize(uint64) {}

 func (metricsStub) SetMode(mode.ComponentMode) {}

-func (metricsStub) SetActualCounters(uint64, uint64) {}
+func (metricsStub) SetActualCounters(uint64) {}

 func (metricsStub) Flush(bool, StorageType) {}

View file

@@ -1,8 +1,6 @@
 package writecache

 import (
-    "time"
-
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
     "go.uber.org/zap"
 )
@@ -20,8 +18,6 @@ type options struct {
     metabase Metabase
     // maxObjectSize is the maximum size of the object stored in the write-cache.
     maxObjectSize uint64
-    // smallObjectSize is the maximum size of the object stored in the database.
-    smallObjectSize uint64
     // workersCount is the number of workers flushing objects in parallel.
     workersCount int
     // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS).
@@ -30,10 +26,6 @@ type options struct {
     // maxCacheCount is the maximum total count of all object saved in cache.
     // 0 (no limit) by default.
     maxCacheCount uint64
-    // maxBatchSize is the maximum batch size for the small object database.
-    maxBatchSize int
-    // maxBatchDelay is the maximum batch wait time for the small object database.
-    maxBatchDelay time.Duration
     // noSync is true iff FSTree allows unsynchronized writes.
     noSync bool
     // reportError is the function called when encountering disk errors in background workers.
@@ -42,8 +34,8 @@ type options struct {
     metrics Metrics
     // disableBackgroundFlush is for testing purposes only.
     disableBackgroundFlush bool
-    // pageSize is bbolt's page size config value
-    pageSize int
+    // flushSizeLimit is total size of flushing objects.
+    flushSizeLimit uint64
 }

 // WithLogger sets logger.
@@ -83,15 +75,6 @@ func WithMaxObjectSize(sz uint64) Option {
     }
 }

-// WithSmallObjectSize sets maximum object size to be stored in write-cache.
-func WithSmallObjectSize(sz uint64) Option {
-    return func(o *options) {
-        if sz > 0 {
-            o.smallObjectSize = sz
-        }
-    }
-}
-
 func WithFlushWorkersCount(c int) Option {
     return func(o *options) {
         if c > 0 {
@@ -114,24 +97,6 @@ func WithMaxCacheCount(v uint64) Option {
     }
 }

-// WithMaxBatchSize sets max batch size for the small object database.
-func WithMaxBatchSize(sz int) Option {
-    return func(o *options) {
-        if sz > 0 {
-            o.maxBatchSize = sz
-        }
-    }
-}
-
-// WithMaxBatchDelay sets max batch delay for the small object database.
-func WithMaxBatchDelay(d time.Duration) Option {
-    return func(o *options) {
-        if d > 0 {
-            o.maxBatchDelay = d
-        }
-    }
-}
-
 // WithNoSync sets an option to allow returning to caller on PUT before write is persisted.
 // Note, that we use this flag for FSTree only and DO NOT use it for a bolt DB because
 // we cannot yet properly handle the corrupted database during the startup. This SHOULD NOT
@@ -163,9 +128,9 @@ func WithDisableBackgroundFlush() Option {
     }
 }

-// WithPageSize sets bbolt's page size.
-func WithPageSize(s int) Option {
+// WithFlushSizeLimit sets flush size limit.
+func WithFlushSizeLimit(v uint64) Option {
     return func(o *options) {
-        o.pageSize = s
+        o.flushSizeLimit = v
     }
 }

View file

@@ -2,8 +2,8 @@ package writecache

 func (c *cache) estimateCacheSize() (uint64, uint64) {
     count, size := c.counter.CountSize()
-    c.metrics.SetEstimateSize(0, size)
-    c.metrics.SetActualCounters(0, count)
+    c.metrics.SetEstimateSize(size)
+    c.metrics.SetActualCounters(count)
     return count, size
 }

View file

@@ -16,8 +16,6 @@ import (
     "go.uber.org/zap"
 )

-const dbName = "small.bolt"
-
 func (c *cache) openStore(mod mode.ComponentMode) error {
     err := util.MkdirAllX(c.path, os.ModePerm)
     if err != nil {
@@ -42,8 +40,8 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
     return nil
 }

-func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) {
-    _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
+func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address, size uint64) {
+    _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr, Size: size})
     if err != nil && !client.IsErrObjectNotFound(err) {
         c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
     } else if err == nil {

View file

@@ -0,0 +1,110 @@
+package writecache
+
+import (
+    "bytes"
+    "context"
+    "errors"
+    "fmt"
+    "io/fs"
+    "os"
+    "path/filepath"
+    "time"
+
+    objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+    "go.etcd.io/bbolt"
+)
+
+const dbName = "small.bolt"
+
+var defaultBucket = []byte{0}
+
+func (c *cache) flushAndDropBBoltDB(ctx context.Context) error {
+    _, err := os.Stat(filepath.Join(c.path, dbName))
+    if err != nil && os.IsNotExist(err) {
+        return nil
+    }
+    if err != nil {
+        return fmt.Errorf("could not check write-cache database existence: %w", err)
+    }
+    db, err := OpenDB(c.path, true, os.OpenFile)
+    if err != nil {
+        return fmt.Errorf("could not open write-cache database: %w", err)
+    }
+    defer func() {
+        _ = db.Close()
+    }()
+
+    var last string
+    for {
+        batch, err := c.readNextDBBatch(db, last)
+        if err != nil {
+            return err
+        }
+        if len(batch) == 0 {
+            break
+        }
+        for _, item := range batch {
+            var obj objectSDK.Object
+            if err := obj.Unmarshal(item.data); err != nil {
+                return fmt.Errorf("unmarshal object from database: %w", err)
+            }
+            if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
+                return fmt.Errorf("flush object from database: %w", err)
+            }
+        }
+        last = batch[len(batch)-1].address
+    }
+    if err := db.Close(); err != nil {
+        return fmt.Errorf("close write-cache database: %w", err)
+    }
+    if err := os.Remove(filepath.Join(c.path, dbName)); err != nil {
+        return fmt.Errorf("remove write-cache database: %w", err)
+    }
+    return nil
+}
+
+type batchItem struct {
+    data    []byte
+    address string
+}
+
+func (c *cache) readNextDBBatch(db *bbolt.DB, last string) ([]batchItem, error) {
+    const batchSize = 100
+    var batch []batchItem
+    err := db.View(func(tx *bbolt.Tx) error {
+        var addr oid.Address
+
+        b := tx.Bucket(defaultBucket)
+        cs := b.Cursor()
+        for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
+            sa := string(k)
+            if sa == last {
+                continue
+            }
+            if err := addr.DecodeString(sa); err != nil {
+                return fmt.Errorf("decode address from database: %w", err)
+            }
+            batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
+            if len(batch) == batchSize {
+                return errIterationCompleted
+            }
+        }
+        return nil
+    })
+    if err == nil || errors.Is(err, errIterationCompleted) {
+        return batch, nil
+    }
+    return nil, err
+}
+
+// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
+func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) {
+    return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
+        NoFreelistSync: true,
+        ReadOnly:       ro,
+        Timeout:        100 * time.Millisecond,
+        OpenFile:       openFile,
+    })
+}

View file

@@ -1,21 +0,0 @@
-package writecache
-
-import (
-    "io/fs"
-    "os"
-    "path/filepath"
-    "time"
-
-    "go.etcd.io/bbolt"
-)
-
-// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
-func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error), pageSize int) (*bbolt.DB, error) {
-    return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
-        NoFreelistSync: true,
-        ReadOnly:       ro,
-        Timeout:        100 * time.Millisecond,
-        OpenFile:       openFile,
-        PageSize:       pageSize,
-    })
-}

View file

@@ -69,6 +69,11 @@ func Status(status string) error {
     return Send("STATUS=" + status)
 }

+// ClearStatus resets the current service status previously set by Status.
+func ClearStatus() error {
+    return Status("")
+}
+
 // Send state through the notify socket if any.
 // If the notify socket was not detected, it returns an error.
 func Send(state string) error {
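
A usage sketch mirroring the shutdown paths changed above (the wrapper function is ours): under systemd, ClearStatus sends "STATUS=" over the notify socket, which resets the status text previously pushed via sdnotify.Status.

package example

import (
    "log"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
)

func shutdown() {
    // ... close resources first ...
    if err := sdnotify.ClearStatus(); err != nil {
        log.Printf("failed to report status to systemd: %v", err)
    }
}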