From e39378b1c36d0d00864c4f5e7fcab44975ce506d Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov <d.stepanov@yadro.com>
Date: Tue, 10 Sep 2024 12:56:29 +0300
Subject: [PATCH] [#1367] writecache: Add background flushing objects limiter

To limit memory usage by background flush.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
---
 cmd/frostfs-node/config.go                    |  3 +
 cmd/frostfs-node/config/engine/config_test.go |  2 +
 .../config/engine/shard/writecache/config.go  | 18 +++++
 config/example/node.env                       |  1 +
 config/example/node.json                      |  3 +-
 config/example/node.yaml                      |  1 +
 docs/storage-node-configuration.md            | 23 +++---
 pkg/local_object_storage/writecache/cache.go  |  1 +
 pkg/local_object_storage/writecache/flush.go  | 61 ++++++++++------
 .../writecache/limiter.go                     | 70 +++++++++++++++++++
 .../writecache/limiter_test.go                | 27 +++++++
 .../writecache/options.go                     |  9 +++
 12 files changed, 184 insertions(+), 35 deletions(-)
 create mode 100644 pkg/local_object_storage/writecache/limiter.go
 create mode 100644 pkg/local_object_storage/writecache/limiter_test.go

diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index 16f49a08..ef275287 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -154,6 +154,7 @@ type shardCfg struct {
 		countLimit       uint64
 		noSync           bool
 		pageSize         int
+		flushSizeLimit   uint64
 	}
 
 	piloramaCfg struct {
@@ -278,6 +279,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
 		wc.sizeLimit = writeCacheCfg.SizeLimit()
 		wc.countLimit = writeCacheCfg.CountLimit()
 		wc.noSync = writeCacheCfg.NoSync()
+		wc.flushSizeLimit = writeCacheCfg.MaxFlushingObjectsSize()
 	}
 }
 
@@ -865,6 +867,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
 			writecache.WithMaxBatchSize(wcRead.maxBatchSize),
 			writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
 			writecache.WithPageSize(wcRead.pageSize),
+			writecache.WithFlushSizeLimit(wcRead.flushSizeLimit),
 			writecache.WithMaxObjectSize(wcRead.maxObjSize),
 			writecache.WithSmallObjectSize(wcRead.smallObjectSize),
 			writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go
index ef6bf7f7..b952aca4 100644
--- a/cmd/frostfs-node/config/engine/config_test.go
+++ b/cmd/frostfs-node/config/engine/config_test.go
@@ -79,6 +79,7 @@ func TestEngineSection(t *testing.T) {
 				require.EqualValues(t, 3221225472, wc.SizeLimit())
 				require.EqualValues(t, 4096, wc.BoltDB().PageSize())
 				require.EqualValues(t, 49, wc.CountLimit())
+				require.EqualValues(t, uint64(100), wc.MaxFlushingObjectsSize())
 
 				require.Equal(t, "tmp/0/meta", meta.Path())
 				require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
@@ -136,6 +137,7 @@ func TestEngineSection(t *testing.T) {
 				require.EqualValues(t, 4294967296, wc.SizeLimit())
 				require.EqualValues(t, 0, wc.BoltDB().PageSize())
 				require.EqualValues(t, writecacheconfig.CountLimitDefault, wc.CountLimit())
+				require.EqualValues(t, writecacheconfig.MaxFlushingObjectsSizeDefault, wc.MaxFlushingObjectsSize())
 
 				require.Equal(t, "tmp/1/meta", meta.Path())
 				require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
diff --git a/cmd/frostfs-node/config/engine/shard/writecache/config.go b/cmd/frostfs-node/config/engine/shard/writecache/config.go
index bfe8144d..5a069e99 100644
--- a/cmd/frostfs-node/config/engine/shard/writecache/config.go
+++ b/cmd/frostfs-node/config/engine/shard/writecache/config.go
@@ -24,6 +24,8 @@ const (
 
 	// CountLimitDefault is a default write-cache count limit.
 	CountLimitDefault = 0
+
+	MaxFlushingObjectsSizeDefault = 128 << 20
 )
 
 // From wraps config section into Config.
@@ -145,3 +147,19 @@ func (x *Config) NoSync() bool {
 func (x *Config) BoltDB() *boltdbconfig.Config {
 	return (*boltdbconfig.Config)(x)
 }
+
+// MaxFlushingObjectsSize returns the value of "max_flushing_objects_size" config parameter.
+//
+// Returns MaxFlushingObjectsSizeDefault if the value is not a positive number.
+func (x *Config) MaxFlushingObjectsSize() uint64 {
+	s := config.SizeInBytesSafe(
+		(*config.Config)(x),
+		"max_flushing_objects_size",
+	)
+
+	if s > 0 {
+		return s
+	}
+
+	return MaxFlushingObjectsSizeDefault
+}
diff --git a/config/example/node.env b/config/example/node.env
index 82553745..c3fa85c1 100644
--- a/config/example/node.env
+++ b/config/example/node.env
@@ -106,6 +106,7 @@ FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_PAGE_SIZE=4096
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_COUNT=49
+FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_FLUSHING_OBJECTS_SIZE=100
 ### Metabase config
 FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta
 FROSTFS_STORAGE_SHARD_0_METABASE_PERM=0644
diff --git a/config/example/node.json b/config/example/node.json
index da108c69..d7187250 100644
--- a/config/example/node.json
+++ b/config/example/node.json
@@ -149,7 +149,8 @@
           "flush_worker_count": 30,
           "capacity": 3221225472,
           "page_size": 4096,
-          "max_object_count": 49
+          "max_object_count": 49,
+          "max_flushing_objects_size": 100
         },
         "metabase": {
           "path": "tmp/0/meta",
diff --git a/config/example/node.yaml b/config/example/node.yaml
index a79f4822..776b22bd 100644
--- a/config/example/node.yaml
+++ b/config/example/node.yaml
@@ -172,6 +172,7 @@ storage:
         capacity: 3221225472  # approximate write-cache total size, bytes
         max_object_count: 49
         page_size: 4k
+        max_flushing_objects_size: 100b
 
       metabase:
         path: tmp/0/meta  # metabase path
diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md
index 5bf35cd6..c8382897 100644
--- a/docs/storage-node-configuration.md
+++ b/docs/storage-node-configuration.md
@@ -293,17 +293,18 @@ writecache:
   page_size: '4k'
 ```
 
-| Parameter            | Type       | Default value | Description                                                                                                                   |
-|----------------------|------------|---------------|-------------------------------------------------------------------------------------------------------------------------------|
-| `path`               | `string`   |               | Path to the metabase file.                                                                                                    |
-| `capacity`           | `size`     | `1G`          | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly.          |
-| `max_object_count`   | `int`      | unrestricted  | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. |
-| `small_object_size`  | `size`     | `32K`         | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system.            |
-| `max_object_size`    | `size`     | `64M`         | Maximum object size allowed to be stored in the writecache.                                                                   |
-| `flush_worker_count` | `int`      | `20`          | Amount of background workers that move data from the writecache to the blobstor.                                              |
-| `max_batch_size`     | `int`      | `1000`        | Maximum amount of small object `PUT` operations to perform in a single transaction.                                           |
-| `max_batch_delay`    | `duration` | `10ms`        | Maximum delay before a batch starts.                                                                                          |
-| `page_size`          | `size`     |  `0`          | Page size overrides the default OS page size for small objects storage. Does not affect the existing storage.                 |
+| Parameter                   | Type       | Default value | Description                                                                                                                   |
+| --------------------------- | ---------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------- |
+| `path`                      | `string`   |               | Path to the metabase file.                                                                                                    |
+| `capacity`                  | `size`     | `1G`          | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly.          |
+| `max_object_count`          | `int`      | unrestricted  | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. |
+| `small_object_size`         | `size`     | `32K`         | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system.            |
+| `max_object_size`           | `size`     | `64M`         | Maximum object size allowed to be stored in the writecache.                                                                   |
+| `flush_worker_count`        | `int`      | `20`          | Amount of background workers that move data from the writecache to the blobstor.                                              |
+| `max_flushing_objects_size` | `size`     | `512M`        | Max total size of background flushing objects.                                                                                |
+| `max_batch_size`            | `int`      | `1000`        | Maximum amount of small object `PUT` operations to perform in a single transaction.                                           |
+| `max_batch_delay`           | `duration` | `10ms`        | Maximum delay before a batch starts.                                                                                          |
+| `page_size`                 | `size`     | `0`           | Page size overrides the default OS page size for small objects storage. Does not affect the existing storage.                 |
 
 
 # `node` section
diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go
index b298f812..f0f10d8b 100644
--- a/pkg/local_object_storage/writecache/cache.go
+++ b/pkg/local_object_storage/writecache/cache.go
@@ -68,6 +68,7 @@ func New(opts ...Option) Cache {
 			maxBatchSize:    bbolt.DefaultMaxBatchSize,
 			maxBatchDelay:   bbolt.DefaultMaxBatchDelay,
 			metrics:         DefaultMetrics(),
+			flushSizeLimit:  defaultFlushWorkersCount * defaultMaxObjectSize,
 		},
 	}
 
diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index d06896ed..5d5fc13a 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -18,7 +18,7 @@ import (
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
-	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.etcd.io/bbolt"
@@ -41,19 +41,25 @@ func (c *cache) runFlushLoop(ctx context.Context) {
 	if c.disableBackgroundFlush {
 		return
 	}
+	fl := newFlushLimiter(c.flushSizeLimit)
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
-		c.pushToFlushQueue(ctx)
+		c.pushToFlushQueue(ctx, fl)
 	}()
 
 	for range c.workersCount {
 		c.wg.Add(1)
-		go c.workerFlush(ctx)
+		go c.workerFlush(ctx, fl)
 	}
 }
 
-func (c *cache) pushToFlushQueue(ctx context.Context) {
+func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) {
+	stopf := context.AfterFunc(ctx, func() {
+		fl.close()
+	})
+	defer stopf()
+
 	tick := time.NewTicker(defaultFlushInterval)
 	for {
 		select {
@@ -65,6 +71,9 @@ func (c *cache) pushToFlushQueue(ctx context.Context) {
 			}
 
 			err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error {
+				if err := fl.acquire(oi.DataSize); err != nil {
+					return err
+				}
 				select {
 				case c.flushCh <- objectInfo{
 					addr: oi.Address,
@@ -72,6 +81,7 @@ func (c *cache) pushToFlushQueue(ctx context.Context) {
 				}:
 					return nil
 				case <-ctx.Done():
+					fl.release(oi.DataSize)
 					return ctx.Err()
 				}
 			})
@@ -86,37 +96,42 @@ func (c *cache) pushToFlushQueue(ctx context.Context) {
 	}
 }
 
-func (c *cache) workerFlush(ctx context.Context) {
+func (c *cache) workerFlush(ctx context.Context, fl *flushLimiter) {
 	defer c.wg.Done()
 
 	var objInfo objectInfo
 	for {
 		select {
 		case objInfo = <-c.flushCh:
+			c.flushIfAnObjectExistsWorker(ctx, objInfo, fl)
 		case <-ctx.Done():
 			return
 		}
-
-		res, err := c.fsTree.Get(ctx, common.GetPrm{
-			Address: objInfo.addr,
-		})
-		if err != nil {
-			if !errors.As(err, new(*apistatus.ObjectNotFound)) {
-				c.reportFlushError(logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err))
-			}
-			continue
-		}
-
-		err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree)
-		if err != nil {
-			// Error is handled in flushObject.
-			continue
-		}
-
-		c.deleteFromDisk(ctx, objInfo.addr)
 	}
 }
 
+func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectInfo, fl *flushLimiter) {
+	defer fl.release(objInfo.size)
+
+	res, err := c.fsTree.Get(ctx, common.GetPrm{
+		Address: objInfo.addr,
+	})
+	if err != nil {
+		if !client.IsErrObjectNotFound(err) {
+			c.reportFlushError(logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err))
+		}
+		return
+	}
+
+	err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree)
+	if err != nil {
+		// Error is handled in flushObject.
+		return
+	}
+
+	c.deleteFromDisk(ctx, objInfo.addr)
+}
+
 func (c *cache) reportFlushError(msg string, addr string, err error) {
 	if c.reportError != nil {
 		c.reportError(msg, err)
diff --git a/pkg/local_object_storage/writecache/limiter.go b/pkg/local_object_storage/writecache/limiter.go
new file mode 100644
index 00000000..ddc4101b
--- /dev/null
+++ b/pkg/local_object_storage/writecache/limiter.go
@@ -0,0 +1,70 @@
+package writecache
+
+import (
+	"errors"
+	"sync"
+)
+
+var errLimiterClosed = errors.New("acquire failed: limiter closed")
+
+// flushLimiter is used to limit the total size of objects
+// being flushed to blobstore at the same time. This is a necessary
+// limitation so that the flushing process does not have
+// a strong impact on user requests.
+type flushLimiter struct {
+	count, size uint64
+	maxSize     uint64
+	cond        *sync.Cond
+	closed      bool
+}
+
+func newFlushLimiter(maxSize uint64) *flushLimiter {
+	return &flushLimiter{
+		maxSize: maxSize,
+		cond:    sync.NewCond(&sync.Mutex{}),
+	}
+}
+
+func (l *flushLimiter) acquire(size uint64) error {
+	l.cond.L.Lock()
+	defer l.cond.L.Unlock()
+
+	// it is allowed to overflow maxSize to allow flushing objects with size > maxSize
+	for l.count > 0 && l.size+size > l.maxSize && !l.closed {
+		l.cond.Wait()
+		if l.closed {
+			return errLimiterClosed
+		}
+	}
+	l.count++
+	l.size += size
+	return nil
+}
+
+func (l *flushLimiter) release(size uint64) {
+	l.cond.L.Lock()
+	defer l.cond.L.Unlock()
+
+	if l.size >= size {
+		l.size -= size
+	} else {
+		panic("flushLimiter: invalid size")
+	}
+
+	if l.count > 0 {
+		l.count--
+	} else {
+		panic("flushLimiter: invalid count")
+	}
+
+	l.cond.Broadcast()
+}
+
+func (l *flushLimiter) close() {
+	l.cond.L.Lock()
+	defer l.cond.L.Unlock()
+
+	l.closed = true
+
+	l.cond.Broadcast()
+}
diff --git a/pkg/local_object_storage/writecache/limiter_test.go b/pkg/local_object_storage/writecache/limiter_test.go
new file mode 100644
index 00000000..db99b203
--- /dev/null
+++ b/pkg/local_object_storage/writecache/limiter_test.go
@@ -0,0 +1,27 @@
+package writecache
+
+import (
+	"sync/atomic"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+)
+
+func TestLimiter(t *testing.T) {
+	var maxSize uint64 = 10
+	var single uint64 = 3
+	l := newFlushLimiter(uint64(maxSize))
+	var currSize atomic.Int64
+	var eg errgroup.Group
+	for i := 0; i < 10_000; i++ {
+		eg.Go(func() error {
+			defer l.release(single)
+			defer currSize.Add(-1)
+			l.acquire(single)
+			require.True(t, currSize.Add(1) <= 3)
+			return nil
+		})
+	}
+	require.NoError(t, eg.Wait())
+}
diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go
index 0643faac..edbb3d42 100644
--- a/pkg/local_object_storage/writecache/options.go
+++ b/pkg/local_object_storage/writecache/options.go
@@ -44,6 +44,8 @@ type options struct {
 	disableBackgroundFlush bool
 	// pageSize is bbolt's page size config value
 	pageSize int
+	// flushSizeLimit is total size of flushing objects.
+	flushSizeLimit uint64
 }
 
 // WithLogger sets logger.
@@ -169,3 +171,10 @@ func WithPageSize(s int) Option {
 		o.pageSize = s
 	}
 }
+
+// WithFlushSizeLimit sets flush size limit.
+func WithFlushSizeLimit(v uint64) Option {
+	return func(o *options) {
+		o.flushSizeLimit = v
+	}
+}