[#428] engine: Add low_mem config parameter

Concurrent initialization in case of the metabase resync leads to
high memory consumption and potential OOM.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-06-22 10:46:56 +03:00
parent 40eae22109
commit 2800bf9ce9
7 changed files with 44 additions and 10 deletions

View file

@ -98,6 +98,7 @@ type applicationConfiguration struct {
errorThreshold uint32
shardPoolSize uint32
shards []shardCfg
lowMem bool
}
}
@ -200,6 +201,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
}
@ -675,8 +677,8 @@ func (c *cfg) engineOpts() []engine.Option {
opts = append(opts,
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
)
if c.metricsCollector != nil {

View file

@ -83,3 +83,8 @@ func ShardPoolSize(c *config.Config) uint32 {
func ShardErrorThreshold(c *config.Config) uint32 {
return config.Uint32Safe(c.Sub(subsection), "shard_ro_error_threshold")
}
// EngineLowMemoryConsumption returns value of "lowmem" config parmeter from "storage" section.
func EngineLowMemoryConsumption(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "low_mem")
}

View file

@ -167,6 +167,7 @@ Local storage engine configuration.
|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. |
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
## `shard` subsection

View file

@ -12,6 +12,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
type shardInitError struct {
@ -74,23 +75,30 @@ func (e *StorageEngine) Init(ctx context.Context) error {
e.mtx.Lock()
defer e.mtx.Unlock()
var wg sync.WaitGroup
var errCh = make(chan shardInitError, len(e.shards))
var eg errgroup.Group
if e.cfg.lowMem && e.anyShardRequiresRefill() {
eg.SetLimit(1)
}
for id, sh := range e.shards {
wg.Add(1)
go func(id string, sh *shard.Shard) {
defer wg.Done()
id := id
sh := sh
eg.Go(func() error {
if err := sh.Init(ctx); err != nil {
errCh <- shardInitError{
err: err,
id: id,
}
}
}(id, sh.Shard)
return nil
})
}
wg.Wait()
err := eg.Wait()
close(errCh)
if err != nil {
return fmt.Errorf("failed to initialize shards: %w", err)
}
for res := range errCh {
if res.err != nil {
@ -125,6 +133,15 @@ func (e *StorageEngine) Init(ctx context.Context) error {
return nil
}
func (e *StorageEngine) anyShardRequiresRefill() bool {
for _, sh := range e.shards {
if sh.NeedRefillMetabase() {
return true
}
}
return false
}
var errClosed = errors.New("storage engine is closed")
// Close releases all StorageEngine's components. Waits for all data-related operations to complete.

View file

@ -210,6 +210,8 @@ type cfg struct {
metrics MetricRegister
shardPoolSize uint32
lowMem bool
}
func defaultCfg() *cfg {
@ -265,3 +267,10 @@ func WithErrorThreshold(sz uint32) Option {
c.errorsThreshold = sz
}
}
// WithLowMemoryConsumption returns an option to set the flag to reduce memory consumption by reducing performance.
func WithLowMemoryConsumption(lowMemCons bool) Option {
return func(c *cfg) {
c.lowMem = lowMemCons
}
}

View file

@ -98,7 +98,7 @@ func (s *Shard) Init(ctx context.Context) error {
if !s.GetMode().NoMetabase() {
var initMetabase initializer
if s.needRefillMetabase() {
if s.NeedRefillMetabase() {
initMetabase = (*metabaseSynchronizer)(s)
} else {
initMetabase = s.metaBase

View file

@ -231,8 +231,8 @@ func (s *Shard) hasWriteCache() bool {
return s.cfg.useWriteCache
}
// needRefillMetabase returns true if metabase is needed to be refilled.
func (s *Shard) needRefillMetabase() bool {
// NeedRefillMetabase returns true if metabase is needed to be refilled.
func (s *Shard) NeedRefillMetabase() bool {
return s.cfg.refillMetabase
}