diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index 16f49a082..ef2752872 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -154,6 +154,7 @@ type shardCfg struct {
 			countLimit     uint64
 			noSync         bool
 			pageSize       int
+			flushSizeLimit uint64
 		}
 
 		piloramaCfg struct {
@@ -278,6 +279,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
 		wc.sizeLimit = writeCacheCfg.SizeLimit()
 		wc.countLimit = writeCacheCfg.CountLimit()
 		wc.noSync = writeCacheCfg.NoSync()
+		wc.flushSizeLimit = writeCacheCfg.MaxFlushingObjectsSize()
 	}
 }
 
@@ -865,6 +867,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
 			writecache.WithMaxBatchSize(wcRead.maxBatchSize),
 			writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
 			writecache.WithPageSize(wcRead.pageSize),
+			writecache.WithFlushSizeLimit(wcRead.flushSizeLimit),
 			writecache.WithMaxObjectSize(wcRead.maxObjSize),
 			writecache.WithSmallObjectSize(wcRead.smallObjectSize),
 			writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go
index ef6bf7f74..b952aca4c 100644
--- a/cmd/frostfs-node/config/engine/config_test.go
+++ b/cmd/frostfs-node/config/engine/config_test.go
@@ -79,6 +79,7 @@ func TestEngineSection(t *testing.T) {
 				require.EqualValues(t, 3221225472, wc.SizeLimit())
 				require.EqualValues(t, 4096, wc.BoltDB().PageSize())
 				require.EqualValues(t, 49, wc.CountLimit())
+				require.EqualValues(t, uint64(100), wc.MaxFlushingObjectsSize())
 
 				require.Equal(t, "tmp/0/meta", meta.Path())
 				require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
@@ -136,6 +137,7 @@ func TestEngineSection(t *testing.T) {
 				require.EqualValues(t, 4294967296, wc.SizeLimit())
 				require.EqualValues(t, 0, wc.BoltDB().PageSize())
 				require.EqualValues(t, writecacheconfig.CountLimitDefault, wc.CountLimit())
+				require.EqualValues(t, writecacheconfig.MaxFlushingObjectsSizeDefault, wc.MaxFlushingObjectsSize())
 
 				require.Equal(t, "tmp/1/meta", meta.Path())
 				require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
diff --git a/cmd/frostfs-node/config/engine/shard/writecache/config.go b/cmd/frostfs-node/config/engine/shard/writecache/config.go
index bfe8144df..5a069e99f 100644
--- a/cmd/frostfs-node/config/engine/shard/writecache/config.go
+++ b/cmd/frostfs-node/config/engine/shard/writecache/config.go
@@ -24,6 +24,9 @@ const (
 
 	// CountLimitDefault is a default write-cache count limit.
 	CountLimitDefault = 0
+
+	// MaxFlushingObjectsSizeDefault is a default total size of objects flushed in parallel.
+	MaxFlushingObjectsSizeDefault = 512 << 20
 )
 
 // From wraps config section into Config.
@@ -145,3 +148,19 @@ func (x *Config) NoSync() bool {
 func (x *Config) BoltDB() *boltdbconfig.Config {
 	return (*boltdbconfig.Config)(x)
 }
+
+// MaxFlushingObjectsSize returns the value of "max_flushing_objects_size" config parameter.
+//
+// Returns MaxFlushingObjectsSizeDefault if the value is not a positive number.
+func (x *Config) MaxFlushingObjectsSize() uint64 {
+	s := config.SizeInBytesSafe(
+		(*config.Config)(x),
+		"max_flushing_objects_size",
+	)
+
+	if s > 0 {
+		return s
+	}
+
+	return MaxFlushingObjectsSizeDefault
+}
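A note for reviewers on the getter above: `config.SizeInBytesSafe` yields 0 when the key is absent or malformed, so the `s > 0` check is what routes missing values to the default (512 MiB, matching the documented `512M`). A standalone sketch of just that fallback logic; the helper and values here are illustrative, not the node's actual config plumbing:

```go
package main

import "fmt"

// maxFlushingObjectsSize mirrors the fallback in MaxFlushingObjectsSize:
// any non-positive parsed value falls back to the default of 512 MiB.
func maxFlushingObjectsSize(parsed uint64) uint64 {
	const maxFlushingObjectsSizeDefault = 512 << 20
	if parsed > 0 {
		return parsed
	}
	return maxFlushingObjectsSizeDefault
}

func main() {
	fmt.Println(maxFlushingObjectsSize(100)) // "100b" from the example config -> 100
	fmt.Println(maxFlushingObjectsSize(0))   // key missing -> 536870912 (512 MiB)
}
```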
diff --git a/config/example/node.env b/config/example/node.env
index 82553745e..c3fa85c13 100644
--- a/config/example/node.env
+++ b/config/example/node.env
@@ -106,6 +106,7 @@ FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_PAGE_SIZE=4096
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_COUNT=49
+FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_FLUSHING_OBJECTS_SIZE=100
 ### Metabase config
 FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta
 FROSTFS_STORAGE_SHARD_0_METABASE_PERM=0644
diff --git a/config/example/node.json b/config/example/node.json
index da108c692..d7187250b 100644
--- a/config/example/node.json
+++ b/config/example/node.json
@@ -149,7 +149,8 @@
         "flush_worker_count": 30,
         "capacity": 3221225472,
         "page_size": 4096,
-        "max_object_count": 49
+        "max_object_count": 49,
+        "max_flushing_objects_size": 100
       },
       "metabase": {
         "path": "tmp/0/meta",
diff --git a/config/example/node.yaml b/config/example/node.yaml
index a79f48226..776b22bd0 100644
--- a/config/example/node.yaml
+++ b/config/example/node.yaml
@@ -172,6 +172,7 @@ storage:
       capacity: 3221225472 # approximate write-cache total size, bytes
       max_object_count: 49
       page_size: 4k
+      max_flushing_objects_size: 100b
 
     metabase:
       path: tmp/0/meta # metabase path
diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md
index 5bf35cd65..c83828978 100644
--- a/docs/storage-node-configuration.md
+++ b/docs/storage-node-configuration.md
@@ -293,17 +293,18 @@ writecache:
   page_size: '4k'
 ```
 
-| Parameter            | Type       | Default value | Description                                                                                                                    |
-|----------------------|------------|---------------|--------------------------------------------------------------------------------------------------------------------------------|
-| `path`               | `string`   |               | Path to the metabase file.                                                                                                     |
-| `capacity`           | `size`     | `1G`          | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly.          |
-| `max_object_count`   | `int`      | unrestricted  | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. |
-| `small_object_size`  | `size`     | `32K`         | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system.            |
-| `max_object_size`    | `size`     | `64M`         | Maximum object size allowed to be stored in the writecache.                                                                    |
-| `flush_worker_count` | `int`      | `20`          | Amount of background workers that move data from the writecache to the blobstor.                                              |
-| `max_batch_size`     | `int`      | `1000`        | Maximum amount of small object `PUT` operations to perform in a single transaction.                                           |
-| `max_batch_delay`    | `duration` | `10ms`        | Maximum delay before a batch starts.                                                                                          |
-| `page_size`          | `size`     | `0`           | Page size overrides the default OS page size for small objects storage. Does not affect the existing storage.                 |
+| Parameter                   | Type       | Default value | Description                                                                                                                    |
+| --------------------------- | ---------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------ |
+| `path`                      | `string`   |               | Path to the writecache directory.                                                                                              |
+| `capacity`                  | `size`     | `1G`          | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly.          |
+| `max_object_count`          | `int`      | unrestricted  | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. |
+| `small_object_size`         | `size`     | `32K`         | Maximum object size for "small" objects. These objects are stored in a key-value database instead of a file-system.           |
+| `max_object_size`           | `size`     | `64M`         | Maximum object size allowed to be stored in the writecache.                                                                    |
+| `flush_worker_count`        | `int`      | `20`          | Amount of background workers that move data from the writecache to the blobstor.                                              |
+| `max_flushing_objects_size` | `size`     | `512M`        | Maximum total size of objects being flushed to the blobstor at the same time.                                                 |
+| `max_batch_size`            | `int`      | `1000`        | Maximum amount of small object `PUT` operations to perform in a single transaction.                                           |
+| `max_batch_delay`           | `duration` | `10ms`        | Maximum delay before a batch starts.                                                                                          |
+| `page_size`                 | `size`     | `0`           | Page size overrides the default OS page size for small objects storage. Does not affect the existing storage.                 |
 
 # `node` section
 
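The `size` values above accept the usual suffixes, which is why `100b` in `node.yaml` and the bare `100` in `node.json`/`node.env` configure the same 100-byte limit asserted in the config test. A rough, hypothetical stand-in for that suffix handling (not the node's actual parser), just to make the mapping concrete:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSize converts a suffixed size literal into bytes. Hypothetical
// illustration only: it handles exactly the single-letter suffixes used
// in the example configs above.
func parseSize(s string) (uint64, error) {
	multipliers := map[string]uint64{
		"b": 1, "k": 1 << 10, "m": 1 << 20, "g": 1 << 30,
	}
	s = strings.ToLower(strings.TrimSpace(s))
	for suffix, mul := range multipliers {
		if strings.HasSuffix(s, suffix) {
			n, err := strconv.ParseUint(strings.TrimSuffix(s, suffix), 10, 64)
			if err != nil {
				return 0, err
			}
			return n * mul, nil
		}
	}
	return strconv.ParseUint(s, 10, 64) // no suffix: plain bytes
}

func main() {
	for _, v := range []string{"100b", "4k", "512m", "100"} {
		n, _ := parseSize(v)
		fmt.Printf("%s -> %d bytes\n", v, n)
	}
}
```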
diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go
index b298f812a..f0f10d8b5 100644
--- a/pkg/local_object_storage/writecache/cache.go
+++ b/pkg/local_object_storage/writecache/cache.go
@@ -68,6 +68,7 @@ func New(opts ...Option) Cache {
 			maxBatchSize:   bbolt.DefaultMaxBatchSize,
 			maxBatchDelay:  bbolt.DefaultMaxBatchDelay,
 			metrics:        DefaultMetrics(),
+			flushSizeLimit: defaultFlushWorkersCount * defaultMaxObjectSize,
 		},
 	}
 
diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index d06896ed5..5d5fc13ab 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -18,7 +18,7 @@ import (
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
-	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.etcd.io/bbolt"
@@ -41,19 +41,25 @@ func (c *cache) runFlushLoop(ctx context.Context) {
 	if c.disableBackgroundFlush {
 		return
 	}
+	fl := newFlushLimiter(c.flushSizeLimit)
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
-		c.pushToFlushQueue(ctx)
+		c.pushToFlushQueue(ctx, fl)
 	}()
 
 	for range c.workersCount {
 		c.wg.Add(1)
-		go c.workerFlush(ctx)
+		go c.workerFlush(ctx, fl)
 	}
 }
 
-func (c *cache) pushToFlushQueue(ctx context.Context) {
+func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) {
+	stopf := context.AfterFunc(ctx, func() {
+		fl.close()
+	})
+	defer stopf()
+
 	tick := time.NewTicker(defaultFlushInterval)
 	for {
 		select {
@@ -65,6 +71,9 @@
 			}
 
 			err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error {
+				if err := fl.acquire(oi.DataSize); err != nil {
+					return err
+				}
 				select {
 				case c.flushCh <- objectInfo{
 					addr: oi.Address,
@@ -72,6 +81,7 @@
 				}:
 					return nil
 				case <-ctx.Done():
+					fl.release(oi.DataSize)
 					return ctx.Err()
 				}
 			})
@@ -86,37 +96,42 @@
 	}
 }
 
-func (c *cache) workerFlush(ctx context.Context) {
+func (c *cache) workerFlush(ctx context.Context, fl *flushLimiter) {
 	defer c.wg.Done()
 
 	var objInfo objectInfo
 	for {
 		select {
 		case objInfo = <-c.flushCh:
+			c.flushIfAnObjectExistsWorker(ctx, objInfo, fl)
 		case <-ctx.Done():
 			return
 		}
-
-		res, err := c.fsTree.Get(ctx, common.GetPrm{
-			Address: objInfo.addr,
-		})
-		if err != nil {
-			if !errors.As(err, new(*apistatus.ObjectNotFound)) {
-				c.reportFlushError(logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err))
-			}
-			continue
-		}
-
-		err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree)
-		if err != nil {
-			// Error is handled in flushObject.
-			continue
-		}
-
-		c.deleteFromDisk(ctx, objInfo.addr)
 	}
 }
 
+func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectInfo, fl *flushLimiter) {
+	defer fl.release(objInfo.size)
+
+	res, err := c.fsTree.Get(ctx, common.GetPrm{
+		Address: objInfo.addr,
+	})
+	if err != nil {
+		if !client.IsErrObjectNotFound(err) {
+			c.reportFlushError(logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err))
+		}
+		return
+	}
+
+	err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree)
+	if err != nil {
+		// Error is handled in flushObject.
+		return
+	}
+
+	c.deleteFromDisk(ctx, objInfo.addr)
+}
+
 func (c *cache) reportFlushError(msg string, addr string, err error) {
 	if c.reportError != nil {
 		c.reportError(msg, err)
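The control flow above is easier to see in isolation: the producer (`pushToFlushQueue`) acquires capacity before an object enters the channel, and whichever worker picks it up releases that capacity once the flush attempt finishes, so the reservation travels with the object. A minimal model of this handoff, using `golang.org/x/sync/semaphore` as a stand-in for `flushLimiter` (the real limiter additionally admits a single oversized object and supports `close()`):

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

func main() {
	const limit = 8 // cap on total bytes in flight, like flushSizeLimit
	sem := semaphore.NewWeighted(limit)
	ch := make(chan int64)
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // worker: plays the role of workerFlush
		defer wg.Done()
		for size := range ch {
			fmt.Println("flushing", size, "bytes")
			sem.Release(size) // like fl.release(objInfo.size) after the flush
		}
	}()

	for _, size := range []int64{3, 3, 3, 3} { // producer: plays pushToFlushQueue
		_ = sem.Acquire(context.Background(), size) // like fl.acquire(oi.DataSize)
		ch <- size
	}
	close(ch)
	wg.Wait()
}
```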
diff --git a/pkg/local_object_storage/writecache/limiter.go b/pkg/local_object_storage/writecache/limiter.go
new file mode 100644
index 000000000..ddc4101be
--- /dev/null
+++ b/pkg/local_object_storage/writecache/limiter.go
@@ -0,0 +1,70 @@
+package writecache
+
+import (
+	"errors"
+	"sync"
+)
+
+var errLimiterClosed = errors.New("acquire failed: limiter closed")
+
+// flushLimiter is used to limit the total size of objects
+// being flushed to the blobstor at the same time.
+// This keeps background flushing from having
+// a strong impact on user requests.
+type flushLimiter struct {
+	count, size uint64
+	maxSize     uint64
+	cond        *sync.Cond
+	closed      bool
+}
+
+func newFlushLimiter(maxSize uint64) *flushLimiter {
+	return &flushLimiter{
+		maxSize: maxSize,
+		cond:    sync.NewCond(&sync.Mutex{}),
+	}
+}
+
+func (l *flushLimiter) acquire(size uint64) error {
+	l.cond.L.Lock()
+	defer l.cond.L.Unlock()
+
+	// it is allowed to overflow maxSize to allow flushing objects with size > maxSize
+	for l.count > 0 && l.size+size > l.maxSize && !l.closed {
+		l.cond.Wait()
+		if l.closed {
+			return errLimiterClosed
+		}
+	}
+	l.count++
+	l.size += size
+	return nil
+}
+
+func (l *flushLimiter) release(size uint64) {
+	l.cond.L.Lock()
+	defer l.cond.L.Unlock()
+
+	if l.size >= size {
+		l.size -= size
+	} else {
+		panic("flushLimiter: invalid size")
+	}
+
+	if l.count > 0 {
+		l.count--
+	} else {
+		panic("flushLimiter: invalid count")
+	}
+
+	l.cond.Broadcast()
+}
+
+func (l *flushLimiter) close() {
+	l.cond.L.Lock()
+	defer l.cond.L.Unlock()
+
+	l.closed = true
+
+	l.cond.Broadcast()
+}
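One subtlety in `acquire` deserves a callout: the `l.count > 0` clause deliberately lets a request larger than `maxSize` through when nothing else is in flight, so a single huge object cannot stall the flush loop forever. A testify-style sketch of that property (assumes it sits next to `limiter.go` in this package; it is illustration, not part of the change):

```go
package writecache

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestLimiterOversizedObject documents the intended overflow behavior:
// when nothing is in flight (count == 0), an acquire larger than maxSize
// succeeds instead of blocking forever, so oversized objects still flush.
func TestLimiterOversizedObject(t *testing.T) {
	l := newFlushLimiter(10)
	require.NoError(t, l.acquire(25)) // 25 > 10, but count == 0, so it passes
	l.release(25)

	require.NoError(t, l.acquire(3)) // now 3 bytes are in flight
	done := make(chan struct{})
	go func() {
		defer close(done)
		_ = l.acquire(25) // blocks until the 3 bytes are released
	}()
	l.release(3)
	<-done
}
```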
diff --git a/pkg/local_object_storage/writecache/limiter_test.go b/pkg/local_object_storage/writecache/limiter_test.go
new file mode 100644
index 000000000..db99b203a
--- /dev/null
+++ b/pkg/local_object_storage/writecache/limiter_test.go
@@ -0,0 +1,29 @@
+package writecache
+
+import (
+	"sync/atomic"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+)
+
+func TestLimiter(t *testing.T) {
+	var maxSize uint64 = 10
+	var single uint64 = 3
+	l := newFlushLimiter(maxSize)
+	var currSize atomic.Int64
+	var eg errgroup.Group
+	for range 10_000 {
+		eg.Go(func() error {
+			if err := l.acquire(single); err != nil {
+				return err
+			}
+			defer l.release(single)
+			defer currSize.Add(-1)
+			require.True(t, currSize.Add(1) <= 3)
+			return nil
+		})
+	}
+	require.NoError(t, eg.Wait())
+}
diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go
index 0643faac0..edbb3d422 100644
--- a/pkg/local_object_storage/writecache/options.go
+++ b/pkg/local_object_storage/writecache/options.go
@@ -44,6 +44,8 @@ type options struct {
 	disableBackgroundFlush bool
 	// pageSize is bbolt's page size config value
 	pageSize int
+	// flushSizeLimit is the maximum total size of objects being flushed at a time.
+	flushSizeLimit uint64
 }
 
 // WithLogger sets logger.
@@ -169,3 +171,10 @@ func WithPageSize(s int) Option {
 		o.pageSize = s
 	}
 }
+
+// WithFlushSizeLimit sets the limit on the total size of objects being flushed simultaneously.
+func WithFlushSizeLimit(v uint64) Option {
+	return func(o *options) {
+		o.flushSizeLimit = v
+	}
+}
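For anyone consuming the package as a library, the new option composes like the existing ones. A hedged sketch (every option name below appears in this diff; the literal values are illustrative, and a real node builds this list in `getWriteCacheOpts` from the parsed shard configuration):

```go
package main

import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"

func main() {
	// Illustrative wiring only.
	_ = writecache.New(
		writecache.WithFlushWorkersCount(20),
		writecache.WithMaxObjectSize(64<<20),   // 64 MiB per object
		writecache.WithFlushSizeLimit(512<<20), // at most 512 MiB flushing at once
	)
}
```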