From 2736176c00b336cbb5a7f6e9a6570dcbc77536d2 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 10 Sep 2024 12:56:29 +0300 Subject: [PATCH] [#9999] writecache: Add background flushing objects limiter To limit memory usage by background flush. Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 3 + cmd/frostfs-node/config/engine/config_test.go | 2 + .../config/engine/shard/writecache/config.go | 18 ++++++ config/example/node.env | 1 + config/example/node.json | 3 +- config/example/node.yaml | 1 + docs/storage-node-configuration.md | 23 +++---- pkg/local_object_storage/writecache/cache.go | 4 ++ pkg/local_object_storage/writecache/flush.go | 44 ++++++++------ .../writecache/limiter.go | 60 +++++++++++++++++++ .../writecache/options.go | 9 +++ 11 files changed, 138 insertions(+), 30 deletions(-) create mode 100644 pkg/local_object_storage/writecache/limiter.go diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 16f49a082..ef2752872 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -154,6 +154,7 @@ type shardCfg struct { countLimit uint64 noSync bool pageSize int + flushSizeLimit uint64 } piloramaCfg struct { @@ -278,6 +279,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, wc.sizeLimit = writeCacheCfg.SizeLimit() wc.countLimit = writeCacheCfg.CountLimit() wc.noSync = writeCacheCfg.NoSync() + wc.flushSizeLimit = writeCacheCfg.MaxFlushingObjectsSize() } } @@ -865,6 +867,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option { writecache.WithMaxBatchSize(wcRead.maxBatchSize), writecache.WithMaxBatchDelay(wcRead.maxBatchDelay), writecache.WithPageSize(wcRead.pageSize), + writecache.WithFlushSizeLimit(wcRead.flushSizeLimit), writecache.WithMaxObjectSize(wcRead.maxObjSize), writecache.WithSmallObjectSize(wcRead.smallObjectSize), writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go index ef6bf7f74..b952aca4c 100644 --- a/cmd/frostfs-node/config/engine/config_test.go +++ b/cmd/frostfs-node/config/engine/config_test.go @@ -79,6 +79,7 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 3221225472, wc.SizeLimit()) require.EqualValues(t, 4096, wc.BoltDB().PageSize()) require.EqualValues(t, 49, wc.CountLimit()) + require.EqualValues(t, uint64(100), wc.MaxFlushingObjectsSize()) require.Equal(t, "tmp/0/meta", meta.Path()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) @@ -136,6 +137,7 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 4294967296, wc.SizeLimit()) require.EqualValues(t, 0, wc.BoltDB().PageSize()) require.EqualValues(t, writecacheconfig.CountLimitDefault, wc.CountLimit()) + require.EqualValues(t, writecacheconfig.MaxFlushingObjectsSizeDefault, wc.MaxFlushingObjectsSize()) require.Equal(t, "tmp/1/meta", meta.Path()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) diff --git a/cmd/frostfs-node/config/engine/shard/writecache/config.go b/cmd/frostfs-node/config/engine/shard/writecache/config.go index bfe8144df..5a069e99f 100644 --- a/cmd/frostfs-node/config/engine/shard/writecache/config.go +++ b/cmd/frostfs-node/config/engine/shard/writecache/config.go @@ -24,6 +24,8 @@ const ( // CountLimitDefault is a default write-cache count limit. CountLimitDefault = 0 + + MaxFlushingObjectsSizeDefault = 128 << 20 ) // From wraps config section into Config. @@ -145,3 +147,19 @@ func (x *Config) NoSync() bool { func (x *Config) BoltDB() *boltdbconfig.Config { return (*boltdbconfig.Config)(x) } + +// MaxFlushingObjectsSize returns the value of "max_flushing_objects_size" config parameter. +// +// Returns MaxFlushingObjectsSizeDefault if the value is not a positive number. +func (x *Config) MaxFlushingObjectsSize() uint64 { + s := config.SizeInBytesSafe( + (*config.Config)(x), + "max_flushing_objects_size", + ) + + if s > 0 { + return s + } + + return MaxFlushingObjectsSizeDefault +} diff --git a/config/example/node.env b/config/example/node.env index 82553745e..c3fa85c13 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -106,6 +106,7 @@ FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472 FROSTFS_STORAGE_SHARD_0_WRITECACHE_PAGE_SIZE=4096 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_COUNT=49 +FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_FLUSHING_OBJECTS_SIZE=100 ### Metabase config FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta FROSTFS_STORAGE_SHARD_0_METABASE_PERM=0644 diff --git a/config/example/node.json b/config/example/node.json index da108c692..d7187250b 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -149,7 +149,8 @@ "flush_worker_count": 30, "capacity": 3221225472, "page_size": 4096, - "max_object_count": 49 + "max_object_count": 49, + "max_flushing_objects_size": 100 }, "metabase": { "path": "tmp/0/meta", diff --git a/config/example/node.yaml b/config/example/node.yaml index a79f48226..776b22bd0 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -172,6 +172,7 @@ storage: capacity: 3221225472 # approximate write-cache total size, bytes max_object_count: 49 page_size: 4k + max_flushing_objects_size: 100b metabase: path: tmp/0/meta # metabase path diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 5bf35cd65..c83828978 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -293,17 +293,18 @@ writecache: page_size: '4k' ``` -| Parameter | Type | Default value | Description | -|----------------------|------------|---------------|-------------------------------------------------------------------------------------------------------------------------------| -| `path` | `string` | | Path to the metabase file. | -| `capacity` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | -| `max_object_count` | `int` | unrestricted | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. | -| `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system. | -| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. | -| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. | -| `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. | -| `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. | -| `page_size` | `size` | `0` | Page size overrides the default OS page size for small objects storage. Does not affect the existing storage. | +| Parameter | Type | Default value | Description | +| --------------------------- | ---------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------- | +| `path` | `string` | | Path to the metabase file. | +| `capacity` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | +| `max_object_count` | `int` | unrestricted | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. | +| `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system. | +| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. | +| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. | +| `max_flushing_objects_size` | `size` | `512M` | Max total size of background flushing objects. | +| `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. | +| `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. | +| `page_size` | `size` | `0` | Page size overrides the default OS page size for small objects storage. Does not affect the existing storage. | # `node` section diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index 29bcd4725..e2bc6e1d4 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -23,6 +23,8 @@ type cache struct { // flushCh is a channel with objects to flush. flushCh chan objectInfo + // flushLimiter limits memory consumption by background flush. + flushLimiter *flushLimiter // cancel is cancel function, protected by modeMtx in Close. cancel atomic.Value // wg is a wait group for flush workers. @@ -68,12 +70,14 @@ func New(opts ...Option) Cache { maxBatchSize: bbolt.DefaultMaxBatchSize, maxBatchDelay: bbolt.DefaultMaxBatchDelay, metrics: DefaultMetrics(), + flushSizeLimit: defaultFlushWorkersCount * defaultMaxObjectSize, }, } for i := range opts { opts[i](&c.options) } + c.flushLimiter = newFlushLimiter(c.flushSizeLimit) return c } diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index a06e6c2da..9feb3ae22 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -69,6 +69,9 @@ func (c *cache) pushToFlushQueue(ctx context.Context) { } err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error { + if err := c.flushLimiter.acquire(ctx, oi.DataSize); err != nil { + return err + } select { case c.flushCh <- objectInfo{ addr: oi.Address, @@ -97,30 +100,35 @@ func (c *cache) workerFlush(ctx context.Context) { for { select { case objInfo = <-c.flushCh: + c.flushFromWorker(ctx, objInfo) case <-ctx.Done(): return } - - res, err := c.fsTree.Get(ctx, common.GetPrm{ - Address: objInfo.addr, - }) - if err != nil { - if !errors.As(err, new(*apistatus.ObjectNotFound)) { - c.reportFlushError(logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err)) - } - continue - } - - err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree) - if err != nil { - // Error is handled in flushObject. - continue - } - - c.deleteFromDisk(ctx, objInfo.addr) } } +func (c *cache) flushFromWorker(ctx context.Context, objInfo objectInfo) { + defer c.flushLimiter.release(objInfo.size) + + res, err := c.fsTree.Get(ctx, common.GetPrm{ + Address: objInfo.addr, + }) + if err != nil { + if !errors.As(err, new(*apistatus.ObjectNotFound)) { + c.reportFlushError(logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err)) + } + return + } + + err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree) + if err != nil { + // Error is handled in flushObject. + return + } + + c.deleteFromDisk(ctx, objInfo.addr) +} + func (c *cache) reportFlushError(msg string, addr string, err error) { if c.reportError != nil { c.reportError(msg, err) diff --git a/pkg/local_object_storage/writecache/limiter.go b/pkg/local_object_storage/writecache/limiter.go new file mode 100644 index 000000000..cb44111f7 --- /dev/null +++ b/pkg/local_object_storage/writecache/limiter.go @@ -0,0 +1,60 @@ +package writecache + +import ( + "context" + "sync" +) + +type flushLimiter struct { + count, size uint64 + maxSize uint64 + cond *sync.Cond +} + +func newFlushLimiter(maxSize uint64) *flushLimiter { + return &flushLimiter{ + maxSize: maxSize, + cond: sync.NewCond(&sync.Mutex{}), + } +} + +func (l *flushLimiter) acquire(ctx context.Context, size uint64) error { + stopf := context.AfterFunc(ctx, func() { + l.cond.L.Lock() + defer l.cond.L.Unlock() + + l.cond.Broadcast() + }) + defer stopf() + + l.cond.L.Lock() + defer l.cond.L.Unlock() + + // it is allowed to overflow maxSize to allow flushing objects with size > maxSize + for l.count > 0 && l.size+size > l.maxSize { + l.cond.Wait() + if ctx.Err() != nil { + return ctx.Err() + } + } + l.count++ + l.size += size + return nil +} + +func (l *flushLimiter) release(size uint64) { + l.cond.L.Lock() + defer l.cond.L.Unlock() + + if l.size >= size { + l.size -= size + } else { + l.size = 0 + } + + if l.count > 0 { + l.count-- + } + + l.cond.Broadcast() +} diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 0643faac0..edbb3d422 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -44,6 +44,8 @@ type options struct { disableBackgroundFlush bool // pageSize is bbolt's page size config value pageSize int + // flushSizeLimit is total size of flushing objects. + flushSizeLimit uint64 } // WithLogger sets logger. @@ -169,3 +171,10 @@ func WithPageSize(s int) Option { o.pageSize = s } } + +// WithFlushSizeLimit sets flush size limit. +func WithFlushSizeLimit(v uint64) Option { + return func(o *options) { + o.flushSizeLimit = v + } +}