forked from TrueCloudLab/frostfs-node
Compare commits
2 commits
8ea7b8a504
...
3fe0a84364
Author | SHA1 | Date | |
---|---|---|---|
3fe0a84364 | |||
9706a03b89 |
7 changed files with 110 additions and 35 deletions
|
@ -118,6 +118,7 @@ type applicationConfiguration struct {
|
||||||
shardPoolSize uint32
|
shardPoolSize uint32
|
||||||
shards []shardCfg
|
shards []shardCfg
|
||||||
lowMem bool
|
lowMem bool
|
||||||
|
inhumePoolSize uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
// if need to run node in compatibility with other versions mode
|
// if need to run node in compatibility with other versions mode
|
||||||
|
@ -249,6 +250,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
||||||
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
||||||
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
||||||
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
|
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
|
||||||
|
a.EngineCfg.inhumePoolSize = engineconfig.InhumePoolSize(c)
|
||||||
|
|
||||||
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
||||||
}
|
}
|
||||||
|
@ -881,6 +883,7 @@ func (c *cfg) engineOpts() []engine.Option {
|
||||||
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
||||||
engine.WithLogger(c.log),
|
engine.WithLogger(c.log),
|
||||||
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
|
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
|
||||||
|
engine.WithInhumePoolSize(c.EngineCfg.inhumePoolSize),
|
||||||
)
|
)
|
||||||
|
|
||||||
if c.metricsCollector != nil {
|
if c.metricsCollector != nil {
|
||||||
|
|
|
@ -15,6 +15,11 @@ const (
|
||||||
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
|
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
|
||||||
// process object PUT operations in a storage engine.
|
// process object PUT operations in a storage engine.
|
||||||
ShardPoolSizeDefault = 20
|
ShardPoolSizeDefault = 20
|
||||||
|
|
||||||
|
// InhumePoolSizeDefault is the default size of the engine-level
|
||||||
|
// worker pool, which is used to process objects during the INHUME
|
||||||
|
// operation.
|
||||||
|
InhumePoolSizeDefault = 50
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
|
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
|
||||||
|
@ -88,3 +93,16 @@ func ShardErrorThreshold(c *config.Config) uint32 {
|
||||||
func EngineLowMemoryConsumption(c *config.Config) bool {
|
func EngineLowMemoryConsumption(c *config.Config) bool {
|
||||||
return config.BoolSafe(c.Sub(subsection), "low_mem")
|
return config.BoolSafe(c.Sub(subsection), "low_mem")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InhumePoolSize returns the value of "inhume_pool_size" config parameter from
|
||||||
|
// "storage" section.
|
||||||
|
//
|
||||||
|
// Returns InhumePoolSizeDefault if the value is not a positive number.
|
||||||
|
func InhumePoolSize(c *config.Config) uint32 {
|
||||||
|
v := config.Uint32Safe(c.Sub(subsection), "inhume_pool_size")
|
||||||
|
if v > 0 {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
return InhumePoolSizeDefault
|
||||||
|
}
|
||||||
|
|
|
@ -39,6 +39,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
|
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
|
||||||
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
|
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
|
||||||
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
|
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
|
||||||
|
require.EqualValues(t, engineconfig.InhumePoolSizeDefault, engineconfig.InhumePoolSize(empty))
|
||||||
})
|
})
|
||||||
|
|
||||||
const path = "../../../../config/example/node"
|
const path = "../../../../config/example/node"
|
||||||
|
@ -48,6 +49,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
|
|
||||||
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
|
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
|
||||||
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
|
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
|
||||||
|
require.EqualValues(t, 50, engineconfig.InhumePoolSize(c))
|
||||||
|
|
||||||
err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
|
err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
|
@ -124,6 +124,7 @@ storage:
|
||||||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||||
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
||||||
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
|
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
|
||||||
|
inhume_pool_size: 50 # size of engine-level worker pool used during INHUME operation
|
||||||
|
|
||||||
shard:
|
shard:
|
||||||
default: # section with the default shard parameters
|
default: # section with the default shard parameters
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
@ -71,10 +72,14 @@ func (e *StorageEngine) open(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init initializes all StorageEngine's components.
|
// Init initializes all StorageEngine's components.
|
||||||
func (e *StorageEngine) Init(ctx context.Context) error {
|
func (e *StorageEngine) Init(ctx context.Context) (err error) {
|
||||||
e.mtx.Lock()
|
e.mtx.Lock()
|
||||||
defer e.mtx.Unlock()
|
defer e.mtx.Unlock()
|
||||||
|
|
||||||
|
if e.inhumePool, err = ants.NewPool(int(e.inhumePoolSize)); err != nil {
|
||||||
|
return fmt.Errorf("could not create pool: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
errCh := make(chan shardInitError, len(e.shards))
|
errCh := make(chan shardInitError, len(e.shards))
|
||||||
var eg errgroup.Group
|
var eg errgroup.Group
|
||||||
if e.cfg.lowMem && e.anyShardRequiresRefill() {
|
if e.cfg.lowMem && e.anyShardRequiresRefill() {
|
||||||
|
@ -92,7 +97,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
err := eg.Wait()
|
err = eg.Wait()
|
||||||
close(errCh)
|
close(errCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to initialize shards: %w", err)
|
return fmt.Errorf("failed to initialize shards: %w", err)
|
||||||
|
@ -161,6 +166,7 @@ func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
|
||||||
for _, p := range e.shardPools {
|
for _, p := range e.shardPools {
|
||||||
p.Release()
|
p.Release()
|
||||||
}
|
}
|
||||||
|
e.inhumePool.Release()
|
||||||
}
|
}
|
||||||
|
|
||||||
for id, sh := range e.shards {
|
for id, sh := range e.shards {
|
||||||
|
|
|
@ -30,6 +30,8 @@ type StorageEngine struct {
|
||||||
|
|
||||||
shardPools map[string]util.WorkerPool
|
shardPools map[string]util.WorkerPool
|
||||||
|
|
||||||
|
inhumePool util.WorkerPool
|
||||||
|
|
||||||
closeCh chan struct{}
|
closeCh chan struct{}
|
||||||
setModeCh chan setModeRequest
|
setModeCh chan setModeRequest
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
@ -191,6 +193,8 @@ type cfg struct {
|
||||||
|
|
||||||
shardPoolSize uint32
|
shardPoolSize uint32
|
||||||
|
|
||||||
|
inhumePoolSize uint32
|
||||||
|
|
||||||
lowMem bool
|
lowMem bool
|
||||||
|
|
||||||
containerSource atomic.Pointer[containerSource]
|
containerSource atomic.Pointer[containerSource]
|
||||||
|
@ -198,9 +202,10 @@ type cfg struct {
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
res := &cfg{
|
res := &cfg{
|
||||||
log: logger.NewLoggerWrapper(zap.L()),
|
log: logger.NewLoggerWrapper(zap.L()),
|
||||||
shardPoolSize: 20,
|
shardPoolSize: 20,
|
||||||
metrics: noopMetrics{},
|
inhumePoolSize: 50,
|
||||||
|
metrics: noopMetrics{},
|
||||||
}
|
}
|
||||||
res.containerSource.Store(&containerSource{})
|
res.containerSource.Store(&containerSource{})
|
||||||
return res
|
return res
|
||||||
|
@ -244,6 +249,13 @@ func WithShardPoolSize(sz uint32) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithInhumePoolSize returns option to specify size of worker pool for Inhume operation.
|
||||||
|
func WithInhumePoolSize(sz uint32) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.inhumePoolSize = sz
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithErrorThreshold returns an option to specify size amount of errors after which
|
// WithErrorThreshold returns an option to specify size amount of errors after which
|
||||||
// shard is moved to read-only mode.
|
// shard is moved to read-only mode.
|
||||||
func WithErrorThreshold(sz uint32) Option {
|
func WithErrorThreshold(sz uint32) Option {
|
||||||
|
|
|
@ -3,6 +3,7 @@ package engine
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
@ -81,45 +82,77 @@ func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRe
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
||||||
var shPrm shard.InhumePrm
|
ctx, cancel := context.WithCancelCause(ctx)
|
||||||
if prm.forceRemoval {
|
defer cancel(nil)
|
||||||
shPrm.ForceRemoval()
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range prm.addrs {
|
var wg sync.WaitGroup
|
||||||
if !prm.forceRemoval {
|
|
||||||
locked, err := e.IsLocked(ctx, prm.addrs[i])
|
loop:
|
||||||
if err != nil {
|
for _, addr := range prm.addrs {
|
||||||
e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
|
select {
|
||||||
zap.Error(err),
|
case <-ctx.Done():
|
||||||
zap.Stringer("addr", prm.addrs[i]),
|
break loop
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
default:
|
||||||
} else if locked {
|
}
|
||||||
return InhumeRes{}, new(apistatus.ObjectLocked)
|
|
||||||
|
wg.Add(1)
|
||||||
|
if err := e.inhumePool.Submit(func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
if err := e.handleInhumeTask(ctx, addr, prm.tombstone, prm.forceRemoval); err != nil {
|
||||||
|
cancel(err)
|
||||||
}
|
}
|
||||||
}
|
}); err != nil {
|
||||||
|
wg.Done()
|
||||||
|
cancel(err)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
if prm.tombstone != nil {
|
e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
|
||||||
shPrm.SetTarget(*prm.tombstone, prm.addrs[i])
|
return InhumeRes{}, errInhumeFailure
|
||||||
} else {
|
|
||||||
shPrm.MarkAsGarbage(prm.addrs[i])
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, true)
|
return InhumeRes{}, context.Cause(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *StorageEngine) handleInhumeTask(ctx context.Context, addr oid.Address, tombstone *oid.Address, forceRemoval bool) error {
|
||||||
|
if !forceRemoval {
|
||||||
|
locked, err := e.IsLocked(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return InhumeRes{}, err
|
e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
|
||||||
}
|
zap.Error(err),
|
||||||
if !ok {
|
zap.Stringer("addr", addr),
|
||||||
ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, false)
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
if err != nil {
|
} else if locked {
|
||||||
return InhumeRes{}, err
|
return new(apistatus.ObjectLocked)
|
||||||
} else if !ok {
|
|
||||||
return InhumeRes{}, errInhumeFailure
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return InhumeRes{}, nil
|
var prm shard.InhumePrm
|
||||||
|
if tombstone != nil {
|
||||||
|
prm.SetTarget(*tombstone, addr)
|
||||||
|
} else {
|
||||||
|
prm.MarkAsGarbage(addr)
|
||||||
|
}
|
||||||
|
if forceRemoval {
|
||||||
|
prm.ForceRemoval()
|
||||||
|
}
|
||||||
|
|
||||||
|
ok, err := e.inhumeAddr(ctx, addr, prm, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
ok, err := e.inhumeAddr(ctx, addr, prm, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if !ok {
|
||||||
|
return errInhumeFailure
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns ok if object was inhumed during this invocation or before.
|
// Returns ok if object was inhumed during this invocation or before.
|
||||||
|
|
Loading…
Reference in a new issue