Shard OPS limiter #1636
|
@ -33,6 +33,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
||||
|
@ -135,6 +136,7 @@ type shardCfg struct {
|
|||
refillMetabase bool
|
||||
refillMetabaseWorkersCount int
|
||||
mode shardmode.Mode
|
||||
limiter qos.Limiter
|
||||
|
||||
metaCfg struct {
|
||||
path string
|
||||
|
@ -254,39 +256,42 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
|||
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
||||
}
|
||||
|
||||
func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig *shardconfig.Config) error {
|
||||
var newConfig shardCfg
|
||||
func (a *applicationConfiguration) updateShardConfig(c *config.Config, source *shardconfig.Config) error {
|
||||
var target shardCfg
|
||||
|
||||
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
||||
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
||||
newConfig.mode = oldConfig.Mode()
|
||||
newConfig.compress = oldConfig.Compress()
|
||||
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
||||
newConfig.estimateCompressibilityThreshold = oldConfig.EstimateCompressibilityThreshold()
|
||||
newConfig.uncompressableContentType = oldConfig.UncompressableContentTypes()
|
||||
newConfig.smallSizeObjectLimit = oldConfig.SmallSizeLimit()
|
||||
target.refillMetabase = source.RefillMetabase()
|
||||
target.refillMetabaseWorkersCount = source.RefillMetabaseWorkersCount()
|
||||
target.mode = source.Mode()
|
||||
target.compress = source.Compress()
|
||||
target.estimateCompressibility = source.EstimateCompressibility()
|
||||
target.estimateCompressibilityThreshold = source.EstimateCompressibilityThreshold()
|
||||
target.uncompressableContentType = source.UncompressableContentTypes()
|
||||
target.smallSizeObjectLimit = source.SmallSizeLimit()
|
||||
|
||||
a.setShardWriteCacheConfig(&newConfig, oldConfig)
|
||||
a.setShardWriteCacheConfig(&target, source)
|
||||
|
||||
a.setShardPiloramaConfig(c, &newConfig, oldConfig)
|
||||
a.setShardPiloramaConfig(c, &target, source)
|
||||
|
||||
if err := a.setShardStorageConfig(&newConfig, oldConfig); err != nil {
|
||||
if err := a.setShardStorageConfig(&target, source); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
a.setMetabaseConfig(&newConfig, oldConfig)
|
||||
a.setMetabaseConfig(&target, source)
|
||||
|
||||
a.setGCConfig(&newConfig, oldConfig)
|
||||
a.setGCConfig(&target, source)
|
||||
if err := a.setLimiter(&target, source); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
a.EngineCfg.shards = append(a.EngineCfg.shards, newConfig)
|
||||
a.EngineCfg.shards = append(a.EngineCfg.shards, target)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
||||
writeCacheCfg := oldConfig.WriteCache()
|
||||
func (a *applicationConfiguration) setShardWriteCacheConfig(target *shardCfg, source *shardconfig.Config) {
|
||||
writeCacheCfg := source.WriteCache()
|
||||
if writeCacheCfg.Enabled() {
|
||||
wc := &newConfig.writecacheCfg
|
||||
wc := &target.writecacheCfg
|
||||
|
||||
wc.enabled = true
|
||||
wc.path = writeCacheCfg.Path()
|
||||
|
@ -299,10 +304,10 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
|
|||
}
|
||||
}
|
||||
|
||||
func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
||||
func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, target *shardCfg, source *shardconfig.Config) {
|
||||
if config.BoolSafe(c.Sub("tree"), "enabled") {
|
||||
piloramaCfg := oldConfig.Pilorama()
|
||||
pr := &newConfig.piloramaCfg
|
||||
piloramaCfg := source.Pilorama()
|
||||
pr := &target.piloramaCfg
|
||||
|
||||
pr.enabled = true
|
||||
pr.path = piloramaCfg.Path()
|
||||
|
@ -313,8 +318,8 @@ func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, newC
|
|||
}
|
||||
}
|
||||
|
||||
func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
|
||||
blobStorCfg := oldConfig.BlobStor()
|
||||
func (a *applicationConfiguration) setShardStorageConfig(target *shardCfg, source *shardconfig.Config) error {
|
||||
blobStorCfg := source.BlobStor()
|
||||
storagesCfg := blobStorCfg.Storages()
|
||||
|
||||
ss := make([]subStorageCfg, 0, len(storagesCfg))
|
||||
|
@ -348,13 +353,13 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
|
|||
ss = append(ss, sCfg)
|
||||
}
|
||||
|
||||
newConfig.subStorages = ss
|
||||
target.subStorages = ss
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *applicationConfiguration) setMetabaseConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
||||
metabaseCfg := oldConfig.Metabase()
|
||||
m := &newConfig.metaCfg
|
||||
func (a *applicationConfiguration) setMetabaseConfig(target *shardCfg, source *shardconfig.Config) {
|
||||
metabaseCfg := source.Metabase()
|
||||
m := &target.metaCfg
|
||||
|
||||
m.path = metabaseCfg.Path()
|
||||
m.perm = metabaseCfg.BoltDB().Perm()
|
||||
|
@ -362,12 +367,25 @@ func (a *applicationConfiguration) setMetabaseConfig(newConfig *shardCfg, oldCon
|
|||
m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize()
|
||||
}
|
||||
|
||||
func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
||||
gcCfg := oldConfig.GC()
|
||||
newConfig.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
|
||||
newConfig.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
|
||||
newConfig.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
|
||||
newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
|
||||
func (a *applicationConfiguration) setGCConfig(target *shardCfg, source *shardconfig.Config) {
|
||||
gcCfg := source.GC()
|
||||
target.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
|
||||
target.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
|
||||
target.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
|
||||
target.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
|
||||
}
|
||||
|
||||
func (a *applicationConfiguration) setLimiter(target *shardCfg, source *shardconfig.Config) error {
|
||||
limitsConfig := source.Limits()
|
||||
limiter, err := qos.NewLimiter(limitsConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if target.limiter != nil {
|
||||
target.limiter.Close()
|
||||
}
|
||||
target.limiter = limiter
|
||||
return nil
|
||||
}
|
||||
|
||||
// internals contains application-specific internals that are created
|
||||
|
@ -1054,6 +1072,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
|
|||
|
||||
return pool
|
||||
}),
|
||||
shard.WithLimiter(shCfg.limiter),
|
||||
}
|
||||
return sh
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
|
||||
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
||||
gcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/gc"
|
||||
limitsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
|
||||
piloramaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/pilorama"
|
||||
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/writecache"
|
||||
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||
|
@ -76,6 +77,7 @@ func TestEngineSection(t *testing.T) {
|
|||
ss := blob.Storages()
|
||||
pl := sc.Pilorama()
|
||||
gc := sc.GC()
|
||||
limits := sc.Limits()
|
||||
|
||||
switch num {
|
||||
case 0:
|
||||
|
@ -134,6 +136,75 @@ func TestEngineSection(t *testing.T) {
|
|||
require.Equal(t, false, sc.RefillMetabase())
|
||||
require.Equal(t, mode.ReadOnly, sc.Mode())
|
||||
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
|
||||
|
||||
readLimits := limits.Read()
|
||||
writeLimits := limits.Write()
|
||||
require.Equal(t, 30*time.Second, readLimits.IdleTimeout)
|
||||
require.Equal(t, int64(10_000), readLimits.MaxRunningOps)
|
||||
require.Equal(t, int64(1_000), readLimits.MaxWaitingOps)
|
||||
require.Equal(t, 45*time.Second, writeLimits.IdleTimeout)
|
||||
require.Equal(t, int64(1_000), writeLimits.MaxRunningOps)
|
||||
require.Equal(t, int64(100), writeLimits.MaxWaitingOps)
|
||||
require.ElementsMatch(t, readLimits.Tags,
|
||||
[]limitsconfig.IOTagConfig{
|
||||
{
|
||||
Tag: "internal",
|
||||
Weight: toPtr(20),
|
||||
ReservedOps: toPtr(1000),
|
||||
LimitOps: toPtr(0),
|
||||
},
|
||||
{
|
||||
Tag: "client",
|
||||
Weight: toPtr(70),
|
||||
ReservedOps: toPtr(10000),
|
||||
},
|
||||
{
|
||||
Tag: "background",
|
||||
Weight: toPtr(5),
|
||||
LimitOps: toPtr(10000),
|
||||
ReservedOps: toPtr(0),
|
||||
},
|
||||
{
|
||||
Tag: "writecache",
|
||||
Weight: toPtr(5),
|
||||
LimitOps: toPtr(25000),
|
||||
},
|
||||
{
|
||||
Tag: "policer",
|
||||
Weight: toPtr(5),
|
||||
LimitOps: toPtr(25000),
|
||||
},
|
||||
})
|
||||
require.ElementsMatch(t, writeLimits.Tags,
|
||||
[]limitsconfig.IOTagConfig{
|
||||
{
|
||||
Tag: "internal",
|
||||
Weight: toPtr(200),
|
||||
ReservedOps: toPtr(100),
|
||||
LimitOps: toPtr(0),
|
||||
},
|
||||
{
|
||||
Tag: "client",
|
||||
Weight: toPtr(700),
|
||||
ReservedOps: toPtr(1000),
|
||||
},
|
||||
{
|
||||
Tag: "background",
|
||||
Weight: toPtr(50),
|
||||
LimitOps: toPtr(1000),
|
||||
ReservedOps: toPtr(0),
|
||||
},
|
||||
{
|
||||
Tag: "writecache",
|
||||
Weight: toPtr(50),
|
||||
LimitOps: toPtr(2500),
|
||||
},
|
||||
{
|
||||
Tag: "policer",
|
||||
Weight: toPtr(50),
|
||||
LimitOps: toPtr(2500),
|
||||
},
|
||||
})
|
||||
case 1:
|
||||
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
||||
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
||||
|
@ -188,6 +259,17 @@ func TestEngineSection(t *testing.T) {
|
|||
require.Equal(t, true, sc.RefillMetabase())
|
||||
require.Equal(t, mode.ReadWrite, sc.Mode())
|
||||
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
|
||||
|
||||
readLimits := limits.Read()
|
||||
writeLimits := limits.Write()
|
||||
require.Equal(t, limitsconfig.DefaultIdleTimeout, readLimits.IdleTimeout)
|
||||
require.Equal(t, limitsconfig.NoLimit, readLimits.MaxRunningOps)
|
||||
require.Equal(t, limitsconfig.NoLimit, readLimits.MaxWaitingOps)
|
||||
require.Equal(t, limitsconfig.DefaultIdleTimeout, writeLimits.IdleTimeout)
|
||||
require.Equal(t, limitsconfig.NoLimit, writeLimits.MaxRunningOps)
|
||||
require.Equal(t, limitsconfig.NoLimit, writeLimits.MaxWaitingOps)
|
||||
require.Equal(t, 0, len(readLimits.Tags))
|
||||
require.Equal(t, 0, len(writeLimits.Tags))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
@ -201,3 +283,7 @@ func TestEngineSection(t *testing.T) {
|
|||
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||
})
|
||||
}
|
||||
|
||||
func toPtr(v float64) *float64 {
|
||||
return &v
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
blobstorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor"
|
||||
gcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/gc"
|
||||
limitsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
|
||||
metabaseconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/metabase"
|
||||
piloramaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/pilorama"
|
||||
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/writecache"
|
||||
|
@ -125,6 +126,14 @@ func (x *Config) GC() *gcconfig.Config {
|
|||
)
|
||||
}
|
||||
|
||||
// Limits returns "limits" subsection as a limitsconfig.Config.
|
||||
a-savchuk commented: gcconfig?
dstepanov-yadro commented: fixed
|
||||
func (x *Config) Limits() *limitsconfig.Config {
|
||||
return limitsconfig.From(
|
||||
(*config.Config)(x).
|
||||
Sub("limits"),
|
||||
)
|
||||
}
|
||||
|
||||
// RefillMetabase returns the value of "resync_metabase" config parameter.
|
||||
//
|
||||
// Returns false if the value is not a valid bool.
|
||||
|
|
cmd/frostfs-node/config/engine/shard/limits/config.go (new file, 130 lines)
|
@ -0,0 +1,130 @@
|
|||
package limits
|
||||
|
||||
import (
|
||||
"math"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
"github.com/spf13/cast"
|
||||
)
|
||||
|
||||
const (
|
||||
NoLimit int64 = math.MaxInt64
|
||||
DefaultIdleTimeout = 5 * time.Minute
|
||||
)
|
||||
|
||||
// From wraps config section into Config.
|
||||
func From(c *config.Config) *Config {
|
||||
return (*Config)(c)
|
||||
}
|
||||
|
||||
// Config is a wrapper over the config section
|
||||
// which provides access to Shard's limits configurations.
|
||||
type Config config.Config
|
||||
|
||||
// Read returns the value of "read" limits config section.
|
||||
func (x *Config) Read() OpConfig {
|
||||
return x.parse("read")
|
||||
}
|
||||
|
||||
// Write returns the value of "write" limits config section.
|
||||
func (x *Config) Write() OpConfig {
|
||||
return x.parse("write")
|
||||
}
|
||||
|
||||
func (x *Config) parse(sub string) OpConfig {
|
||||
c := (*config.Config)(x).Sub(sub)
|
||||
var result OpConfig
|
||||
|
||||
if s := config.Int(c, "max_waiting_ops"); s > 0 {
|
||||
result.MaxWaitingOps = s
|
||||
} else {
|
||||
result.MaxWaitingOps = NoLimit
|
||||
}
|
||||
|
||||
if s := config.Int(c, "max_running_ops"); s > 0 {
|
||||
result.MaxRunningOps = s
|
||||
} else {
|
||||
result.MaxRunningOps = NoLimit
|
||||
}
|
||||
|
||||
if s := config.DurationSafe(c, "idle_timeout"); s > 0 {
|
||||
result.IdleTimeout = s
|
||||
} else {
|
||||
result.IdleTimeout = DefaultIdleTimeout
|
||||
}
|
||||
|
||||
result.Tags = tags(c)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
type OpConfig struct {
|
||||
// MaxWaitingOps returns the value of "max_waiting_ops" config parameter.
|
||||
//
|
||||
// Equals NoLimit if the value is not a positive number.
|
||||
MaxWaitingOps int64
|
||||
// MaxRunningOps returns the value of "max_running_ops" config parameter.
|
||||
//
|
||||
// Equals NoLimit if the value is not a positive number.
|
||||
MaxRunningOps int64
|
||||
// IdleTimeout returns the value of "idle_timeout" config parameter.
|
||||
//
|
||||
// Equals DefaultIdleTimeout if the value is not a valid duration.
|
||||
fyrchik commented: Struct field doesn't return anything. Sth about default value here?
dstepanov-yadro commented: Fixed
|
||||
IdleTimeout time.Duration
|
||||
// Tags returns the value of "tags" config parameter.
|
||||
//
|
||||
// Equals nil if the value is not a valid tags config slice.
|
||||
Tags []IOTagConfig
|
||||
}
|
||||
|
||||
type IOTagConfig struct {
|
||||
Tag string
|
||||
Weight *float64
|
||||
LimitOps *float64
|
||||
ReservedOps *float64
|
||||
}
|
||||
|
||||
func tags(c *config.Config) []IOTagConfig {
|
||||
c = c.Sub("tags")
|
||||
var result []IOTagConfig
|
||||
for i := 0; ; i++ {
|
||||
tag := config.String(c, strconv.Itoa(i)+".tag")
|
||||
if tag == "" {
|
||||
return result
|
||||
}
|
||||
|
||||
var tagConfig IOTagConfig
|
||||
tagConfig.Tag = tag
|
||||
|
||||
v := c.Value(strconv.Itoa(i) + ".weight")
|
||||
if v != nil {
|
||||
w, err := cast.ToFloat64E(v)
|
||||
panicOnErr(err)
|
||||
tagConfig.Weight = &w
|
||||
}
|
||||
|
||||
v = c.Value(strconv.Itoa(i) + ".limit_ops")
|
||||
if v != nil {
|
||||
l, err := cast.ToFloat64E(v)
|
||||
panicOnErr(err)
|
||||
tagConfig.LimitOps = &l
|
||||
}
|
||||
|
||||
v = c.Value(strconv.Itoa(i) + ".reserved_ops")
|
||||
if v != nil {
|
||||
r, err := cast.ToFloat64E(v)
|
||||
panicOnErr(err)
|
||||
tagConfig.ReservedOps = &r
|
||||
}
|
||||
|
||||
result = append(result, tagConfig)
|
||||
}
|
||||
}
|
||||
|
||||
func panicOnErr(err error) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
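For reference, a sketch (not part of the PR) of the value shape produced by the indexed `tags` parser above: each list entry becomes one `IOTagConfig`, and optional fields stay `nil` when the corresponding `weight`, `limit_ops`, or `reserved_ops` keys are absent. The concrete values here are illustrative.

```go
package limits

// exampleTagEntry mirrors what tags() builds for an entry that sets only
// "tag", "weight" and "reserved_ops".
func exampleTagEntry() IOTagConfig {
	w := 70.0
	r := 10000.0
	return IOTagConfig{
		Tag:         "client",
		Weight:      &w, // from "<index>.weight"
		ReservedOps: &r, // from "<index>.reserved_ops"
		// LimitOps stays nil: no "<index>.limit_ops" key for this entry.
	}
}
```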
|
|
@ -157,6 +157,47 @@ FROSTFS_STORAGE_SHARD_0_GC_REMOVER_SLEEP_INTERVAL=2m
|
|||
FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_BATCH_SIZE=1500
|
||||
#### Limit of concurrent workers collecting expired objects by the garbage collector
|
||||
FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_WORKER_COUNT=15
|
||||
#### Limits config
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_MAX_RUNNING_OPS=10000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_MAX_WAITING_OPS=1000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_MAX_RUNNING_OPS=1000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_MAX_WAITING_OPS=100
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_IDLE_TIMEOUT=45s
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_IDLE_TIMEOUT=30s
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_TAG=internal
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_WEIGHT=20
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_LIMIT_OPS=0
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_RESERVED_OPS=1000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_1_TAG=client
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_1_WEIGHT=70
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_1_RESERVED_OPS=10000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_TAG=background
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_WEIGHT=5
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_LIMIT_OPS=10000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_RESERVED_OPS=0
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_TAG=writecache
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_WEIGHT=5
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_LIMIT_OPS=25000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_TAG=policer
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_WEIGHT=5
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_LIMIT_OPS=25000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_TAG=internal
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_WEIGHT=200
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_LIMIT_OPS=0
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_RESERVED_OPS=100
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_1_TAG=client
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_1_WEIGHT=700
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_1_RESERVED_OPS=1000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_TAG=background
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_WEIGHT=50
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_LIMIT_OPS=1000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_RESERVED_OPS=0
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_3_TAG=writecache
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_3_WEIGHT=50
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_3_LIMIT_OPS=2500
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_TAG=policer
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_WEIGHT=50
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_LIMIT_OPS=2500
|
||||
|
||||
## 1 shard
|
||||
### Flag to refill Metabase from BlobStor
|
||||
|
|
|
@ -221,6 +221,76 @@
|
|||
"remover_sleep_interval": "2m",
|
||||
"expired_collector_batch_size": 1500,
|
||||
"expired_collector_worker_count": 15
|
||||
},
|
||||
"limits": {
|
||||
"read": {
|
||||
"max_running_ops": 10000,
|
||||
"max_waiting_ops": 1000,
|
||||
"idle_timeout": "30s",
|
||||
"tags": [
|
||||
{
|
||||
"tag": "internal",
|
||||
"weight": 20,
|
||||
"limit_ops": 0,
|
||||
"reserved_ops": 1000
|
||||
},
|
||||
{
|
||||
"tag": "client",
|
||||
"weight": 70,
|
||||
"reserved_ops": 10000
|
||||
},
|
||||
{
|
||||
"tag": "background",
|
||||
"weight": 5,
|
||||
"limit_ops": 10000,
|
||||
"reserved_ops": 0
|
||||
},
|
||||
{
|
||||
"tag": "writecache",
|
||||
"weight": 5,
|
||||
"limit_ops": 25000
|
||||
},
|
||||
{
|
||||
"tag": "policer",
|
||||
"weight": 5,
|
||||
"limit_ops": 25000
|
||||
}
|
||||
]
|
||||
},
|
||||
"write": {
|
||||
"max_running_ops": 1000,
|
||||
"max_waiting_ops": 100,
|
||||
"idle_timeout": "45s",
|
||||
"tags": [
|
||||
{
|
||||
"tag": "internal",
|
||||
"weight": 200,
|
||||
"limit_ops": 0,
|
||||
"reserved_ops": 100
|
||||
},
|
||||
{
|
||||
"tag": "client",
|
||||
"weight": 700,
|
||||
"reserved_ops": 1000
|
||||
},
|
||||
{
|
||||
"tag": "background",
|
||||
"weight": 50,
|
||||
"limit_ops": 1000,
|
||||
"reserved_ops": 0
|
||||
},
|
||||
{
|
||||
"tag": "writecache",
|
||||
"weight": 50,
|
||||
"limit_ops": 2500
|
||||
},
|
||||
{
|
||||
"tag": "policer",
|
||||
"weight": 50,
|
||||
"limit_ops": 2500
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"1": {
|
||||
|
|
|
@ -227,6 +227,52 @@ storage:
|
|||
expired_collector_batch_size: 1500 # number of objects to be marked expired by the garbage collector
|
||||
expired_collector_worker_count: 15 # number of concurrent workers collecting expired objects by the garbage collector
|
||||
|
||||
limits:
|
||||
read:
|
||||
max_running_ops: 10000
|
||||
max_waiting_ops: 1000
|
||||
idle_timeout: 30s
|
||||
tags:
|
||||
- tag: internal
|
||||
weight: 20
|
||||
limit_ops: 0
|
||||
reserved_ops: 1000
|
||||
- tag: client
|
||||
weight: 70
|
||||
reserved_ops: 10000
|
||||
- tag: background
|
||||
weight: 5
|
||||
limit_ops: 10000
|
||||
reserved_ops: 0
|
||||
- tag: writecache
|
||||
weight: 5
|
||||
limit_ops: 25000
|
||||
- tag: policer
|
||||
weight: 5
|
||||
limit_ops: 25000
|
||||
write:
|
||||
max_running_ops: 1000
|
||||
max_waiting_ops: 100
|
||||
idle_timeout: 45s
|
||||
tags:
|
||||
- tag: internal
|
||||
weight: 200
|
||||
limit_ops: 0
|
||||
reserved_ops: 100
|
||||
- tag: client
|
||||
weight: 700
|
||||
reserved_ops: 1000
|
||||
- tag: background
|
||||
weight: 50
|
||||
limit_ops: 1000
|
||||
reserved_ops: 0
|
||||
- tag: writecache
|
||||
weight: 50
|
||||
limit_ops: 2500
|
||||
- tag: policer
|
||||
weight: 50
|
||||
limit_ops: 2500
|
||||
|
||||
1:
|
||||
writecache:
|
||||
path: tmp/1/cache # write-cache root directory
|
||||
|
|
|
@ -195,6 +195,7 @@ The following table describes configuration for each shard.
|
|||
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
||||
| `small_object_size` | `size` | `1M` | Maximum size of an object stored in blobovnicza tree. |
|
||||
| `gc` | [GC config](#gc-subsection) | | GC configuration. |
|
||||
| `limits` | [Shard limits config](#limits-subsection) | | Shard limits configuration. |
|
||||
|
||||
### `blobstor` subsection
|
||||
|
||||
|
@ -301,6 +302,64 @@ writecache:
|
|||
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
|
||||
| `max_flushing_objects_size` | `size` | `512M` | Max total size of background flushing objects. |
|
||||
|
||||
### `limits` subsection
|
||||
|
||||
```yaml
|
||||
a-savchuk commented: You defined this section twice
dstepanov-yadro commented: fixed
|
||||
limits:
|
||||
max_read_running_ops: 10000
|
||||
max_read_waiting_ops: 1000
|
||||
max_write_running_ops: 1000
|
||||
max_write_waiting_ops: 100
|
||||
read:
|
||||
- tag: internal
|
||||
weight: 20
|
||||
limit_ops: 0
|
||||
reserved_ops: 1000
|
||||
- tag: client
|
||||
weight: 70
|
||||
reserved_ops: 10000
|
||||
- tag: background
|
||||
weight: 5
|
||||
limit_ops: 10000
|
||||
reserved_ops: 0
|
||||
- tag: writecache
|
||||
weight: 5
|
||||
limit_ops: 25000
|
||||
- tag: policer
|
||||
weight: 5
|
||||
limit_ops: 25000
|
||||
write:
|
||||
- tag: internal
|
||||
weight: 200
|
||||
limit_ops: 0
|
||||
reserved_ops: 100
|
||||
- tag: client
|
||||
weight: 700
|
||||
reserved_ops: 1000
|
||||
- tag: background
|
||||
weight: 50
|
||||
limit_ops: 1000
|
||||
reserved_ops: 0
|
||||
- tag: writecache
|
||||
weight: 50
|
||||
limit_ops: 2500
|
||||
- tag: policer
|
||||
weight: 50
|
||||
limit_ops: 2500
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
| ----------------------- | -------- | -------------- | --------------------------------------------------------------------------------------------------------------- |
|
||||
| `max_read_running_ops`  | `int`    | 0 (no limit)   | The maximum number of running read operations. |
|
||||
| `max_read_waiting_ops` | `int` | 0 (no limit) | The maximum number of waiting read operations. |
|
||||
| `max_write_running_ops` | `int` | 0 (no limit) | The maximum number of running write operations. |
|
||||
| `max_write_waiting_ops` | `int`    | 0 (no limit)   | The maximum number of waiting write operations. |
|
||||
| `read` | `[]tag` | empty | Array of shard read settings for tags. |
|
||||
| `write` | `[]tag` | empty | Array of shard write settings for tags. |
|
||||
| `tag.tag` | `string` | empty | Tag name. Allowed values: `client`, `internal`, `background`, `writecache`, `policer`. |
|
||||
| `tag.weight`            | `float`  | 0 (no weight)  | Weight for queries with the specified tag. Weights must be specified either for all tags or for none of them. |
|
||||
| `tag.limit_ops` | `float` | 0 (no limit) | Operations per second rate limit for queries with the specified tag. |
|
||||
| `tag.reserved_ops` | `float` | 0 (no reserve) | Reserved operations per second rate for queries with the specified tag. |
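The values above surface in code through the new `limitsconfig` wrapper added by this PR. Below is a minimal sketch of reading them, assuming only the `From`/`Read`/`Write` API and the `NoLimit` and `DefaultIdleTimeout` defaults introduced here; the `printShardLimits` helper itself is illustrative and not part of the PR.

```go
package example

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
	limitsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
)

// printShardLimits inspects the "limits" subsection of a single shard config.
func printShardLimits(shardSection *config.Config) {
	lim := limitsconfig.From(shardSection.Sub("limits"))

	read, write := lim.Read(), lim.Write()

	// Unset numeric values fall back to NoLimit; idle_timeout falls back to 5m.
	fmt.Println("read max_running_ops:", read.MaxRunningOps)
	fmt.Println("read idle_timeout:", read.IdleTimeout)
	fmt.Println("write max_waiting_ops:", write.MaxWaitingOps)
	fmt.Println("write tags configured:", len(write.Tags))
}
```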
|
||||
|
||||
# `node` section
|
||||
|
||||
|
|
go.mod (2 changed lines)
|
@ -8,7 +8,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||
|
|
go.sum (4 changed lines)
|
@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
|
|||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf h1:ik2aMBpTJJpoZe2ffcGShXRkrvny65NEPLVt67KmH/A=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3 h1:QnAt5b2R6+hQthMOIn5ECfLAlVD8IAE5JRm1NCCOmuE=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 h1:dOZHuOywvH1ms8U38lDCWpysgkCCeJ02RLI7zDhPcyw=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
|
|
internal/assert/cond.go (new file, 9 lines)
|
@ -0,0 +1,9 @@
|
|||
package assert
|
||||
|
||||
import "strings"
|
||||
|
||||
func True(cond bool, details ...string) {
|
||||
fyrchik commented: Don't mind the naming, but do you have any examples of it being used somewhere? `assert.True` is what I have seen in e.g. testify library.
dstepanov-yadro commented: Fixed
fyrchik commented: Btw, good call with introducing this function!
|
||||
if !cond {
|
||||
panic(strings.Join(details, " "))
|
||||
}
|
||||
}
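The diff does not show call sites for `True`, so the following is only an illustrative sketch of the kind of invariant check it enables; the `release` helper and its counter are made up.

```go
package example

import "git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"

// release panics with a readable message if the "something is held" invariant
// is violated before decrementing the counter.
func release(active *int) {
	assert.True(*active > 0, "release called", "with no active requests")
	*active--
}
```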
|
internal/qos/limiter.go (new file, 146 lines)
|
@ -0,0 +1,146 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultIdleTimeout time.Duration = 0
|
||||
defaultShare float64 = 1.0
|
||||
)
|
||||
|
||||
type ReleaseFunc scheduling.ReleaseFunc
|
||||
|
||||
type Limiter interface {
|
||||
ReadRequest(context.Context) (ReleaseFunc, error)
|
||||
WriteRequest(context.Context) (ReleaseFunc, error)
|
||||
Close()
|
||||
}
|
||||
|
||||
type scheduler interface {
|
||||
RequestArrival(ctx context.Context, tag string) (scheduling.ReleaseFunc, error)
|
||||
Close()
|
||||
}
|
||||
|
||||
func NewLimiter(c *limits.Config) (Limiter, error) {
|
||||
if err := validateConfig(c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
read, write := c.Read(), c.Write()
|
||||
if isNoop(read, write) {
|
||||
return noopLimiterInstance, nil
|
||||
}
|
||||
readScheduler, err := createScheduler(c.Read())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create read scheduler: %w", err)
|
||||
a-savchuk commented: Should be without "failed to"
dstepanov-yadro commented: ok, fixed
|
||||
}
|
||||
writeScheduler, err := createScheduler(c.Write())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create write scheduler: %w", err)
|
||||
}
|
||||
return &mClockLimiter{
|
||||
readScheduler: readScheduler,
|
||||
writeScheduler: writeScheduler,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func createScheduler(config limits.OpConfig) (scheduler, error) {
|
||||
if len(config.Tags) == 0 && config.MaxWaitingOps == limits.NoLimit {
|
||||
return newSemaphoreScheduler(config.MaxRunningOps), nil
|
||||
}
|
||||
return scheduling.NewMClock(
|
||||
uint64(config.MaxRunningOps), uint64(config.MaxWaitingOps),
|
||||
a-savchuk commented: You do a cast from `int64` to `int` when reading a config, then you cast `int` to `uint64` here. What about making `config.MaxRunningOps` and `config.MaxWaitingOps` of type `uint64`?
dstepanov-yadro commented: Fixed: now config variables have int64 type.
|
||||
converToSchedulingTags(config.Tags), config.IdleTimeout)
|
||||
}
|
||||
|
||||
func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
|
||||
result := make(map[string]scheduling.TagInfo)
|
||||
for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} {
|
||||
result[tag.String()] = scheduling.TagInfo{
|
||||
Share: defaultShare,
|
||||
}
|
||||
}
|
||||
for _, l := range limits {
|
||||
v := result[l.Tag]
|
||||
if l.Weight != nil && *l.Weight != 0 {
|
||||
v.Share = *l.Weight
|
||||
}
|
||||
if l.LimitOps != nil && *l.LimitOps != 0 {
|
||||
v.LimitIOPS = l.LimitOps
|
||||
}
|
||||
if l.ReservedOps != nil && *l.ReservedOps != 0 {
|
||||
v.ReservedIOPS = l.ReservedOps
|
||||
}
|
||||
result[l.Tag] = v
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
var (
|
||||
_ Limiter = (*noopLimiter)(nil)
|
||||
releaseStub ReleaseFunc = func() {}
|
||||
noopLimiterInstance = &noopLimiter{}
|
||||
)
|
||||
|
||||
func NewNoopLimiter() Limiter {
|
||||
return &noopLimiter{}
|
||||
}
|
||||
|
||||
type noopLimiter struct{}
|
||||
|
||||
func (n *noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) {
|
||||
return releaseStub, nil
|
||||
}
|
||||
|
||||
func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
|
||||
return releaseStub, nil
|
||||
}
|
||||
|
||||
func (n *noopLimiter) Close() {}
|
||||
|
||||
var _ Limiter = (*mClockLimiter)(nil)
|
||||
|
||||
type mClockLimiter struct {
|
||||
readScheduler scheduler
|
||||
writeScheduler scheduler
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
|
||||
return requestArrival(ctx, n.readScheduler)
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
|
||||
return requestArrival(ctx, n.writeScheduler)
|
||||
}
|
||||
|
||||
func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
|
||||
tag, ok := tagging.IOTagFromContext(ctx)
|
||||
if !ok {
|
||||
tag = IOTagClient.String()
|
||||
}
|
||||
if tag == IOTagCritical.String() {
|
||||
return releaseStub, nil
|
||||
}
|
||||
rel, err := s.RequestArrival(ctx, tag)
|
||||
if err != nil {
|
||||
if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
|
||||
errors.Is(err, errSemaphoreLimitExceeded) {
|
||||
return nil, &apistatus.ResourceExhausted{}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return ReleaseFunc(rel), nil
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) Close() {
|
||||
n.readScheduler.Close()
|
||||
n.writeScheduler.Close()
|
||||
}
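A minimal usage sketch, not taken from this PR: callers acquire a slot before the I/O and call the returned release function afterwards. When the context carries no IO tag, `requestArrival` falls back to the `client` tag, and the `critical` tag bypasses scheduling entirely; the `readWithLimit` wrapper below is hypothetical.

```go
package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
)

// readWithLimit guards an arbitrary read operation with the shard limiter.
func readWithLimit(ctx context.Context, l qos.Limiter, do func(context.Context) error) error {
	release, err := l.ReadRequest(ctx) // may fail with apistatus.ResourceExhausted
	if err != nil {
		return err
	}
	defer release() // always return the slot to the scheduler

	return do(ctx)
}
```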
|
internal/qos/semaphore.go (new file, 39 lines)
|
@ -0,0 +1,39 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
qosSemaphore "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
|
||||
)
|
||||
|
||||
var (
|
||||
_ scheduler = (*semaphore)(nil)
|
||||
errSemaphoreLimitExceeded = errors.New("semaphore limit exceeded")
|
||||
)
|
||||
|
||||
type semaphore struct {
|
||||
s *qosSemaphore.Semaphore
|
||||
}
|
||||
|
||||
func newSemaphoreScheduler(size int64) *semaphore {
|
||||
return &semaphore{
|
||||
s: qosSemaphore.NewSemaphore(size),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *semaphore) Close() {}
|
||||
|
||||
func (s *semaphore) RequestArrival(ctx context.Context, _ string) (scheduling.ReleaseFunc, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fyrchik commented: I have seen similar code in `frostfs-qos/limiting/semaphore` by @a-savchuk, have you not reused it deliberately?
dstepanov-yadro commented: I just didn't wait for merge. Fixed.
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if s.s.Acquire() {
|
||||
return s.s.Release, nil
|
||||
}
|
||||
return nil, errSemaphoreLimitExceeded
|
||||
}
|
internal/qos/validate.go (new file, 101 lines)
|
@ -0,0 +1,101 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
|
||||
)
|
||||
|
||||
var errWeightsMustBeSpecified = errors.New("invalid weights: weights must be specified for all tags or not specified for any")
|
||||
|
||||
type tagConfig struct {
|
||||
Shares, Limit, Reserved *float64
|
||||
}
|
||||
|
||||
func validateConfig(c *limits.Config) error {
|
||||
if err := validateOpConfig(c.Read()); err != nil {
|
||||
return fmt.Errorf("limits 'read' section validation error: %w", err)
|
||||
}
|
||||
if err := validateOpConfig(c.Write()); err != nil {
|
||||
return fmt.Errorf("limits 'write' section validation error: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateOpConfig(c limits.OpConfig) error {
|
||||
if c.MaxRunningOps <= 0 {
|
||||
return fmt.Errorf("invalid 'max_running_ops = %d': must be greater than zero", c.MaxRunningOps)
|
||||
}
|
||||
if c.MaxWaitingOps <= 0 {
|
||||
return fmt.Errorf("invalid 'max_waiting_ops = %d': must be greater than zero", c.MaxWaitingOps)
|
||||
}
|
||||
if c.IdleTimeout <= 0 {
|
||||
return fmt.Errorf("invalid 'idle_timeout = %s': must be greater than zero", c.IdleTimeout.String())
|
||||
}
|
||||
if err := validateTags(c.Tags); err != nil {
|
||||
return fmt.Errorf("'tags' config section validation error: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateTags(configTags []limits.IOTagConfig) error {
|
||||
tags := map[IOTag]tagConfig{
|
||||
IOTagClient: {},
|
||||
IOTagInternal: {},
|
||||
IOTagBackground: {},
|
||||
IOTagWritecache: {},
|
||||
IOTagPolicer: {},
|
||||
}
|
||||
for _, t := range configTags {
|
||||
tag, err := FromRawString(t.Tag)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid tag %s: %w", t.Tag, err)
|
||||
}
|
||||
if _, ok := tags[tag]; !ok {
|
||||
return fmt.Errorf("tag %s is not configurable", t.Tag)
|
||||
}
|
||||
tags[tag] = tagConfig{
|
||||
Shares: t.Weight,
|
||||
Limit: t.LimitOps,
|
||||
Reserved: t.ReservedOps,
|
||||
}
|
||||
}
|
||||
idx := 0
|
||||
var shares float64
|
||||
for t, v := range tags {
|
||||
if idx == 0 {
|
||||
idx++
|
||||
shares = float64Value(v.Shares)
|
||||
} else if (shares != 0 && float64Value(v.Shares) == 0) || (shares == 0 && float64Value(v.Shares) != 0) {
|
||||
return errWeightsMustBeSpecified
|
||||
}
|
||||
if float64Value(v.Shares) < 0 || math.IsNaN(float64Value(v.Shares)) {
|
||||
return fmt.Errorf("invalid weight for tag %s: must be positive value", t.String())
|
||||
}
|
||||
if float64Value(v.Limit) < 0 || math.IsNaN(float64Value(v.Limit)) {
|
||||
return fmt.Errorf("invalid limit_ops for tag %s: must be positive value", t.String())
|
||||
}
|
||||
if float64Value(v.Reserved) < 0 || math.IsNaN(float64Value(v.Reserved)) {
|
||||
return fmt.Errorf("invalid reserved_ops for tag %s: must be positive value", t.String())
|
||||
a-savchuk commented: `limit_ops` -> `reserved_ops`
dstepanov-yadro commented: fixed
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func float64Value(f *float64) float64 {
|
||||
if f == nil {
|
||||
return 0.0
|
||||
}
|
||||
return *f
|
||||
}
|
||||
|
||||
func isNoop(read, write limits.OpConfig) bool {
|
||||
return read.MaxRunningOps == limits.NoLimit &&
|
||||
read.MaxWaitingOps == limits.NoLimit &&
|
||||
write.MaxRunningOps == limits.NoLimit &&
|
||||
write.MaxWaitingOps == limits.NoLimit &&
|
||||
len(read.Tags) == 0 &&
|
||||
len(write.Tags) == 0
|
||||
}
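An in-package sketch (not part of the PR) of the "all or none" weight rule enforced by `validateTags`: assigning a weight to only some tags is expected to fail with `errWeightsMustBeSpecified`.

```go
package qos

import "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"

// exampleWeightValidation is illustrative only.
func exampleWeightValidation() error {
	w := 70.0
	partial := []limits.IOTagConfig{
		{Tag: "client", Weight: &w}, // weighted
		{Tag: "background"},         // weight omitted
	}
	// The remaining tags (internal, writecache, policer) default to zero weight,
	// so mixing zero and non-zero weights yields errWeightsMustBeSpecified.
	return validateTags(partial)
}
```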
|
|
@ -50,7 +50,7 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
|
|||
var res common.RebuildRes
|
||||
|
||||
b.log.Debug(ctx, logs.BlobovniczaTreeCompletingPreviousRebuild)
|
||||
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
|
||||
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage, prm.Limiter)
|
||||
res.ObjectsMoved += completedPreviosMoves
|
||||
if err != nil {
|
||||
b.log.Warn(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
|
||||
|
@ -79,7 +79,7 @@ func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.
|
|||
var completedDBCount uint32
|
||||
for _, db := range dbs {
|
||||
b.log.Debug(ctx, logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
|
||||
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
|
||||
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.Limiter)
|
||||
res.ObjectsMoved += movedObjects
|
||||
if err != nil {
|
||||
b.log.Warn(ctx, logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
|
||||
|
@ -195,7 +195,7 @@ func (b *Blobovniczas) rebuildBySize(ctx context.Context, path string, targetFil
|
|||
return fp < targetFillPercent || fp > 100+(100-targetFillPercent), nil
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
|
||||
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) {
|
||||
shDB := b.getBlobovnicza(ctx, path)
|
||||
blz, err := shDB.Open(ctx)
|
||||
if err != nil {
|
||||
|
@ -212,7 +212,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
|
||||
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, concLimiter)
|
||||
if err != nil {
|
||||
return migratedObjects, err
|
||||
}
|
||||
|
@ -238,7 +238,7 @@ func (b *Blobovniczas) addRebuildTempFile(ctx context.Context, path string) (fun
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
|
||||
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.RebuildLimiter) (uint64, error) {
|
||||
var result atomic.Uint64
|
||||
batch := make(map[oid.Address][]byte)
|
||||
|
||||
|
@ -253,7 +253,12 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
|
|||
})
|
||||
|
||||
for {
|
||||
_, err := blz.Iterate(ctx, prm)
|
||||
release, err := limiter.ReadRequest(ctx)
|
||||
if err != nil {
|
||||
return result.Load(), err
|
||||
}
|
||||
_, err = blz.Iterate(ctx, prm)
|
||||
release()
|
||||
if err != nil && !errors.Is(err, errBatchFull) {
|
||||
return result.Load(), err
|
||||
}
|
||||
|
@ -265,13 +270,19 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
|
|||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
|
||||
for addr, data := range batch {
|
||||
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
|
||||
release, err := limiter.AcquireWorkSlot(egCtx)
|
||||
if err != nil {
|
||||
_ = eg.Wait()
|
||||
return result.Load(), err
|
||||
}
|
||||
eg.Go(func() error {
|
||||
defer limiter.ReleaseWorkSlot()
|
||||
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
|
||||
defer release()
|
||||
moveRelease, err := limiter.WriteRequest(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = b.moveObject(egCtx, blz, blzPath, addr, data, meta)
|
||||
moveRelease()
|
||||
if err == nil {
|
||||
result.Add(1)
|
||||
}
|
||||
|
@ -359,7 +370,7 @@ func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
|
|||
return b.dropDirectoryIfEmpty(filepath.Dir(path))
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
|
||||
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage, rateLimiter common.RateLimiter) (uint64, error) {
|
||||
var count uint64
|
||||
var rebuildTempFilesToRemove []string
|
||||
err := b.iterateIncompletedRebuildDBPaths(ctx, func(s string) (bool, error) {
|
||||
|
@ -372,13 +383,24 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
|
|||
}
|
||||
defer shDB.Close(ctx)
|
||||
|
||||
release, err := rateLimiter.ReadRequest(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
incompletedMoves, err := blz.ListMoveInfo(ctx)
|
||||
release()
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
|
||||
for _, move := range incompletedMoves {
|
||||
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
|
||||
release, err := rateLimiter.WriteRequest(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
err = b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore)
|
||||
release()
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
count++
|
||||
|
@ -388,9 +410,14 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
|
|||
return false, nil
|
||||
})
|
||||
for _, tmp := range rebuildTempFilesToRemove {
|
||||
release, err := rateLimiter.WriteRequest(ctx)
|
||||
if err != nil {
|
||||
return count, err
|
||||
}
|
||||
if err := os.Remove(filepath.Join(b.rootPath, tmp)); err != nil {
|
||||
b.log.Warn(ctx, logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
|
||||
}
|
||||
release()
|
||||
}
|
||||
return count, err
|
||||
}
|
||||
|
|
|
@ -161,16 +161,18 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
|
|||
storageIDs: make(map[oid.Address][]byte),
|
||||
guard: &sync.Mutex{},
|
||||
}
|
||||
limiter := &rebuildLimiterStub{}
|
||||
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||
MetaStorage: metaStub,
|
||||
WorkerLimiter: &rebuildLimiterStub{},
|
||||
FillPercent: 1,
|
||||
MetaStorage: metaStub,
|
||||
Limiter: limiter,
|
||||
FillPercent: 1,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(1), rRes.ObjectsMoved)
|
||||
require.Equal(t, uint64(0), rRes.FilesRemoved)
|
||||
|
||||
require.NoError(t, b.Close(context.Background()))
|
||||
require.NoError(t, limiter.ValidateReleased())
|
||||
|
||||
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
|
||||
require.NoError(t, blz.Open(context.Background()))
|
||||
|
|
|
@ -2,7 +2,9 @@ package blobovniczatree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
|
@ -76,10 +78,11 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
|||
storageIDs: storageIDs,
|
||||
guard: &sync.Mutex{},
|
||||
}
|
||||
limiter := &rebuildLimiterStub{}
|
||||
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||
MetaStorage: metaStub,
|
||||
WorkerLimiter: &rebuildLimiterStub{},
|
||||
FillPercent: 60,
|
||||
MetaStorage: metaStub,
|
||||
Limiter: limiter,
|
||||
FillPercent: 60,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
||||
|
@ -94,6 +97,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
|||
}
|
||||
|
||||
require.NoError(t, b.Close(context.Background()))
|
||||
require.NoError(t, limiter.ValidateReleased())
|
||||
})
|
||||
|
||||
t.Run("no rebuild single db", func(t *testing.T) {
|
||||
|
@ -128,10 +132,11 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
|||
storageIDs: storageIDs,
|
||||
guard: &sync.Mutex{},
|
||||
}
|
||||
limiter := &rebuildLimiterStub{}
|
||||
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||
MetaStorage: metaStub,
|
||||
WorkerLimiter: &rebuildLimiterStub{},
|
||||
FillPercent: 90, // 64KB / 100KB = 64%
|
||||
MetaStorage: metaStub,
|
||||
Limiter: limiter,
|
||||
FillPercent: 90, // 64KB / 100KB = 64%
|
||||
})
|
||||
require.NoError(t, err)
|
||||
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
||||
|
@ -146,6 +151,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
|||
}
|
||||
|
||||
require.NoError(t, b.Close(context.Background()))
|
||||
require.NoError(t, limiter.ValidateReleased())
|
||||
})
|
||||
|
||||
t.Run("rebuild by fill percent", func(t *testing.T) {
|
||||
|
@ -193,10 +199,11 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
|||
storageIDs: storageIDs,
|
||||
guard: &sync.Mutex{},
|
||||
}
|
||||
limiter := &rebuildLimiterStub{}
|
||||
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||
MetaStorage: metaStub,
|
||||
WorkerLimiter: &rebuildLimiterStub{},
|
||||
FillPercent: 80,
|
||||
MetaStorage: metaStub,
|
||||
Limiter: limiter,
|
||||
FillPercent: 80,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(49), rRes.FilesRemoved)
|
||||
|
@ -215,6 +222,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
|||
}
|
||||
|
||||
require.NoError(t, b.Close(context.Background()))
|
||||
require.NoError(t, limiter.ValidateReleased())
|
||||
})
|
||||
|
||||
t.Run("rebuild by overflow", func(t *testing.T) {
|
||||
|
@ -266,10 +274,11 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
|||
require.NoError(t, b.Open(mode.ComponentReadWrite))
|
||||
require.NoError(t, b.Init())
|
||||
|
||||
limiter := &rebuildLimiterStub{}
|
||||
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||
MetaStorage: metaStub,
|
||||
WorkerLimiter: &rebuildLimiterStub{},
|
||||
FillPercent: 80,
|
||||
MetaStorage: metaStub,
|
||||
Limiter: limiter,
|
||||
FillPercent: 80,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(49), rRes.FilesRemoved)
|
||||
|
@ -285,6 +294,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
|||
}
|
||||
|
||||
require.NoError(t, b.Close(context.Background()))
|
||||
require.NoError(t, limiter.ValidateReleased())
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -338,9 +348,10 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
|
|||
storageIDs: storageIDs,
|
||||
guard: &sync.Mutex{},
|
||||
}
|
||||
limiter := &rebuildLimiterStub{}
|
||||
var rPrm common.RebuildPrm
|
||||
rPrm.MetaStorage = metaStub
|
||||
rPrm.WorkerLimiter = &rebuildLimiterStub{}
|
||||
rPrm.Limiter = limiter
|
||||
rPrm.FillPercent = 1
|
||||
rRes, err := b.Rebuild(context.Background(), rPrm)
|
||||
require.NoError(t, err)
|
||||
|
@ -356,6 +367,7 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
|
|||
}
|
||||
|
||||
require.NoError(t, b.Close(context.Background()))
|
||||
require.NoError(t, limiter.ValidateReleased())
|
||||
}
|
||||
|
||||
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
|
||||
|
@ -427,9 +439,10 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
|
|||
storageIDs: storageIDs,
|
||||
guard: &sync.Mutex{},
|
||||
}
|
||||
limiter := &rebuildLimiterStub{}
|
||||
var rPrm common.RebuildPrm
|
||||
rPrm.MetaStorage = metaStub
|
||||
rPrm.WorkerLimiter = &rebuildLimiterStub{}
|
||||
rPrm.Limiter = limiter
|
||||
rPrm.FillPercent = 1
|
||||
rRes, err := b.Rebuild(context.Background(), rPrm)
|
||||
require.NoError(t, err)
|
||||
|
@ -445,6 +458,7 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
|
|||
}
|
||||
|
||||
require.NoError(t, b.Close(context.Background()))
|
||||
require.NoError(t, limiter.ValidateReleased())
|
||||
}
|
||||
|
||||
type storageIDUpdateStub struct {
|
||||
|
@ -462,7 +476,36 @@ func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Addr
|
|||
return nil
|
||||
}
|
||||
|
||||
type rebuildLimiterStub struct{}
|
||||
type rebuildLimiterStub struct {
|
||||
slots atomic.Int64
|
||||
readRequests atomic.Int64
|
||||
writeRequests atomic.Int64
|
||||
}
|
||||
|
||||
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
|
||||
func (s *rebuildLimiterStub) ReleaseWorkSlot() {}
|
||||
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) (common.ReleaseFunc, error) {
|
||||
s.slots.Add(1)
|
||||
return func() { s.slots.Add(-1) }, nil
|
||||
}
|
||||
|
||||
func (s *rebuildLimiterStub) ReadRequest(context.Context) (common.ReleaseFunc, error) {
|
||||
s.readRequests.Add(1)
|
||||
return func() { s.readRequests.Add(-1) }, nil
|
||||
}
|
||||
|
||||
func (s *rebuildLimiterStub) WriteRequest(context.Context) (common.ReleaseFunc, error) {
|
||||
s.writeRequests.Add(1)
|
||||
return func() { s.writeRequests.Add(-1) }, nil
|
||||
}
|
||||
|
||||
func (s *rebuildLimiterStub) ValidateReleased() error {
|
||||
if v := s.slots.Load(); v != 0 {
|
||||
return fmt.Errorf("invalid slots value %d", v)
|
||||
}
|
||||
if v := s.readRequests.Load(); v != 0 {
|
||||
return fmt.Errorf("invalid read requests value %d", v)
|
||||
}
|
||||
if v := s.writeRequests.Load(); v != 0 {
|
||||
return fmt.Errorf("invalid write requests value %d", v)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -12,16 +12,27 @@ type RebuildRes struct {
|
|||
}
|
||||
|
||||
type RebuildPrm struct {
|
||||
MetaStorage MetaStorage
|
||||
WorkerLimiter ConcurrentWorkersLimiter
|
||||
FillPercent int
|
||||
MetaStorage MetaStorage
|
||||
Limiter RebuildLimiter
|
||||
FillPercent int
|
||||
}
|
||||
|
||||
type MetaStorage interface {
|
||||
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
|
||||
}
|
||||
|
||||
type ConcurrentWorkersLimiter interface {
|
||||
AcquireWorkSlot(ctx context.Context) error
|
||||
ReleaseWorkSlot()
|
||||
type ReleaseFunc func()
|
||||
|
||||
type ConcurrencyLimiter interface {
|
||||
AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error)
|
||||
}
|
||||
|
||||
type RateLimiter interface {
|
||||
ReadRequest(context.Context) (ReleaseFunc, error)
|
||||
WriteRequest(context.Context) (ReleaseFunc, error)
|
||||
}
|
||||
|
||||
type RebuildLimiter interface {
|
||||
ConcurrencyLimiter
|
||||
RateLimiter
|
||||
}
|
||||
|
|
|
@ -13,19 +13,14 @@ type StorageIDUpdate interface {
|
|||
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
|
||||
}
|
||||
|
||||
type ConcurrentWorkersLimiter interface {
|
||||
AcquireWorkSlot(ctx context.Context) error
|
||||
ReleaseWorkSlot()
|
||||
}
|
||||
|
||||
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter, fillPercent int) error {
|
||||
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, concLimiter common.RebuildLimiter, fillPercent int) error {
|
||||
var summary common.RebuildRes
|
||||
var rErr error
|
||||
for _, storage := range b.storage {
|
||||
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
|
||||
MetaStorage: upd,
|
||||
WorkerLimiter: limiter,
|
||||
FillPercent: fillPercent,
|
||||
MetaStorage: upd,
|
||||
Limiter: concLimiter,
|
||||
FillPercent: fillPercent,
|
||||
})
|
||||
summary.FilesRemoved += res.FilesRemoved
|
||||
summary.ObjectsMoved += res.ObjectsMoved
|
||||
|
|
|
@ -74,7 +74,7 @@ func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm)
|
|||
var csPrm shard.ContainerSizePrm
|
||||
csPrm.SetContainerID(prm.cnr)
|
||||
|
||||
csRes, err := sh.Shard.ContainerSize(csPrm)
|
||||
csRes, err := sh.Shard.ContainerSize(ctx, csPrm)
|
||||
if err != nil {
|
||||
e.reportShardError(ctx, sh, "can't get container size", err,
|
||||
zap.Stringer("container_id", prm.cnr))
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -176,7 +177,10 @@ func (e *StorageEngine) reportShardError(
|
|||
}
|
||||
|
||||
func isLogical(err error) bool {
|
||||
return errors.As(err, &logicerr.Logical{}) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
|
||||
return errors.As(err, &logicerr.Logical{}) ||
|
||||
errors.Is(err, context.Canceled) ||
|
||||
errors.Is(err, context.DeadlineExceeded) ||
|
||||
errors.As(err, new(*apistatus.ResourceExhausted))
|
||||
}
|
||||
|
||||
// Option represents StorageEngine's constructor option.
|
||||
|
|
|
@ -3,8 +3,10 @@ package engine
import (
"context"
"path/filepath"
"sync/atomic"
"testing"

"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"

@ -90,6 +92,7 @@ func testGetDefaultShardOptions(t testing.TB) []shard.Option {
),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.TempDir(), "pilorama"))),
shard.WithMetaBaseOptions(testGetDefaultMetabaseOptions(t)...),
shard.WithLimiter(&testQoSLimiter{t: t}),
}
}

@ -151,3 +154,26 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes
},
}, smallFileStorage, largeFileStorage
}

var _ qos.Limiter = (*testQoSLimiter)(nil)

type testQoSLimiter struct {
t testing.TB
read atomic.Int64
write atomic.Int64
}

func (t *testQoSLimiter) Close() {
require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
}

func (t *testQoSLimiter) ReadRequest(context.Context) (qos.ReleaseFunc, error) {
t.read.Add(1)
return func() { t.read.Add(-1) }, nil
}

func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error) {
t.write.Add(1)
return func() { t.write.Add(-1) }, nil
}

@ -339,7 +339,7 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid
var drop []cid.ID
for id := range idMap {
prm.SetContainerID(id)
s, err := sh.ContainerSize(prm)
s, err := sh.ContainerSize(ctx, prm)
if err != nil {
e.log.Warn(ctx, logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err))
failed = true

@ -4,6 +4,7 @@ import (
"context"
"sync"

"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"

@ -41,7 +42,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
}
resGuard := &sync.Mutex{}

limiter := shard.NewRebuildLimiter(prm.ConcurrencyLimit)
concLimiter := &concurrencyLimiter{semaphore: make(chan struct{}, prm.ConcurrencyLimit)}

eg, egCtx := errgroup.WithContext(ctx)
for _, shardID := range prm.ShardIDs {

@ -61,7 +62,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
}

err := sh.ScheduleRebuild(egCtx, shard.RebuildPrm{
ConcurrencyLimiter: limiter,
ConcurrencyLimiter: concLimiter,
TargetFillPercent: prm.TargetFillPercent,
})

@ -88,3 +89,20 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
}
return res, nil
}

type concurrencyLimiter struct {
semaphore chan struct{}
}

func (l *concurrencyLimiter) AcquireWorkSlot(ctx context.Context) (common.ReleaseFunc, error) {
select {
case l.semaphore <- struct{}{}:
return l.releaseWorkSlot, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (l *concurrencyLimiter) releaseWorkSlot() {
<-l.semaphore
}

@ -26,7 +26,7 @@ func (r ContainerSizeRes) Size() uint64 {
return r.size
}

func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
func (s *Shard) ContainerSize(ctx context.Context, prm ContainerSizePrm) (ContainerSizeRes, error) {
s.m.RLock()
defer s.m.RUnlock()

@ -34,6 +34,12 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
return ContainerSizeRes{}, ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ContainerSizeRes{}, err
}
defer release()

size, err := s.metaBase.ContainerSize(prm.cnr)
if err != nil {
return ContainerSizeRes{}, fmt.Errorf("get container size: %w", err)

@ -69,6 +75,12 @@ func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (Cont
return ContainerCountRes{}, ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ContainerCountRes{}, err
}
defer release()

counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
if err != nil {
return ContainerCountRes{}, fmt.Errorf("get container counters: %w", err)

@ -100,6 +112,12 @@ func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error {
return ErrDegradedMode
}

release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()

return s.metaBase.DeleteContainerSize(ctx, id)
}

@ -122,5 +140,11 @@ func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error {
return ErrDegradedMode
}

release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()

return s.metaBase.DeleteContainerCount(ctx, id)
}

@ -395,6 +395,10 @@ func (s *Shard) Close(ctx context.Context) error {
s.gc.stop(ctx)
}

if s.opsLimiter != nil {
s.opsLimiter.Close()
}

return lastErr
}

@ -445,6 +449,10 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
return err
}
}
if c.opsLimiter != nil {
s.opsLimiter.Close()
s.opsLimiter = c.opsLimiter
}
return s.setMode(ctx, c.info.Mode)
}

@ -23,6 +23,12 @@ func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
return 0, ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()

cc, err := s.metaBase.ObjectCounters()
if err != nil {
return 0, err

@ -54,6 +54,12 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del
return DeleteRes{}, ErrDegradedMode
}

release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return DeleteRes{}, err
}
defer release()

result := DeleteRes{}
for _, addr := range prm.addr {
select {

@ -53,10 +53,6 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
))
defer span.End()

var exists bool
var locked bool
var err error

s.m.RLock()
defer s.m.RUnlock()

@ -64,7 +60,18 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
return ExistsRes{}, ErrShardDisabled
} else if s.info.EvacuationInProgress {
return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
} else if s.info.Mode.NoMetabase() {
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ExistsRes{}, err
}
defer release()

var exists bool
var locked bool

if s.info.Mode.NoMetabase() {
var p common.ExistsPrm
p.Address = prm.Address

@ -291,28 +291,7 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
s.log.Debug(ctx, logs.ShardGCRemoveGarbageStarted)
defer s.log.Debug(ctx, logs.ShardGCRemoveGarbageCompleted)

buf := make([]oid.Address, 0, s.rmBatchSize)

var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

buf = append(buf, g.Address())

if len(buf) == s.rmBatchSize {
return meta.ErrInterruptIterator
}

return nil
})

// iterate over metabase's objects with GC mark
// (no more than s.rmBatchSize objects)
err := s.metaBase.IterateOverGarbage(ctx, iterPrm)
buf, err := s.getGarbage(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardIteratorOverMetabaseGraveyardFailed,
zap.Error(err),

@ -344,6 +323,39 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
return
}

func (s *Shard) getGarbage(ctx context.Context) ([]oid.Address, error) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()

buf := make([]oid.Address, 0, s.rmBatchSize)

var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

buf = append(buf, g.Address())

if len(buf) == s.rmBatchSize {
return meta.ErrInterruptIterator
}

return nil
})

if err := s.metaBase.IterateOverGarbage(ctx, iterPrm); err != nil {
return nil, err
}

return buf, nil
}

func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) {
workerCount = max(minExpiredWorkers, s.gc.gcCfg.expiredCollectorWorkerCount)
batchSize = max(minExpiredBatchSize, s.gc.gcCfg.expiredCollectorBatchSize)

@ -422,18 +434,9 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return
}

var inhumePrm meta.InhumePrm

inhumePrm.SetAddresses(expired...)
inhumePrm.SetGCMark()

// inhume the collected objects
res, err := s.metaBase.Inhume(ctx, inhumePrm)
res, err := s.inhumeGC(ctx, expired)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects,
zap.Error(err),
)

s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects, zap.Error(err))
return
}

@ -451,6 +454,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
}

func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address) ([]oid.Address, error) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()

result := make([]oid.Address, 0, len(source))
parentToChildren, err := s.metaBase.GetChildren(ctx, source)
if err != nil {

@ -464,6 +473,19 @@ func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address)
return result, nil
}

func (s *Shard) inhumeGC(ctx context.Context, addrs []oid.Address) (meta.InhumeRes, error) {
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return meta.InhumeRes{}, err
}
defer release()

var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(addrs...)
inhumePrm.SetGCMark()
return s.metaBase.Inhume(ctx, inhumePrm)
}

func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
var err error
startedAt := time.Now()

@ -505,11 +527,17 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
return
}

err = s.metaBase.IterateOverGraveyard(ctx, iterPrm)
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
s.m.RUnlock()
return
}
err = s.metaBase.IterateOverGraveyard(ctx, iterPrm)
release()
if err != nil {
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
s.m.RUnlock()

return
}

@ -598,7 +626,13 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFo
return ErrDegradedMode
}

err := s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()

err = s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error {
select {
case <-ctx.Done():
return meta.ErrInterruptIterator

@ -621,6 +655,12 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
return nil, ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()

return s.metaBase.FilterExpired(ctx, epoch, addresses)
}

@ -636,12 +676,15 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
return
}

res, err := s.metaBase.InhumeTombstones(ctx, tss)
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
zap.Error(err),
)

s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage, zap.Error(err))
return
}
res, err := s.metaBase.InhumeTombstones(ctx, tss)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage, zap.Error(err))
return
}

@ -664,11 +707,16 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
if s.GetMode().NoMetabase() {
return
}
unlocked, err := s.metaBase.FreeLockedBy(lockers)

release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects,
zap.Error(err),
)
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
unlocked, err := s.metaBase.FreeLockedBy(lockers)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))

return
}

@ -676,13 +724,15 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
var pInhume meta.InhumePrm
pInhume.SetAddresses(lockers...)
pInhume.SetForceGCMark()

res, err := s.metaBase.Inhume(ctx, pInhume)
release, err = s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage,
zap.Error(err),
)

s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage, zap.Error(err))
return
}
res, err := s.metaBase.Inhume(ctx, pInhume)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage, zap.Error(err))
return
}

@ -721,12 +771,15 @@ func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
return
}

_, err := s.metaBase.FreeLockedBy(lockers)
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects,
zap.Error(err),
)

s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
_, err = s.metaBase.FreeLockedBy(lockers)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
}

@ -750,7 +803,13 @@ func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
}

func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
ids, err := s.metaBase.ZeroSizeContainers(ctx)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return

@ -762,7 +821,13 @@ func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch ui
}

func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
ids, err := s.metaBase.ZeroCountContainers(ctx)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return

@ -111,6 +111,12 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
return c.Get(ctx, prm.addr)
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return GetRes{}, err
}
defer release()

skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc)

@ -81,6 +81,12 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
headParams.SetAddress(prm.addr)
headParams.SetRaw(prm.raw)

release, limitErr := s.opsLimiter.ReadRequest(ctx)
if limitErr != nil {
return HeadRes{}, limitErr
}
defer release()

var res meta.GetRes
res, err = s.metaBase.Get(ctx, headParams)
obj = res.Header()

@ -81,6 +81,12 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
return InhumeRes{}, ErrDegradedMode
}

release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return InhumeRes{}, err
}
defer release()

if s.hasWriteCache() {
for i := range prm.target {
_ = s.writeCache.Delete(ctx, prm.target[i])

@ -106,6 +106,12 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
return SelectRes{}, ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return SelectRes{}, err
}
defer release()

lst, err := s.metaBase.Containers(ctx)
if err != nil {
return res, fmt.Errorf("list stored containers: %w", err)

@ -145,6 +151,12 @@ func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListCo
return ListContainersRes{}, ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ListContainersRes{}, err
}
defer release()

containers, err := s.metaBase.Containers(ctx)
if err != nil {
return ListContainersRes{}, fmt.Errorf("get list of containers: %w", err)

@ -173,6 +185,12 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
return ListWithCursorRes{}, ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ListWithCursorRes{}, err
}
defer release()

var metaPrm meta.ListPrm
metaPrm.SetCount(prm.count)
metaPrm.SetCursor(prm.cursor)

@ -202,9 +220,15 @@ func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContai
return ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()

var metaPrm meta.IterateOverContainersPrm
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
err = s.metaBase.IterateOverContainers(ctx, metaPrm)
if err != nil {
return fmt.Errorf("iterate over containers: %w", err)
}

@ -227,11 +251,17 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
return ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()

var metaPrm meta.IterateOverObjectsInContainerPrm
metaPrm.ContainerID = prm.ContainerID
metaPrm.ObjectType = prm.ObjectType
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
err = s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
if err != nil {
return fmt.Errorf("iterate over objects: %w", err)
}

@ -251,6 +281,12 @@ func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAlive
return 0, ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()

var metaPrm meta.CountAliveObjectsInContainerPrm
metaPrm.ObjectType = prm.ObjectType
metaPrm.ContainerID = prm.ContainerID

@ -38,7 +38,13 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []
return ErrDegradedMode
}

err := s.metaBase.Lock(ctx, idCnr, locker, locked)
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()

err = s.metaBase.Lock(ctx, idCnr, locker, locked)
if err != nil {
return fmt.Errorf("metabase lock: %w", err)
}

@ -61,6 +67,12 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
return false, ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return false, err
}
defer release()

var prm meta.IsLockedPrm
prm.SetAddress(addr)

@ -86,5 +98,12 @@ func (s *Shard) GetLocks(ctx context.Context, addr oid.Address) ([]oid.ID, error
if m.NoMetabase() {
return nil, ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()

return s.metaBase.GetLocks(ctx, addr)
}

@ -67,6 +67,12 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {

var res common.PutRes

release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return PutRes{}, err
}
defer release()

// exist check are not performed there, these checks should be executed
// ahead of `Put` by storage engine
tryCache := s.hasWriteCache() && !m.NoMetabase()

@ -131,6 +131,12 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
return obj, nil
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return RngRes{}, err
}
defer release()

skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc)

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"

@ -20,37 +21,9 @@ import (

var ErrRebuildInProgress = errors.New("shard rebuild in progress")

type RebuildWorkerLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}

type rebuildLimiter struct {
semaphore chan struct{}
}

func NewRebuildLimiter(workersCount uint32) RebuildWorkerLimiter {
return &rebuildLimiter{
semaphore: make(chan struct{}, workersCount),
}
}

func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
select {
case l.semaphore <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (l *rebuildLimiter) ReleaseWorkSlot() {
<-l.semaphore
}

type rebuildTask struct {
limiter RebuildWorkerLimiter
fillPercent int
concurrencyLimiter common.RebuildLimiter
fillPercent int
}

type rebuilder struct {

@ -90,14 +63,14 @@ func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D
if !ok {
continue
}
runRebuild(ctx, bs, mb, log, t.fillPercent, t.limiter)
runRebuild(ctx, bs, mb, log, t.fillPercent, t.concurrencyLimiter)
}
}
}()
}

func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger,
fillPercent int, limiter RebuildWorkerLimiter,
fillPercent int, concLimiter common.RebuildLimiter,
) {
select {
case <-ctx.Done():

@ -106,21 +79,21 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
}
log.Info(ctx, logs.BlobstoreRebuildStarted)
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, concLimiter, fillPercent); err != nil {
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
} else {
log.Info(ctx, logs.BlobstoreRebuildCompletedSuccessfully)
}
}

func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, fillPercent int,
func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter common.RebuildLimiter, fillPercent int,
) error {
select {
case <-ctx.Done():
return ctx.Err()
case r.tasks <- rebuildTask{
limiter: limiter,
fillPercent: fillPercent,
concurrencyLimiter: limiter,
fillPercent: fillPercent,
}:
return nil
default:

@ -169,7 +142,7 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres
}

type RebuildPrm struct {
ConcurrencyLimiter RebuildWorkerLimiter
ConcurrencyLimiter common.ConcurrencyLimiter
TargetFillPercent uint32
}

@ -191,5 +164,30 @@ func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error {
return ErrDegradedMode
}

return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, int(p.TargetFillPercent))
limiter := &rebuildLimiter{
concurrencyLimiter: p.ConcurrencyLimiter,
rateLimiter: s.opsLimiter,
}
return s.rb.ScheduleRebuild(ctx, limiter, int(p.TargetFillPercent))
}

var _ common.RebuildLimiter = (*rebuildLimiter)(nil)

type rebuildLimiter struct {
concurrencyLimiter common.ConcurrencyLimiter
rateLimiter qos.Limiter
}

fyrchik marked this conversation as resolved

fyrchik commented:
I think the rebuild test (and maybe all tests, actually) should check that the limiter is in some ground state after tests.
The current limiter API makes it possible to forget calling release.
With tests we can ensure that it is called at least in the code we have tested.

fyrchik commented:
Hm, maybe we can do this in `storageEngine.Close()`?
It is already called in `Cleanup()` and after all `wg.Wait()` I would expect the limiter to be empty.
The problem is that there we have an interface; casting looks nice only in tests.

dstepanov-yadro commented:
Added validation to rebuild tests.
> Current limiter API makes it possible to forget calling release.
I hope that there will be no such cases, because the compiler should highlight this as an unused variable.

dstepanov-yadro commented:
Also added validation to engine tests.

fyrchik commented:
> the compiler should highlight this as an unused variable
Hm, good point.

func (r *rebuildLimiter) AcquireWorkSlot(ctx context.Context) (common.ReleaseFunc, error) {
return r.concurrencyLimiter.AcquireWorkSlot(ctx)
}

func (r *rebuildLimiter) ReadRequest(ctx context.Context) (common.ReleaseFunc, error) {
release, err := r.rateLimiter.ReadRequest(ctx)
return common.ReleaseFunc(release), err
}

func (r *rebuildLimiter) WriteRequest(ctx context.Context) (common.ReleaseFunc, error) {
release, err := r.rateLimiter.WriteRequest(ctx)
return common.ReleaseFunc(release), err
}

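To illustrate the point settled in the review thread above: with the ReleaseFunc-based API, a leaked slot is hard to miss, because an unused `release` value is rejected by the compiler and a test limiter can assert a balanced acquire/release count on Close. Below is a minimal sketch of the intended call pattern; the wrapper function and its callback are hypothetical, only the `qos.Limiter` methods come from this change.

package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
)

// readWithLimit sketches how callers are expected to use the limiter:
// acquire a slot, check the error, and defer the returned ReleaseFunc exactly once.
func readWithLimit(ctx context.Context, l qos.Limiter, read func(context.Context) error) error {
	release, err := l.ReadRequest(ctx) // leaving `release` unused would be an unused-variable compile error
	if err != nil {
		return err
	}
	defer release() // runs on every return path, so the limiter ends up balanced

	return read(ctx) // the actual read performed under the acquired slot
}
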
@ -60,6 +60,12 @@ func (s *Shard) Select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
return SelectRes{}, ErrDegradedMode
}

release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return SelectRes{}, err
}
defer release()

var selectPrm meta.SelectPrm
selectPrm.SetFilters(prm.filters)
selectPrm.SetContainerID(prm.cnr)

@ -7,6 +7,7 @@ import (
"time"

"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"

@ -98,6 +99,8 @@ type cfg struct {
reportErrorFunc func(ctx context.Context, selfID string, message string, err error)

containerInfo container.InfoProvider

opsLimiter qos.Limiter
}

func defaultCfg() *cfg {

@ -109,6 +112,7 @@ func defaultCfg() *cfg {
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
metricsWriter: noopMetrics{},
opsLimiter: qos.NewNoopLimiter(),
}
}

@ -368,6 +372,12 @@ func WithContainerInfoProvider(containerInfo container.InfoProvider) Option {
}
}

func WithLimiter(l qos.Limiter) Option {
return func(c *cfg) {
c.opsLimiter = l
}
}

func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()

@ -43,6 +43,11 @@ func (s *Shard) TreeMove(ctx context.Context, d pilorama.CIDDescriptor, treeID s
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeMove(ctx, d, treeID, m)
}

@ -75,6 +80,11 @@ func (s *Shard) TreeAddByPath(ctx context.Context, d pilorama.CIDDescriptor, tre
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeAddByPath(ctx, d, treeID, attr, path, meta)
}

@ -103,6 +113,11 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
}

@ -130,6 +145,11 @@ func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m)
}

@ -157,6 +177,11 @@ func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string,
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeGetByPath(ctx, cid, treeID, attr, path, latest)
}

@ -182,6 +207,11 @@ func (s *Shard) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, n
if s.info.Mode.NoMetabase() {
return pilorama.Meta{}, 0, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return pilorama.Meta{}, 0, err
}
defer release()
return s.pilorama.TreeGetMeta(ctx, cid, treeID, nodeID)
}

@ -207,6 +237,11 @@ func (s *Shard) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID strin
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeGetChildren(ctx, cid, treeID, nodeID)
}

@ -231,6 +266,11 @@ func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID
if s.info.Mode.NoMetabase() {
return nil, last, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, last, err
}
defer release()
return s.pilorama.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)
}

@ -256,6 +296,11 @@ func (s *Shard) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string,
if s.info.Mode.NoMetabase() {
return pilorama.Move{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return pilorama.Move{}, err
}
defer release()
return s.pilorama.TreeGetOpLog(ctx, cid, treeID, height)
}

@ -280,6 +325,11 @@ func (s *Shard) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) erro
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeDrop(ctx, cid, treeID)
}

@ -303,6 +353,11 @@ func (s *Shard) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeList(ctx, cid)
}

@ -326,6 +381,11 @@ func (s *Shard) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (u
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
return s.pilorama.TreeHeight(ctx, cid, treeID)
}

@ -350,6 +410,11 @@ func (s *Shard) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (b
if s.info.Mode.NoMetabase() {
return false, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return false, err
}
defer release()
return s.pilorama.TreeExists(ctx, cid, treeID)
}

@ -378,6 +443,11 @@ func (s *Shard) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, tre
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeUpdateLastSyncHeight(ctx, cid, treeID, height)
}

@ -402,6 +472,11 @@ func (s *Shard) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID st
if s.info.Mode.NoMetabase() {
return 0, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
return s.pilorama.TreeLastSyncHeight(ctx, cid, treeID)
}

@ -423,6 +498,11 @@ func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeListTrees(ctx, prm)
}

@ -452,5 +532,10 @@ func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID strin
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source)
}

@ -67,6 +67,12 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
return ErrDegradedMode
}

release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()

return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
}

@ -124,6 +130,13 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
close(started)
defer cleanup()

release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.FailedToSealWritecacheAsync, zap.Error(err))
return
}
defer release()

s.log.Info(ctx, logs.StartedWritecacheSealAsync)
if err := s.writeCache.Seal(ctx, prm); err != nil {
s.log.Warn(ctx, logs.FailedToSealWritecacheAsync, zap.Error(err))

@ -138,5 +151,11 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
return nil
}
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()

return s.writeCache.Seal(ctx, prm)
}

@ -3,6 +3,8 @@ package object
import (
"context"

"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
)

@ -120,13 +122,24 @@ type qosSendRecv[TReq qosVerificationHeader, TResp any] interface {
type qosWriteStream[TReq qosVerificationHeader, TResp any] struct {
s qosSendRecv[TReq, TResp]
adj AdjustIOTag

ioTag string
ioTagDefined bool
}

func (q *qosWriteStream[TReq, TResp]) CloseAndRecv(ctx context.Context) (TResp, error) {
if q.ioTagDefined {
ctx = tagging.ContextWithIOTag(ctx, q.ioTag)
}
return q.s.CloseAndRecv(ctx)
}

func (q *qosWriteStream[TReq, TResp]) Send(ctx context.Context, req TReq) error {
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
if !q.ioTagDefined {
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
q.ioTag, q.ioTagDefined = tagging.IOTagFromContext(ctx)
}
assert.True(q.ioTagDefined, "io tag undefined after incoming tag adjustment")
ctx = tagging.ContextWithIOTag(ctx, q.ioTag)
return q.s.Send(ctx, req)
}

You close a limiter of the new config, not the old one. Is it right?

It is right. Performed refactoring: renamed `newConfig` -> `target`, `oldConfig` -> `source`.
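For reference, the shard-level Reload hunk earlier in this diff handles the limiter swap as follows; the code is quoted from that hunk, and the comments here are editorial clarifications rather than part of the change:

if c.opsLimiter != nil {
	s.opsLimiter.Close()        // close the limiter the shard has been using until now
	s.opsLimiter = c.opsLimiter // adopt the limiter built from the new configuration
}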