Add storage.low_mem config parameter #461
8 changed files with 45 additions and 11 deletions
|
@ -98,6 +98,7 @@ type applicationConfiguration struct {
|
||||||
errorThreshold uint32
|
errorThreshold uint32
|
||||||
shardPoolSize uint32
|
shardPoolSize uint32
|
||||||
shards []shardCfg
|
shards []shardCfg
|
||||||
|
lowMem bool
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,6 +201,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
||||||
|
|
||||||
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
||||||
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
||||||
|
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
|
||||||
|
|
||||||
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
||||||
}
|
}
|
||||||
|
@ -675,8 +677,8 @@ func (c *cfg) engineOpts() []engine.Option {
|
||||||
opts = append(opts,
|
opts = append(opts,
|
||||||
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
|
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
|
||||||
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
||||||
|
|
||||||
engine.WithLogger(c.log),
|
engine.WithLogger(c.log),
|
||||||
|
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
|
||||||
)
|
)
|
||||||
|
|
||||||
if c.metricsCollector != nil {
|
if c.metricsCollector != nil {
|
||||||
|
|
|
@ -83,3 +83,8 @@ func ShardPoolSize(c *config.Config) uint32 {
|
||||||
func ShardErrorThreshold(c *config.Config) uint32 {
|
func ShardErrorThreshold(c *config.Config) uint32 {
|
||||||
return config.Uint32Safe(c.Sub(subsection), "shard_ro_error_threshold")
|
return config.Uint32Safe(c.Sub(subsection), "shard_ro_error_threshold")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EngineLowMemoryConsumption returns value of "lowmem" config parmeter from "storage" section.
|
||||||
|
func EngineLowMemoryConsumption(c *config.Config) bool {
|
||||||
|
return config.BoolSafe(c.Sub(subsection), "low_mem")
|
||||||
|
}
|
||||||
|
|
|
@ -167,6 +167,7 @@ Local storage engine configuration.
|
||||||
|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
|
|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
|
||||||
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
|
||||||
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
|
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
|
||||||
|
| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. |
|
||||||
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
|
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
|
||||||
|
|
||||||
## `shard` subsection
|
## `shard` subsection
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
type shardInitError struct {
|
type shardInitError struct {
|
||||||
|
@ -74,23 +75,30 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
||||||
e.mtx.Lock()
|
e.mtx.Lock()
|
||||||
defer e.mtx.Unlock()
|
defer e.mtx.Unlock()
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
var errCh = make(chan shardInitError, len(e.shards))
|
var errCh = make(chan shardInitError, len(e.shards))
|
||||||
|
var eg errgroup.Group
|
||||||
|
|||||||
|
if e.cfg.lowMem && e.anyShardRequiresRefill() {
|
||||||
|
eg.SetLimit(1)
|
||||||
|
}
|
||||||
|
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why don't we just use Why don't we just use `errgroup`?
dstepanov-yadro
commented
fixed fixed
|
|||||||
for id, sh := range e.shards {
|
for id, sh := range e.shards {
|
||||||
wg.Add(1)
|
id := id
|
||||||
go func(id string, sh *shard.Shard) {
|
sh := sh
|
||||||
defer wg.Done()
|
eg.Go(func() error {
|
||||||
if err := sh.Init(ctx); err != nil {
|
if err := sh.Init(ctx); err != nil {
|
||||||
errCh <- shardInitError{
|
errCh <- shardInitError{
|
||||||
err: err,
|
err: err,
|
||||||
id: id,
|
id: id,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(id, sh.Shard)
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
wg.Wait()
|
err := eg.Wait()
|
||||||
close(errCh)
|
close(errCh)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to initialize shards: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
for res := range errCh {
|
for res := range errCh {
|
||||||
if res.err != nil {
|
if res.err != nil {
|
||||||
|
@ -125,6 +133,15 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *StorageEngine) anyShardRequiresRefill() bool {
|
||||||
|
for _, sh := range e.shards {
|
||||||
|
if sh.NeedRefillMetabase() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
var errClosed = errors.New("storage engine is closed")
|
var errClosed = errors.New("storage engine is closed")
|
||||||
|
|
||||||
// Close releases all StorageEngine's components. Waits for all data-related operations to complete.
|
// Close releases all StorageEngine's components. Waits for all data-related operations to complete.
|
||||||
|
|
|
@ -210,6 +210,8 @@ type cfg struct {
|
||||||
metrics MetricRegister
|
metrics MetricRegister
|
||||||
|
|
||||||
shardPoolSize uint32
|
shardPoolSize uint32
|
||||||
|
|
||||||
|
lowMem bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
|
@ -265,3 +267,10 @@ func WithErrorThreshold(sz uint32) Option {
|
||||||
c.errorsThreshold = sz
|
c.errorsThreshold = sz
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithLowMemoryConsumption returns an option to set the flag to reduce memory consumption by reducing performance.
|
||||||
|
func WithLowMemoryConsumption(lowMemCons bool) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.lowMem = lowMemCons
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
_, err = te.ng.TreeAddByPath(context.Background(), d, treeID, pilorama.AttributeFilename, nil,
|
_, err = te.ng.TreeAddByPath(context.Background(), d, treeID, pilorama.AttributeFilename, nil,
|
||||||
[]pilorama.KeyValue{{pilorama.AttributeFilename, []byte(strconv.Itoa(i))}})
|
[]pilorama.KeyValue{{Key: pilorama.AttributeFilename, Value: []byte(strconv.Itoa(i))}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,7 +98,7 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
if !s.GetMode().NoMetabase() {
|
if !s.GetMode().NoMetabase() {
|
||||||
var initMetabase initializer
|
var initMetabase initializer
|
||||||
|
|
||||||
if s.needRefillMetabase() {
|
if s.NeedRefillMetabase() {
|
||||||
initMetabase = (*metabaseSynchronizer)(s)
|
initMetabase = (*metabaseSynchronizer)(s)
|
||||||
} else {
|
} else {
|
||||||
initMetabase = s.metaBase
|
initMetabase = s.metaBase
|
||||||
|
|
|
@ -231,8 +231,8 @@ func (s *Shard) hasWriteCache() bool {
|
||||||
return s.cfg.useWriteCache
|
return s.cfg.useWriteCache
|
||||||
}
|
}
|
||||||
|
|
||||||
// needRefillMetabase returns true if metabase is needed to be refilled.
|
// NeedRefillMetabase returns true if metabase is needed to be refilled.
|
||||||
func (s *Shard) needRefillMetabase() bool {
|
func (s *Shard) NeedRefillMetabase() bool {
|
||||||
return s.cfg.refillMetabase
|
return s.cfg.refillMetabase
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue
why ignore the derived context? shouldn't it be used by
sh.Init(ctx)
below?eg.Go
doesn't return error, so there is no need to use derived ctx. Derived ctx needed in case of some goroutine returns error, then derived ctx will be cancelled.well, but that was my point, to stop further initialization if one of them failed already.
ok, fixed
Well, actually, no, this is a feature -- if ONE shard cannot be initialized, we just remove it and continue initialization for others. Adding context doesn't change it actually, because we sent an error to the channel.
However I am a bit concerned now:
From https://pkg.go.dev/golang.org/x/sync/errgroup#WithContext (cursive is mine):
And this context is passed to GC, have you checked it doesn't lead to problems?
May be indeed #461 (comment) was a mistake.
if that's the case then don't use
WithContext
in the first place? i.e. simplyvar eg errgroup.Group
. Otherwise it's kinda misleading.Thank you! Replace with var