forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
7 changed files with 44 additions and 10 deletions
|
@ -98,6 +98,7 @@ type applicationConfiguration struct {
|
||||||
errorThreshold uint32
|
errorThreshold uint32
|
||||||
shardPoolSize uint32
|
shardPoolSize uint32
|
||||||
shards []shardCfg
|
shards []shardCfg
|
||||||
|
lowMem bool
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,6 +201,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
||||||
|
|
||||||
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
||||||
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
||||||
|
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
|
||||||
|
|
||||||
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
||||||
}
|
}
|
||||||
|
@ -675,8 +677,8 @@ func (c *cfg) engineOpts() []engine.Option {
|
||||||
opts = append(opts,
|
opts = append(opts,
|
||||||
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
|
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
|
||||||
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
||||||
|
|
||||||
engine.WithLogger(c.log),
|
engine.WithLogger(c.log),
|
||||||
|
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
|
||||||
)
|
)
|
||||||
|
|
||||||
if c.metricsCollector != nil {
|
if c.metricsCollector != nil {
|
||||||
|
|
|
@ -83,3 +83,8 @@ func ShardPoolSize(c *config.Config) uint32 {
|
||||||
func ShardErrorThreshold(c *config.Config) uint32 {
|
func ShardErrorThreshold(c *config.Config) uint32 {
|
||||||
return config.Uint32Safe(c.Sub(subsection), "shard_ro_error_threshold")
|
return config.Uint32Safe(c.Sub(subsection), "shard_ro_error_threshold")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EngineLowMemoryConsumption returns value of "lowmem" config parmeter from "storage" section.
|
||||||
|
func EngineLowMemoryConsumption(c *config.Config) bool {
|
||||||
|
return config.BoolSafe(c.Sub(subsection), "low_mem")
|
||||||
|
}
|
||||||
|
|
|
@ -167,6 +167,7 @@ Local storage engine configuration.
|
||||||
|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
|
|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
|
||||||
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
|
||||||
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
|
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
|
||||||
|
| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. |
|
||||||
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
|
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
|
||||||
|
|
||||||
## `shard` subsection
|
## `shard` subsection
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
type shardInitError struct {
|
type shardInitError struct {
|
||||||
|
@ -74,23 +75,30 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
||||||
e.mtx.Lock()
|
e.mtx.Lock()
|
||||||
defer e.mtx.Unlock()
|
defer e.mtx.Unlock()
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
var errCh = make(chan shardInitError, len(e.shards))
|
var errCh = make(chan shardInitError, len(e.shards))
|
||||||
|
var eg errgroup.Group
|
||||||
|
if e.cfg.lowMem && e.anyShardRequiresRefill() {
|
||||||
|
eg.SetLimit(1)
|
||||||
|
}
|
||||||
|
|
||||||
for id, sh := range e.shards {
|
for id, sh := range e.shards {
|
||||||
wg.Add(1)
|
id := id
|
||||||
go func(id string, sh *shard.Shard) {
|
sh := sh
|
||||||
defer wg.Done()
|
eg.Go(func() error {
|
||||||
if err := sh.Init(ctx); err != nil {
|
if err := sh.Init(ctx); err != nil {
|
||||||
errCh <- shardInitError{
|
errCh <- shardInitError{
|
||||||
err: err,
|
err: err,
|
||||||
id: id,
|
id: id,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(id, sh.Shard)
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
wg.Wait()
|
err := eg.Wait()
|
||||||
close(errCh)
|
close(errCh)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to initialize shards: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
for res := range errCh {
|
for res := range errCh {
|
||||||
if res.err != nil {
|
if res.err != nil {
|
||||||
|
@ -125,6 +133,15 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *StorageEngine) anyShardRequiresRefill() bool {
|
||||||
|
for _, sh := range e.shards {
|
||||||
|
if sh.NeedRefillMetabase() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
var errClosed = errors.New("storage engine is closed")
|
var errClosed = errors.New("storage engine is closed")
|
||||||
|
|
||||||
// Close releases all StorageEngine's components. Waits for all data-related operations to complete.
|
// Close releases all StorageEngine's components. Waits for all data-related operations to complete.
|
||||||
|
|
|
@ -210,6 +210,8 @@ type cfg struct {
|
||||||
metrics MetricRegister
|
metrics MetricRegister
|
||||||
|
|
||||||
shardPoolSize uint32
|
shardPoolSize uint32
|
||||||
|
|
||||||
|
lowMem bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
|
@ -265,3 +267,10 @@ func WithErrorThreshold(sz uint32) Option {
|
||||||
c.errorsThreshold = sz
|
c.errorsThreshold = sz
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithLowMemoryConsumption returns an option to set the flag to reduce memory consumption by reducing performance.
|
||||||
|
func WithLowMemoryConsumption(lowMemCons bool) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.lowMem = lowMemCons
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -98,7 +98,7 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
if !s.GetMode().NoMetabase() {
|
if !s.GetMode().NoMetabase() {
|
||||||
var initMetabase initializer
|
var initMetabase initializer
|
||||||
|
|
||||||
if s.needRefillMetabase() {
|
if s.NeedRefillMetabase() {
|
||||||
initMetabase = (*metabaseSynchronizer)(s)
|
initMetabase = (*metabaseSynchronizer)(s)
|
||||||
} else {
|
} else {
|
||||||
initMetabase = s.metaBase
|
initMetabase = s.metaBase
|
||||||
|
|
|
@ -231,8 +231,8 @@ func (s *Shard) hasWriteCache() bool {
|
||||||
return s.cfg.useWriteCache
|
return s.cfg.useWriteCache
|
||||||
}
|
}
|
||||||
|
|
||||||
// needRefillMetabase returns true if metabase is needed to be refilled.
|
// NeedRefillMetabase returns true if metabase is needed to be refilled.
|
||||||
func (s *Shard) needRefillMetabase() bool {
|
func (s *Shard) NeedRefillMetabase() bool {
|
||||||
return s.cfg.refillMetabase
|
return s.cfg.refillMetabase
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue