Shard OPS limiter #1636

Merged
dstepanov-yadro merged 12 commits from dstepanov-yadro/frostfs-node:feat/shard_io_limiter into master 2025-03-03 10:52:34 +00:00
43 changed files with 1405 additions and 192 deletions

View file

@ -33,6 +33,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
@ -135,6 +136,7 @@ type shardCfg struct {
refillMetabase bool
refillMetabaseWorkersCount int
mode shardmode.Mode
limiter qos.Limiter
metaCfg struct {
path string
@ -254,39 +256,42 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
}
func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig *shardconfig.Config) error {
var newConfig shardCfg
func (a *applicationConfiguration) updateShardConfig(c *config.Config, source *shardconfig.Config) error {
var target shardCfg
newConfig.refillMetabase = oldConfig.RefillMetabase()
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
newConfig.mode = oldConfig.Mode()
newConfig.compress = oldConfig.Compress()
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
newConfig.estimateCompressibilityThreshold = oldConfig.EstimateCompressibilityThreshold()
newConfig.uncompressableContentType = oldConfig.UncompressableContentTypes()
newConfig.smallSizeObjectLimit = oldConfig.SmallSizeLimit()
target.refillMetabase = source.RefillMetabase()
target.refillMetabaseWorkersCount = source.RefillMetabaseWorkersCount()
target.mode = source.Mode()
target.compress = source.Compress()
target.estimateCompressibility = source.EstimateCompressibility()
target.estimateCompressibilityThreshold = source.EstimateCompressibilityThreshold()
target.uncompressableContentType = source.UncompressableContentTypes()
target.smallSizeObjectLimit = source.SmallSizeLimit()
a.setShardWriteCacheConfig(&newConfig, oldConfig)
a.setShardWriteCacheConfig(&target, source)
a.setShardPiloramaConfig(c, &newConfig, oldConfig)
a.setShardPiloramaConfig(c, &target, source)
if err := a.setShardStorageConfig(&newConfig, oldConfig); err != nil {
if err := a.setShardStorageConfig(&target, source); err != nil {
return err
}
a.setMetabaseConfig(&newConfig, oldConfig)
a.setMetabaseConfig(&target, source)
a.setGCConfig(&newConfig, oldConfig)
a.setGCConfig(&target, source)
if err := a.setLimiter(&target, source); err != nil {
return err
}
a.EngineCfg.shards = append(a.EngineCfg.shards, newConfig)
a.EngineCfg.shards = append(a.EngineCfg.shards, target)
return nil
}
func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
writeCacheCfg := oldConfig.WriteCache()
func (a *applicationConfiguration) setShardWriteCacheConfig(target *shardCfg, source *shardconfig.Config) {
writeCacheCfg := source.WriteCache()
if writeCacheCfg.Enabled() {
wc := &newConfig.writecacheCfg
wc := &target.writecacheCfg
wc.enabled = true
wc.path = writeCacheCfg.Path()
@ -299,10 +304,10 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
}
}
func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, newConfig *shardCfg, oldConfig *shardconfig.Config) {
func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, target *shardCfg, source *shardconfig.Config) {
if config.BoolSafe(c.Sub("tree"), "enabled") {
piloramaCfg := oldConfig.Pilorama()
pr := &newConfig.piloramaCfg
piloramaCfg := source.Pilorama()
pr := &target.piloramaCfg
pr.enabled = true
pr.path = piloramaCfg.Path()
@ -313,8 +318,8 @@ func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, newC
}
}
func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
blobStorCfg := oldConfig.BlobStor()
func (a *applicationConfiguration) setShardStorageConfig(target *shardCfg, source *shardconfig.Config) error {
blobStorCfg := source.BlobStor()
storagesCfg := blobStorCfg.Storages()
ss := make([]subStorageCfg, 0, len(storagesCfg))
@ -348,13 +353,13 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
ss = append(ss, sCfg)
}
newConfig.subStorages = ss
target.subStorages = ss
return nil
}
func (a *applicationConfiguration) setMetabaseConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
metabaseCfg := oldConfig.Metabase()
m := &newConfig.metaCfg
func (a *applicationConfiguration) setMetabaseConfig(target *shardCfg, source *shardconfig.Config) {
metabaseCfg := source.Metabase()
m := &target.metaCfg
m.path = metabaseCfg.Path()
m.perm = metabaseCfg.BoltDB().Perm()
@ -362,12 +367,25 @@ func (a *applicationConfiguration) setMetabaseConfig(newConfig *shardCfg, oldCon
m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize()
}
func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
gcCfg := oldConfig.GC()
newConfig.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
newConfig.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
newConfig.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
func (a *applicationConfiguration) setGCConfig(target *shardCfg, source *shardconfig.Config) {
gcCfg := source.GC()
target.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
target.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
target.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
target.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
}
func (a *applicationConfiguration) setLimiter(target *shardCfg, source *shardconfig.Config) error {
limitsConfig := source.Limits()
limiter, err := qos.NewLimiter(limitsConfig)
if err != nil {
return err
}
if target.limiter != nil {
target.limiter.Close()
}
target.limiter = limiter
return nil
}
// internals contains application-specific internals that are created
@ -1054,6 +1072,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
return pool
}),
shard.WithLimiter(shCfg.limiter),
}
return sh
}

View file

@ -11,6 +11,7 @@ import (
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
gcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/gc"
limitsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
piloramaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/pilorama"
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/writecache"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
@ -76,6 +77,7 @@ func TestEngineSection(t *testing.T) {
ss := blob.Storages()
pl := sc.Pilorama()
gc := sc.GC()
limits := sc.Limits()
switch num {
case 0:
@ -134,6 +136,75 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, false, sc.RefillMetabase())
require.Equal(t, mode.ReadOnly, sc.Mode())
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
readLimits := limits.Read()
writeLimits := limits.Write()
require.Equal(t, 30*time.Second, readLimits.IdleTimeout)
require.Equal(t, int64(10_000), readLimits.MaxRunningOps)
require.Equal(t, int64(1_000), readLimits.MaxWaitingOps)
require.Equal(t, 45*time.Second, writeLimits.IdleTimeout)
require.Equal(t, int64(1_000), writeLimits.MaxRunningOps)
require.Equal(t, int64(100), writeLimits.MaxWaitingOps)
require.ElementsMatch(t, readLimits.Tags,
[]limitsconfig.IOTagConfig{
{
Tag: "internal",
Weight: toPtr(20),
ReservedOps: toPtr(1000),
LimitOps: toPtr(0),
},
{
Tag: "client",
Weight: toPtr(70),
ReservedOps: toPtr(10000),
},
{
Tag: "background",
Weight: toPtr(5),
LimitOps: toPtr(10000),
ReservedOps: toPtr(0),
},
{
Tag: "writecache",
Weight: toPtr(5),
LimitOps: toPtr(25000),
},
{
Tag: "policer",
Weight: toPtr(5),
LimitOps: toPtr(25000),
},
})
require.ElementsMatch(t, writeLimits.Tags,
[]limitsconfig.IOTagConfig{
{
Tag: "internal",
Weight: toPtr(200),
ReservedOps: toPtr(100),
LimitOps: toPtr(0),
},
{
Tag: "client",
Weight: toPtr(700),
ReservedOps: toPtr(1000),
},
{
Tag: "background",
Weight: toPtr(50),
LimitOps: toPtr(1000),
ReservedOps: toPtr(0),
},
{
Tag: "writecache",
Weight: toPtr(50),
LimitOps: toPtr(2500),
},
{
Tag: "policer",
Weight: toPtr(50),
LimitOps: toPtr(2500),
},
})
case 1:
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
require.Equal(t, fs.FileMode(0o644), pl.Perm())
@ -188,6 +259,17 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, true, sc.RefillMetabase())
require.Equal(t, mode.ReadWrite, sc.Mode())
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
readLimits := limits.Read()
writeLimits := limits.Write()
require.Equal(t, limitsconfig.DefaultIdleTimeout, readLimits.IdleTimeout)
require.Equal(t, limitsconfig.NoLimit, readLimits.MaxRunningOps)
require.Equal(t, limitsconfig.NoLimit, readLimits.MaxWaitingOps)
require.Equal(t, limitsconfig.DefaultIdleTimeout, writeLimits.IdleTimeout)
require.Equal(t, limitsconfig.NoLimit, writeLimits.MaxRunningOps)
require.Equal(t, limitsconfig.NoLimit, writeLimits.MaxWaitingOps)
require.Equal(t, 0, len(readLimits.Tags))
require.Equal(t, 0, len(writeLimits.Tags))
}
return nil
})
@ -201,3 +283,7 @@ func TestEngineSection(t *testing.T) {
configtest.ForEnvFileType(t, path, fileConfigTest)
})
}
func toPtr(v float64) *float64 {
return &v
}

View file

@ -4,6 +4,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
blobstorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor"
gcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/gc"
limitsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
metabaseconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/metabase"
piloramaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/pilorama"
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/writecache"
@ -125,6 +126,14 @@ func (x *Config) GC() *gcconfig.Config {
)
}
// Limits returns "limits" subsection as a limitsconfig.Config.
func (x *Config) Limits() *limitsconfig.Config {
return limitsconfig.From(
(*config.Config)(x).
Sub("limits"),
)
}
// RefillMetabase returns the value of "resync_metabase" config parameter.
//
// Returns false if the value is not a valid bool.

View file

@ -0,0 +1,130 @@
package limits
import (
"math"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"github.com/spf13/cast"
)
const (
NoLimit int64 = math.MaxInt64
DefaultIdleTimeout = 5 * time.Minute
)
// From wraps config section into Config.
func From(c *config.Config) *Config {
return (*Config)(c)
}
// Config is a wrapper over the config section
// which provides access to Shard's limits configurations.
type Config config.Config
// Read returns the value of "read" limits config section.
func (x *Config) Read() OpConfig {
return x.parse("read")
}
// Write returns the value of "write" limits config section.
func (x *Config) Write() OpConfig {
return x.parse("write")
}
func (x *Config) parse(sub string) OpConfig {
c := (*config.Config)(x).Sub(sub)
var result OpConfig
if s := config.Int(c, "max_waiting_ops"); s > 0 {
result.MaxWaitingOps = s
} else {
result.MaxWaitingOps = NoLimit
}
if s := config.Int(c, "max_running_ops"); s > 0 {
result.MaxRunningOps = s
} else {
result.MaxRunningOps = NoLimit
}
if s := config.DurationSafe(c, "idle_timeout"); s > 0 {
result.IdleTimeout = s
} else {
result.IdleTimeout = DefaultIdleTimeout
}
result.Tags = tags(c)
return result
}
type OpConfig struct {
// MaxWaitingOps returns the value of "max_waiting_ops" config parameter.
//
// Equals NoLimit if the value is not a positive number.
MaxWaitingOps int64
// MaxRunningOps returns the value of "max_running_ops" config parameter.
//
// Equals NoLimit if the value is not a positive number.
MaxRunningOps int64
// IdleTimeout returns the value of "idle_timeout" config parameter.
//
// Equals DefaultIdleTimeout if the value is not a valid duration.
IdleTimeout time.Duration
// Tags returns the value of "tags" config parameter.
//
// Equals nil if the value is not a valid tags config slice.
Tags []IOTagConfig
}
type IOTagConfig struct {
Tag string
Weight *float64
LimitOps *float64
ReservedOps *float64
}
func tags(c *config.Config) []IOTagConfig {
c = c.Sub("tags")
var result []IOTagConfig
for i := 0; ; i++ {
tag := config.String(c, strconv.Itoa(i)+".tag")
if tag == "" {
return result
}
var tagConfig IOTagConfig
tagConfig.Tag = tag
v := c.Value(strconv.Itoa(i) + ".weight")
if v != nil {
w, err := cast.ToFloat64E(v)
panicOnErr(err)
tagConfig.Weight = &w
}
v = c.Value(strconv.Itoa(i) + ".limit_ops")
if v != nil {
l, err := cast.ToFloat64E(v)
panicOnErr(err)
tagConfig.LimitOps = &l
}
v = c.Value(strconv.Itoa(i) + ".reserved_ops")
if v != nil {
r, err := cast.ToFloat64E(v)
panicOnErr(err)
tagConfig.ReservedOps = &r
}
result = append(result, tagConfig)
}
}
func panicOnErr(err error) {
if err != nil {
panic(err)
}
}

View file

@ -157,6 +157,47 @@ FROSTFS_STORAGE_SHARD_0_GC_REMOVER_SLEEP_INTERVAL=2m
FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_BATCH_SIZE=1500
#### Limit of concurrent workers collecting expired objects by the garbage collector
FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_WORKER_COUNT=15
#### Limits config
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_MAX_RUNNING_OPS=10000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_MAX_WAITING_OPS=1000
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_MAX_RUNNING_OPS=1000
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_MAX_WAITING_OPS=100
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_IDLE_TIMEOUT=45s
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_IDLE_TIMEOUT=30s
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_TAG=internal
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_WEIGHT=20
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_LIMIT_OPS=0
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_RESERVED_OPS=1000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_1_TAG=client
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_1_WEIGHT=70
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_1_RESERVED_OPS=10000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_TAG=background
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_WEIGHT=5
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_LIMIT_OPS=10000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_RESERVED_OPS=0
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_TAG=writecache
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_WEIGHT=5
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_LIMIT_OPS=25000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_TAG=policer
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_WEIGHT=5
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_LIMIT_OPS=25000
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_TAG=internal
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_WEIGHT=200
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_LIMIT_OPS=0
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_RESERVED_OPS=100
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_1_TAG=client
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_1_WEIGHT=700
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_1_RESERVED_OPS=1000
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_TAG=background
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_WEIGHT=50
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_LIMIT_OPS=1000
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_RESERVED_OPS=0
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_3_TAG=writecache
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_3_WEIGHT=50
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_3_LIMIT_OPS=2500
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_TAG=policer
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_WEIGHT=50
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_LIMIT_OPS=2500
## 1 shard
### Flag to refill Metabase from BlobStor

View file

@ -221,6 +221,76 @@
"remover_sleep_interval": "2m",
"expired_collector_batch_size": 1500,
"expired_collector_worker_count": 15
},
"limits": {
"read": {
"max_running_ops": 10000,
"max_waiting_ops": 1000,
"idle_timeout": "30s",
"tags": [
{
"tag": "internal",
"weight": 20,
"limit_ops": 0,
"reserved_ops": 1000
},
{
"tag": "client",
"weight": 70,
"reserved_ops": 10000
},
{
"tag": "background",
"weight": 5,
"limit_ops": 10000,
"reserved_ops": 0
},
{
"tag": "writecache",
"weight": 5,
"limit_ops": 25000
},
{
"tag": "policer",
"weight": 5,
"limit_ops": 25000
}
]
},
"write": {
"max_running_ops": 1000,
"max_waiting_ops": 100,
"idle_timeout": "45s",
"tags": [
{
"tag": "internal",
"weight": 200,
"limit_ops": 0,
"reserved_ops": 100
},
{
"tag": "client",
"weight": 700,
"reserved_ops": 1000
},
{
"tag": "background",
"weight": 50,
"limit_ops": 1000,
"reserved_ops": 0
},
{
"tag": "writecache",
"weight": 50,
"limit_ops": 2500
},
{
"tag": "policer",
"weight": 50,
"limit_ops": 2500
}
]
}
}
},
"1": {

View file

@ -227,6 +227,52 @@ storage:
expired_collector_batch_size: 1500 # number of objects to be marked expired by the garbage collector
expired_collector_worker_count: 15 # number of concurrent workers collecting expired objects by the garbage collector
limits:
read:
max_running_ops: 10000
max_waiting_ops: 1000
idle_timeout: 30s
tags:
- tag: internal
weight: 20
limit_ops: 0
reserved_ops: 1000
- tag: client
weight: 70
reserved_ops: 10000
- tag: background
weight: 5
limit_ops: 10000
reserved_ops: 0
- tag: writecache
weight: 5
limit_ops: 25000
- tag: policer
weight: 5
limit_ops: 25000
write:
max_running_ops: 1000
max_waiting_ops: 100
idle_timeout: 45s
tags:
- tag: internal
weight: 200
limit_ops: 0
reserved_ops: 100
- tag: client
weight: 700
reserved_ops: 1000
- tag: background
weight: 50
limit_ops: 1000
reserved_ops: 0
- tag: writecache
weight: 50
limit_ops: 2500
- tag: policer
weight: 50
limit_ops: 2500
1:
writecache:
path: tmp/1/cache # write-cache root directory

View file

@ -195,6 +195,7 @@ The following table describes configuration for each shard.
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
| `small_object_size` | `size` | `1M` | Maximum size of an object stored in blobovnicza tree. |
| `gc` | [GC config](#gc-subsection) | | GC configuration. |
| `limits` | [Shard limits config](#limits-subsection) | | Shard limits configuration. |
### `blobstor` subsection
@ -301,6 +302,64 @@ writecache:
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
| `max_flushing_objects_size` | `size` | `512M` | Max total size of background flushing objects. |
### `limits` subsection
```yaml
limits:
max_read_running_ops: 10000
max_read_waiting_ops: 1000
max_write_running_ops: 1000
max_write_waiting_ops: 100
read:
- tag: internal
weight: 20
limit_ops: 0
reserved_ops: 1000
- tag: client
weight: 70
reserved_ops: 10000
- tag: background
weight: 5
limit_ops: 10000
reserved_ops: 0
- tag: writecache
weight: 5
limit_ops: 25000
- tag: policer
weight: 5
limit_ops: 25000
write:
- tag: internal
weight: 200
limit_ops: 0
reserved_ops: 100
- tag: client
weight: 700
reserved_ops: 1000
- tag: background
weight: 50
limit_ops: 1000
reserved_ops: 0
- tag: writecache
weight: 50
limit_ops: 2500
- tag: policer
weight: 50
limit_ops: 2500
```
| Parameter | Type | Default value | Description |
| ----------------------- | -------- | -------------- | --------------------------------------------------------------------------------------------------------------- |
| `max_read_running_ops` | `int` | 0 (no limit) | The maximum number of runnig read operations. |
| `max_read_waiting_ops` | `int` | 0 (no limit) | The maximum number of waiting read operations. |
| `max_write_running_ops` | `int` | 0 (no limit) | The maximum number of running write operations. |
| `max_write_waiting_ops` | `int` | 0 (no limit) | The maximum number of running write operations. |
| `read` | `[]tag` | empty | Array of shard read settings for tags. |
| `write` | `[]tag` | empty | Array of shard write settings for tags. |
| `tag.tag` | `string` | empty | Tag name. Allowed values: `client`, `internal`, `background`, `writecache`, `policer`. |
| `tag.weight` | `float` | 0 (no weight) | Weight for queries with the specified tag. Weights must be specified for all tags or not specified for any one. |
| `tag.limit_ops` | `float` | 0 (no limit) | Operations per second rate limit for queries with the specified tag. |
| `tag.reserved_ops` | `float` | 0 (no reserve) | Reserved operations per second rate for queries with the specified tag. |
# `node` section

2
go.mod
View file

@ -8,7 +8,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972

4
go.sum
View file

@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf h1:ik2aMBpTJJpoZe2ffcGShXRkrvny65NEPLVt67KmH/A=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3 h1:QnAt5b2R6+hQthMOIn5ECfLAlVD8IAE5JRm1NCCOmuE=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 h1:dOZHuOywvH1ms8U38lDCWpysgkCCeJ02RLI7zDhPcyw=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=

9
internal/assert/cond.go Normal file
View file

@ -0,0 +1,9 @@
package assert
import "strings"
func True(cond bool, details ...string) {
if !cond {
panic(strings.Join(details, " "))
}
}

146
internal/qos/limiter.go Normal file
View file

@ -0,0 +1,146 @@
package qos
import (
"context"
"errors"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)
const (
defaultIdleTimeout time.Duration = 0
defaultShare float64 = 1.0
)
type ReleaseFunc scheduling.ReleaseFunc
type Limiter interface {
ReadRequest(context.Context) (ReleaseFunc, error)
WriteRequest(context.Context) (ReleaseFunc, error)
Close()
}
type scheduler interface {
RequestArrival(ctx context.Context, tag string) (scheduling.ReleaseFunc, error)
Close()
}
func NewLimiter(c *limits.Config) (Limiter, error) {
if err := validateConfig(c); err != nil {
return nil, err
}
read, write := c.Read(), c.Write()
if isNoop(read, write) {
return noopLimiterInstance, nil
}
readScheduler, err := createScheduler(c.Read())
if err != nil {
return nil, fmt.Errorf("create read scheduler: %w", err)
}
writeScheduler, err := createScheduler(c.Write())
if err != nil {
return nil, fmt.Errorf("create write scheduler: %w", err)
}
return &mClockLimiter{
readScheduler: readScheduler,
writeScheduler: writeScheduler,
}, nil
}
func createScheduler(config limits.OpConfig) (scheduler, error) {
if len(config.Tags) == 0 && config.MaxWaitingOps == limits.NoLimit {
return newSemaphoreScheduler(config.MaxRunningOps), nil
}
return scheduling.NewMClock(
uint64(config.MaxRunningOps), uint64(config.MaxWaitingOps),
converToSchedulingTags(config.Tags), config.IdleTimeout)
}
func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
result := make(map[string]scheduling.TagInfo)
for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} {
result[tag.String()] = scheduling.TagInfo{
Share: defaultShare,
}
}
for _, l := range limits {
v := result[l.Tag]
if l.Weight != nil && *l.Weight != 0 {
v.Share = *l.Weight
}
if l.LimitOps != nil && *l.LimitOps != 0 {
v.LimitIOPS = l.LimitOps
}
if l.ReservedOps != nil && *l.ReservedOps != 0 {
v.ReservedIOPS = l.ReservedOps
}
result[l.Tag] = v
}
return result
}
var (
_ Limiter = (*noopLimiter)(nil)
releaseStub ReleaseFunc = func() {}
noopLimiterInstance = &noopLimiter{}
)
func NewNoopLimiter() Limiter {
return &noopLimiter{}
}
type noopLimiter struct{}
func (n *noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) {
return releaseStub, nil
}
func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
return releaseStub, nil
}
func (n *noopLimiter) Close() {}
var _ Limiter = (*mClockLimiter)(nil)
type mClockLimiter struct {
readScheduler scheduler
writeScheduler scheduler
}
func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
return requestArrival(ctx, n.readScheduler)
}
func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
return requestArrival(ctx, n.writeScheduler)
}
func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
tag, ok := tagging.IOTagFromContext(ctx)
if !ok {
tag = IOTagClient.String()
}
if tag == IOTagCritical.String() {
return releaseStub, nil
}
rel, err := s.RequestArrival(ctx, tag)
if err != nil {
if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
errors.Is(err, errSemaphoreLimitExceeded) {
return nil, &apistatus.ResourceExhausted{}
}
return nil, err
}
return ReleaseFunc(rel), nil
}
func (n *mClockLimiter) Close() {
n.readScheduler.Close()
n.writeScheduler.Close()
}

39
internal/qos/semaphore.go Normal file
View file

@ -0,0 +1,39 @@
package qos
import (
"context"
"errors"
qosSemaphore "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore"
"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
)
var (
_ scheduler = (*semaphore)(nil)
errSemaphoreLimitExceeded = errors.New("semaphore limit exceeded")
)
type semaphore struct {
s *qosSemaphore.Semaphore
}
func newSemaphoreScheduler(size int64) *semaphore {
return &semaphore{
s: qosSemaphore.NewSemaphore(size),
}
}
func (s *semaphore) Close() {}
func (s *semaphore) RequestArrival(ctx context.Context, _ string) (scheduling.ReleaseFunc, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if s.s.Acquire() {
return s.s.Release, nil
}
return nil, errSemaphoreLimitExceeded
}

101
internal/qos/validate.go Normal file
View file

@ -0,0 +1,101 @@
package qos
import (
"errors"
"fmt"
"math"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
)
var errWeightsMustBeSpecified = errors.New("invalid weights: weights must be specified for all tags or not specified for any")
type tagConfig struct {
Shares, Limit, Reserved *float64
}
func validateConfig(c *limits.Config) error {
if err := validateOpConfig(c.Read()); err != nil {
return fmt.Errorf("limits 'read' section validation error: %w", err)
}
if err := validateOpConfig(c.Write()); err != nil {
return fmt.Errorf("limits 'write' section validation error: %w", err)
}
return nil
}
func validateOpConfig(c limits.OpConfig) error {
if c.MaxRunningOps <= 0 {
return fmt.Errorf("invalid 'max_running_ops = %d': must be greater than zero", c.MaxRunningOps)
}
if c.MaxWaitingOps <= 0 {
return fmt.Errorf("invalid 'max_waiting_ops = %d': must be greater than zero", c.MaxWaitingOps)
}
if c.IdleTimeout <= 0 {
return fmt.Errorf("invalid 'idle_timeout = %s': must be greater than zero", c.IdleTimeout.String())
}
if err := validateTags(c.Tags); err != nil {
return fmt.Errorf("'tags' config section validation error: %w", err)
}
return nil
}
func validateTags(configTags []limits.IOTagConfig) error {
tags := map[IOTag]tagConfig{
IOTagClient: {},
IOTagInternal: {},
IOTagBackground: {},
IOTagWritecache: {},
IOTagPolicer: {},
}
for _, t := range configTags {
tag, err := FromRawString(t.Tag)
if err != nil {
return fmt.Errorf("invalid tag %s: %w", t.Tag, err)
}
if _, ok := tags[tag]; !ok {
return fmt.Errorf("tag %s is not configurable", t.Tag)
}
tags[tag] = tagConfig{
Shares: t.Weight,
Limit: t.LimitOps,
Reserved: t.ReservedOps,
}
}
idx := 0
var shares float64
for t, v := range tags {
if idx == 0 {
idx++
shares = float64Value(v.Shares)
} else if (shares != 0 && float64Value(v.Shares) == 0) || (shares == 0 && float64Value(v.Shares) != 0) {
return errWeightsMustBeSpecified
}
if float64Value(v.Shares) < 0 || math.IsNaN(float64Value(v.Shares)) {
return fmt.Errorf("invalid weight for tag %s: must be positive value", t.String())
}
if float64Value(v.Limit) < 0 || math.IsNaN(float64Value(v.Limit)) {
return fmt.Errorf("invalid limit_ops for tag %s: must be positive value", t.String())
}
if float64Value(v.Reserved) < 0 || math.IsNaN(float64Value(v.Reserved)) {
return fmt.Errorf("invalid reserved_ops for tag %s: must be positive value", t.String())
}
}
return nil
}
func float64Value(f *float64) float64 {
if f == nil {
return 0.0
}
return *f
}
func isNoop(read, write limits.OpConfig) bool {
return read.MaxRunningOps == limits.NoLimit &&
read.MaxWaitingOps == limits.NoLimit &&
write.MaxRunningOps == limits.NoLimit &&
write.MaxWaitingOps == limits.NoLimit &&
len(read.Tags) == 0 &&
len(write.Tags) == 0
}

View file

@ -50,7 +50,7 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
var res common.RebuildRes
b.log.Debug(ctx, logs.BlobovniczaTreeCompletingPreviousRebuild)
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage, prm.Limiter)
res.ObjectsMoved += completedPreviosMoves
if err != nil {
b.log.Warn(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
@ -79,7 +79,7 @@ func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.
var completedDBCount uint32
for _, db := range dbs {
b.log.Debug(ctx, logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.Limiter)
res.ObjectsMoved += movedObjects
if err != nil {
b.log.Warn(ctx, logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
@ -195,7 +195,7 @@ func (b *Blobovniczas) rebuildBySize(ctx context.Context, path string, targetFil
return fp < targetFillPercent || fp > 100+(100-targetFillPercent), nil
}
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) {
shDB := b.getBlobovnicza(ctx, path)
blz, err := shDB.Open(ctx)
if err != nil {
@ -212,7 +212,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
if err != nil {
return 0, err
}
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, concLimiter)
if err != nil {
return migratedObjects, err
}
@ -238,7 +238,7 @@ func (b *Blobovniczas) addRebuildTempFile(ctx context.Context, path string) (fun
}, nil
}
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.RebuildLimiter) (uint64, error) {
var result atomic.Uint64
batch := make(map[oid.Address][]byte)
@ -253,7 +253,12 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
})
for {
_, err := blz.Iterate(ctx, prm)
release, err := limiter.ReadRequest(ctx)
if err != nil {
return result.Load(), err
}
_, err = blz.Iterate(ctx, prm)
release()
if err != nil && !errors.Is(err, errBatchFull) {
return result.Load(), err
}
@ -265,13 +270,19 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
eg, egCtx := errgroup.WithContext(ctx)
for addr, data := range batch {
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
release, err := limiter.AcquireWorkSlot(egCtx)
if err != nil {
_ = eg.Wait()
return result.Load(), err
}
eg.Go(func() error {
defer limiter.ReleaseWorkSlot()
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
defer release()
moveRelease, err := limiter.WriteRequest(ctx)
if err != nil {
return err
}
err = b.moveObject(egCtx, blz, blzPath, addr, data, meta)
moveRelease()
if err == nil {
result.Add(1)
}
@ -359,7 +370,7 @@ func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
return b.dropDirectoryIfEmpty(filepath.Dir(path))
}
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage, rateLimiter common.RateLimiter) (uint64, error) {
var count uint64
var rebuildTempFilesToRemove []string
err := b.iterateIncompletedRebuildDBPaths(ctx, func(s string) (bool, error) {
@ -372,13 +383,24 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
}
defer shDB.Close(ctx)
release, err := rateLimiter.ReadRequest(ctx)
if err != nil {
return false, err
}
incompletedMoves, err := blz.ListMoveInfo(ctx)
release()
if err != nil {
return true, err
}
for _, move := range incompletedMoves {
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
release, err := rateLimiter.WriteRequest(ctx)
if err != nil {
return false, err
}
err = b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore)
release()
if err != nil {
return true, err
}
count++
@ -388,9 +410,14 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
return false, nil
})
for _, tmp := range rebuildTempFilesToRemove {
release, err := rateLimiter.WriteRequest(ctx)
if err != nil {
return count, err
}
if err := os.Remove(filepath.Join(b.rootPath, tmp)); err != nil {
b.log.Warn(ctx, logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
}
release()
}
return count, err
}

View file

@ -161,16 +161,18 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
storageIDs: make(map[oid.Address][]byte),
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
FillPercent: 1,
MetaStorage: metaStub,
Limiter: limiter,
FillPercent: 1,
})
require.NoError(t, err)
require.Equal(t, uint64(1), rRes.ObjectsMoved)
require.Equal(t, uint64(0), rRes.FilesRemoved)
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open(context.Background()))

View file

@ -2,7 +2,9 @@ package blobovniczatree
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -76,10 +78,11 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
FillPercent: 60,
MetaStorage: metaStub,
Limiter: limiter,
FillPercent: 60,
})
require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
@ -94,6 +97,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
})
t.Run("no rebuild single db", func(t *testing.T) {
@ -128,10 +132,11 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
FillPercent: 90, // 64KB / 100KB = 64%
MetaStorage: metaStub,
Limiter: limiter,
FillPercent: 90, // 64KB / 100KB = 64%
})
require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
@ -146,6 +151,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
})
t.Run("rebuild by fill percent", func(t *testing.T) {
@ -193,10 +199,11 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
FillPercent: 80,
MetaStorage: metaStub,
Limiter: limiter,
FillPercent: 80,
})
require.NoError(t, err)
require.Equal(t, uint64(49), rRes.FilesRemoved)
@ -215,6 +222,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
})
t.Run("rebuild by overflow", func(t *testing.T) {
@ -266,10 +274,11 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
require.NoError(t, b.Open(mode.ComponentReadWrite))
require.NoError(t, b.Init())
limiter := &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
FillPercent: 80,
MetaStorage: metaStub,
Limiter: limiter,
FillPercent: 80,
})
require.NoError(t, err)
require.Equal(t, uint64(49), rRes.FilesRemoved)
@ -285,6 +294,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
})
}
@ -338,9 +348,10 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub
rPrm.WorkerLimiter = &rebuildLimiterStub{}
rPrm.Limiter = limiter
rPrm.FillPercent = 1
rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err)
@ -356,6 +367,7 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
}
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
@ -427,9 +439,10 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub
rPrm.WorkerLimiter = &rebuildLimiterStub{}
rPrm.Limiter = limiter
rPrm.FillPercent = 1
rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err)
@ -445,6 +458,7 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
}
type storageIDUpdateStub struct {
@ -462,7 +476,36 @@ func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Addr
return nil
}
type rebuildLimiterStub struct{}
type rebuildLimiterStub struct {
slots atomic.Int64
readRequests atomic.Int64
writeRequests atomic.Int64
}
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
func (s *rebuildLimiterStub) ReleaseWorkSlot() {}
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) (common.ReleaseFunc, error) {
s.slots.Add(1)
return func() { s.slots.Add(-1) }, nil
}
func (s *rebuildLimiterStub) ReadRequest(context.Context) (common.ReleaseFunc, error) {
s.readRequests.Add(1)
return func() { s.readRequests.Add(-1) }, nil
}
func (s *rebuildLimiterStub) WriteRequest(context.Context) (common.ReleaseFunc, error) {
s.writeRequests.Add(1)
return func() { s.writeRequests.Add(-1) }, nil
}
func (s *rebuildLimiterStub) ValidateReleased() error {
if v := s.slots.Load(); v != 0 {
return fmt.Errorf("invalid slots value %d", v)
}
if v := s.readRequests.Load(); v != 0 {
return fmt.Errorf("invalid read requests value %d", v)
}
if v := s.writeRequests.Load(); v != 0 {
return fmt.Errorf("invalid write requests value %d", v)
}
return nil
}

View file

@ -12,16 +12,27 @@ type RebuildRes struct {
}
type RebuildPrm struct {
MetaStorage MetaStorage
WorkerLimiter ConcurrentWorkersLimiter
FillPercent int
MetaStorage MetaStorage
Limiter RebuildLimiter
FillPercent int
}
type MetaStorage interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
}
type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
type ReleaseFunc func()
type ConcurrencyLimiter interface {
AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error)
}
type RateLimiter interface {
ReadRequest(context.Context) (ReleaseFunc, error)
WriteRequest(context.Context) (ReleaseFunc, error)
}
type RebuildLimiter interface {
ConcurrencyLimiter
RateLimiter
}

View file

@ -13,19 +13,14 @@ type StorageIDUpdate interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
}
type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter, fillPercent int) error {
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, concLimiter common.RebuildLimiter, fillPercent int) error {
var summary common.RebuildRes
var rErr error
for _, storage := range b.storage {
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
MetaStorage: upd,
WorkerLimiter: limiter,
FillPercent: fillPercent,
MetaStorage: upd,
Limiter: concLimiter,
FillPercent: fillPercent,
})
summary.FilesRemoved += res.FilesRemoved
summary.ObjectsMoved += res.ObjectsMoved

View file

@ -74,7 +74,7 @@ func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm)
var csPrm shard.ContainerSizePrm
csPrm.SetContainerID(prm.cnr)
csRes, err := sh.Shard.ContainerSize(csPrm)
csRes, err := sh.Shard.ContainerSize(ctx, csPrm)
if err != nil {
e.reportShardError(ctx, sh, "can't get container size", err,
zap.Stringer("container_id", prm.cnr))

View file

@ -14,6 +14,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
)
@ -176,7 +177,10 @@ func (e *StorageEngine) reportShardError(
}
func isLogical(err error) bool {
return errors.As(err, &logicerr.Logical{}) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
return errors.As(err, &logicerr.Logical{}) ||
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.As(err, new(*apistatus.ResourceExhausted))
}
// Option represents StorageEngine's constructor option.

View file

@ -3,8 +3,10 @@ package engine
import (
"context"
"path/filepath"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
@ -90,6 +92,7 @@ func testGetDefaultShardOptions(t testing.TB) []shard.Option {
),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.TempDir(), "pilorama"))),
shard.WithMetaBaseOptions(testGetDefaultMetabaseOptions(t)...),
shard.WithLimiter(&testQoSLimiter{t: t}),
}
}
@ -151,3 +154,26 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes
},
}, smallFileStorage, largeFileStorage
}
var _ qos.Limiter = (*testQoSLimiter)(nil)
type testQoSLimiter struct {
t testing.TB
read atomic.Int64
write atomic.Int64
}
func (t *testQoSLimiter) Close() {
require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
}
func (t *testQoSLimiter) ReadRequest(context.Context) (qos.ReleaseFunc, error) {
t.read.Add(1)
return func() { t.read.Add(-1) }, nil
}
func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error) {
t.write.Add(1)
return func() { t.write.Add(-1) }, nil
}

View file

@ -339,7 +339,7 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid
var drop []cid.ID
for id := range idMap {
prm.SetContainerID(id)
s, err := sh.ContainerSize(prm)
s, err := sh.ContainerSize(ctx, prm)
if err != nil {
e.log.Warn(ctx, logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err))
failed = true

View file

@ -4,6 +4,7 @@ import (
"context"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
@ -41,7 +42,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
}
resGuard := &sync.Mutex{}
limiter := shard.NewRebuildLimiter(prm.ConcurrencyLimit)
concLimiter := &concurrencyLimiter{semaphore: make(chan struct{}, prm.ConcurrencyLimit)}
eg, egCtx := errgroup.WithContext(ctx)
for _, shardID := range prm.ShardIDs {
@ -61,7 +62,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
}
err := sh.ScheduleRebuild(egCtx, shard.RebuildPrm{
ConcurrencyLimiter: limiter,
ConcurrencyLimiter: concLimiter,
TargetFillPercent: prm.TargetFillPercent,
})
@ -88,3 +89,20 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
}
return res, nil
}
type concurrencyLimiter struct {
semaphore chan struct{}
}
func (l *concurrencyLimiter) AcquireWorkSlot(ctx context.Context) (common.ReleaseFunc, error) {
select {
case l.semaphore <- struct{}{}:
return l.releaseWorkSlot, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (l *concurrencyLimiter) releaseWorkSlot() {
<-l.semaphore
}

View file

@ -26,7 +26,7 @@ func (r ContainerSizeRes) Size() uint64 {
return r.size
}
func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
func (s *Shard) ContainerSize(ctx context.Context, prm ContainerSizePrm) (ContainerSizeRes, error) {
s.m.RLock()
defer s.m.RUnlock()
@ -34,6 +34,12 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
return ContainerSizeRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ContainerSizeRes{}, err
}
defer release()
size, err := s.metaBase.ContainerSize(prm.cnr)
if err != nil {
return ContainerSizeRes{}, fmt.Errorf("get container size: %w", err)
@ -69,6 +75,12 @@ func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (Cont
return ContainerCountRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ContainerCountRes{}, err
}
defer release()
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
if err != nil {
return ContainerCountRes{}, fmt.Errorf("get container counters: %w", err)
@ -100,6 +112,12 @@ func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.metaBase.DeleteContainerSize(ctx, id)
}
@ -122,5 +140,11 @@ func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.metaBase.DeleteContainerCount(ctx, id)
}

View file

@ -395,6 +395,10 @@ func (s *Shard) Close(ctx context.Context) error {
s.gc.stop(ctx)
}
if s.opsLimiter != nil {
s.opsLimiter.Close()
}
return lastErr
}
@ -445,6 +449,10 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
return err
}
}
if c.opsLimiter != nil {
s.opsLimiter.Close()
s.opsLimiter = c.opsLimiter
}
return s.setMode(ctx, c.info.Mode)
}

View file

@ -23,6 +23,12 @@ func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
return 0, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
cc, err := s.metaBase.ObjectCounters()
if err != nil {
return 0, err

View file

@ -54,6 +54,12 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del
return DeleteRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return DeleteRes{}, err
}
defer release()
result := DeleteRes{}
for _, addr := range prm.addr {
select {

View file

@ -53,10 +53,6 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
))
defer span.End()
var exists bool
var locked bool
var err error
s.m.RLock()
defer s.m.RUnlock()
@ -64,7 +60,18 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
return ExistsRes{}, ErrShardDisabled
} else if s.info.EvacuationInProgress {
return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
} else if s.info.Mode.NoMetabase() {
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ExistsRes{}, err
}
defer release()
var exists bool
var locked bool
if s.info.Mode.NoMetabase() {
var p common.ExistsPrm
p.Address = prm.Address

View file

@ -291,28 +291,7 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
s.log.Debug(ctx, logs.ShardGCRemoveGarbageStarted)
defer s.log.Debug(ctx, logs.ShardGCRemoveGarbageCompleted)
buf := make([]oid.Address, 0, s.rmBatchSize)
var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
buf = append(buf, g.Address())
if len(buf) == s.rmBatchSize {
return meta.ErrInterruptIterator
}
return nil
})
// iterate over metabase's objects with GC mark
// (no more than s.rmBatchSize objects)
err := s.metaBase.IterateOverGarbage(ctx, iterPrm)
buf, err := s.getGarbage(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardIteratorOverMetabaseGraveyardFailed,
zap.Error(err),
@ -344,6 +323,39 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
return
}
func (s *Shard) getGarbage(ctx context.Context) ([]oid.Address, error) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
buf := make([]oid.Address, 0, s.rmBatchSize)
var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
buf = append(buf, g.Address())
if len(buf) == s.rmBatchSize {
return meta.ErrInterruptIterator
}
return nil
})
if err := s.metaBase.IterateOverGarbage(ctx, iterPrm); err != nil {
return nil, err
}
return buf, nil
}
func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) {
workerCount = max(minExpiredWorkers, s.gc.gcCfg.expiredCollectorWorkerCount)
batchSize = max(minExpiredBatchSize, s.gc.gcCfg.expiredCollectorBatchSize)
@ -422,18 +434,9 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return
}
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(expired...)
inhumePrm.SetGCMark()
// inhume the collected objects
res, err := s.metaBase.Inhume(ctx, inhumePrm)
res, err := s.inhumeGC(ctx, expired)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects,
zap.Error(err),
)
s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects, zap.Error(err))
return
}
@ -451,6 +454,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
}
func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address) ([]oid.Address, error) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
result := make([]oid.Address, 0, len(source))
parentToChildren, err := s.metaBase.GetChildren(ctx, source)
if err != nil {
@ -464,6 +473,19 @@ func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address)
return result, nil
}
func (s *Shard) inhumeGC(ctx context.Context, addrs []oid.Address) (meta.InhumeRes, error) {
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return meta.InhumeRes{}, err
}
defer release()
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(addrs...)
inhumePrm.SetGCMark()
return s.metaBase.Inhume(ctx, inhumePrm)
}
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
var err error
startedAt := time.Now()
@ -505,11 +527,17 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
return
}
err = s.metaBase.IterateOverGraveyard(ctx, iterPrm)
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
s.m.RUnlock()
return
}
err = s.metaBase.IterateOverGraveyard(ctx, iterPrm)
release()
if err != nil {
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
s.m.RUnlock()
return
}
@ -598,7 +626,13 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFo
return ErrDegradedMode
}
err := s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()
err = s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error {
select {
case <-ctx.Done():
return meta.ErrInterruptIterator
@ -621,6 +655,12 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.metaBase.FilterExpired(ctx, epoch, addresses)
}
@ -636,12 +676,15 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
return
}
res, err := s.metaBase.InhumeTombstones(ctx, tss)
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
zap.Error(err),
)
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage, zap.Error(err))
return
}
res, err := s.metaBase.InhumeTombstones(ctx, tss)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage, zap.Error(err))
return
}
@ -664,11 +707,16 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
if s.GetMode().NoMetabase() {
return
}
unlocked, err := s.metaBase.FreeLockedBy(lockers)
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects,
zap.Error(err),
)
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
unlocked, err := s.metaBase.FreeLockedBy(lockers)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
@ -676,13 +724,15 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
var pInhume meta.InhumePrm
pInhume.SetAddresses(lockers...)
pInhume.SetForceGCMark()
res, err := s.metaBase.Inhume(ctx, pInhume)
release, err = s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage,
zap.Error(err),
)
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage, zap.Error(err))
return
}
res, err := s.metaBase.Inhume(ctx, pInhume)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage, zap.Error(err))
return
}
@ -721,12 +771,15 @@ func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
return
}
_, err := s.metaBase.FreeLockedBy(lockers)
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects,
zap.Error(err),
)
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
_, err = s.metaBase.FreeLockedBy(lockers)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
}
@ -750,7 +803,13 @@ func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
}
func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
ids, err := s.metaBase.ZeroSizeContainers(ctx)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
@ -762,7 +821,13 @@ func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch ui
}
func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
ids, err := s.metaBase.ZeroCountContainers(ctx)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return

View file

@ -111,6 +111,12 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
return c.Get(ctx, prm.addr)
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return GetRes{}, err
}
defer release()
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc)

View file

@ -81,6 +81,12 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
headParams.SetAddress(prm.addr)
headParams.SetRaw(prm.raw)
release, limitErr := s.opsLimiter.ReadRequest(ctx)
if limitErr != nil {
return HeadRes{}, limitErr
}
defer release()
var res meta.GetRes
res, err = s.metaBase.Get(ctx, headParams)
obj = res.Header()

View file

@ -81,6 +81,12 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
return InhumeRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return InhumeRes{}, err
}
defer release()
if s.hasWriteCache() {
for i := range prm.target {
_ = s.writeCache.Delete(ctx, prm.target[i])

View file

@ -106,6 +106,12 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
return SelectRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return SelectRes{}, err
}
defer release()
lst, err := s.metaBase.Containers(ctx)
if err != nil {
return res, fmt.Errorf("list stored containers: %w", err)
@ -145,6 +151,12 @@ func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListCo
return ListContainersRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ListContainersRes{}, err
}
defer release()
containers, err := s.metaBase.Containers(ctx)
if err != nil {
return ListContainersRes{}, fmt.Errorf("get list of containers: %w", err)
@ -173,6 +185,12 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
return ListWithCursorRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ListWithCursorRes{}, err
}
defer release()
var metaPrm meta.ListPrm
metaPrm.SetCount(prm.count)
metaPrm.SetCursor(prm.cursor)
@ -202,9 +220,15 @@ func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContai
return ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()
var metaPrm meta.IterateOverContainersPrm
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
err = s.metaBase.IterateOverContainers(ctx, metaPrm)
if err != nil {
return fmt.Errorf("iterate over containers: %w", err)
}
@ -227,11 +251,17 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
return ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()
var metaPrm meta.IterateOverObjectsInContainerPrm
metaPrm.ContainerID = prm.ContainerID
metaPrm.ObjectType = prm.ObjectType
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
err = s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
if err != nil {
return fmt.Errorf("iterate over objects: %w", err)
}
@ -251,6 +281,12 @@ func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAlive
return 0, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
var metaPrm meta.CountAliveObjectsInContainerPrm
metaPrm.ObjectType = prm.ObjectType
metaPrm.ContainerID = prm.ContainerID

View file

@ -38,7 +38,13 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []
return ErrDegradedMode
}
err := s.metaBase.Lock(ctx, idCnr, locker, locked)
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
err = s.metaBase.Lock(ctx, idCnr, locker, locked)
if err != nil {
return fmt.Errorf("metabase lock: %w", err)
}
@ -61,6 +67,12 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
return false, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return false, err
}
defer release()
var prm meta.IsLockedPrm
prm.SetAddress(addr)
@ -86,5 +98,12 @@ func (s *Shard) GetLocks(ctx context.Context, addr oid.Address) ([]oid.ID, error
if m.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.metaBase.GetLocks(ctx, addr)
}

View file

@ -67,6 +67,12 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
var res common.PutRes
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return PutRes{}, err
}
defer release()
// exist check are not performed there, these checks should be executed
// ahead of `Put` by storage engine
tryCache := s.hasWriteCache() && !m.NoMetabase()

View file

@ -131,6 +131,12 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
return obj, nil
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return RngRes{}, err
}
defer release()
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc)

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@ -20,37 +21,9 @@ import (
var ErrRebuildInProgress = errors.New("shard rebuild in progress")
type RebuildWorkerLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
type rebuildLimiter struct {
semaphore chan struct{}
}
func NewRebuildLimiter(workersCount uint32) RebuildWorkerLimiter {
return &rebuildLimiter{
semaphore: make(chan struct{}, workersCount),
}
}
func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
select {
case l.semaphore <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (l *rebuildLimiter) ReleaseWorkSlot() {
<-l.semaphore
}
type rebuildTask struct {
limiter RebuildWorkerLimiter
fillPercent int
concurrencyLimiter common.RebuildLimiter
fillPercent int
}
type rebuilder struct {
@ -90,14 +63,14 @@ func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D
if !ok {
continue
}
runRebuild(ctx, bs, mb, log, t.fillPercent, t.limiter)
runRebuild(ctx, bs, mb, log, t.fillPercent, t.concurrencyLimiter)
}
}
}()
}
func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger,
fillPercent int, limiter RebuildWorkerLimiter,
fillPercent int, concLimiter common.RebuildLimiter,
) {
select {
case <-ctx.Done():
@ -106,21 +79,21 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
}
log.Info(ctx, logs.BlobstoreRebuildStarted)
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, concLimiter, fillPercent); err != nil {
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
} else {
log.Info(ctx, logs.BlobstoreRebuildCompletedSuccessfully)
}
}
func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, fillPercent int,
func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter common.RebuildLimiter, fillPercent int,
) error {
select {
case <-ctx.Done():
return ctx.Err()
case r.tasks <- rebuildTask{
limiter: limiter,
fillPercent: fillPercent,
concurrencyLimiter: limiter,
fillPercent: fillPercent,
}:
return nil
default:
@ -169,7 +142,7 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres
}
type RebuildPrm struct {
ConcurrencyLimiter RebuildWorkerLimiter
ConcurrencyLimiter common.ConcurrencyLimiter
TargetFillPercent uint32
}
@ -191,5 +164,30 @@ func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error {
return ErrDegradedMode
}
return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, int(p.TargetFillPercent))
limiter := &rebuildLimiter{
concurrencyLimiter: p.ConcurrencyLimiter,
rateLimiter: s.opsLimiter,
}
return s.rb.ScheduleRebuild(ctx, limiter, int(p.TargetFillPercent))
}
var _ common.RebuildLimiter = (*rebuildLimiter)(nil)
type rebuildLimiter struct {
concurrencyLimiter common.ConcurrencyLimiter
rateLimiter qos.Limiter
}
func (r *rebuildLimiter) AcquireWorkSlot(ctx context.Context) (common.ReleaseFunc, error) {
return r.concurrencyLimiter.AcquireWorkSlot(ctx)
}
func (r *rebuildLimiter) ReadRequest(ctx context.Context) (common.ReleaseFunc, error) {
release, err := r.rateLimiter.ReadRequest(ctx)
return common.ReleaseFunc(release), err
}
func (r *rebuildLimiter) WriteRequest(ctx context.Context) (common.ReleaseFunc, error) {
release, err := r.rateLimiter.WriteRequest(ctx)
return common.ReleaseFunc(release), err
}

View file

@ -60,6 +60,12 @@ func (s *Shard) Select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
return SelectRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return SelectRes{}, nil
}
defer release()
var selectPrm meta.SelectPrm
selectPrm.SetFilters(prm.filters)
selectPrm.SetContainerID(prm.cnr)

View file

@ -7,6 +7,7 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -98,6 +99,8 @@ type cfg struct {
reportErrorFunc func(ctx context.Context, selfID string, message string, err error)
containerInfo container.InfoProvider
opsLimiter qos.Limiter
}
func defaultCfg() *cfg {
@ -109,6 +112,7 @@ func defaultCfg() *cfg {
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
metricsWriter: noopMetrics{},
opsLimiter: qos.NewNoopLimiter(),
}
}
@ -368,6 +372,12 @@ func WithContainerInfoProvider(containerInfo container.InfoProvider) Option {
}
}
func WithLimiter(l qos.Limiter) Option {
return func(c *cfg) {
c.opsLimiter = l
}
}
func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()

View file

@ -43,6 +43,11 @@ func (s *Shard) TreeMove(ctx context.Context, d pilorama.CIDDescriptor, treeID s
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeMove(ctx, d, treeID, m)
}
@ -75,6 +80,11 @@ func (s *Shard) TreeAddByPath(ctx context.Context, d pilorama.CIDDescriptor, tre
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeAddByPath(ctx, d, treeID, attr, path, meta)
}
@ -103,6 +113,11 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
}
@ -130,6 +145,11 @@ func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m)
}
@ -157,6 +177,11 @@ func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string,
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeGetByPath(ctx, cid, treeID, attr, path, latest)
}
@ -182,6 +207,11 @@ func (s *Shard) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, n
if s.info.Mode.NoMetabase() {
return pilorama.Meta{}, 0, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return pilorama.Meta{}, 0, err
}
defer release()
return s.pilorama.TreeGetMeta(ctx, cid, treeID, nodeID)
}
@ -207,6 +237,11 @@ func (s *Shard) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID strin
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeGetChildren(ctx, cid, treeID, nodeID)
}
@ -231,6 +266,11 @@ func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID
if s.info.Mode.NoMetabase() {
return nil, last, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, last, err
}
defer release()
return s.pilorama.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)
}
@ -256,6 +296,11 @@ func (s *Shard) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string,
if s.info.Mode.NoMetabase() {
return pilorama.Move{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return pilorama.Move{}, err
}
defer release()
return s.pilorama.TreeGetOpLog(ctx, cid, treeID, height)
}
@ -280,6 +325,11 @@ func (s *Shard) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) erro
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeDrop(ctx, cid, treeID)
}
@ -303,6 +353,11 @@ func (s *Shard) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeList(ctx, cid)
}
@ -326,6 +381,11 @@ func (s *Shard) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (u
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
return s.pilorama.TreeHeight(ctx, cid, treeID)
}
@ -350,6 +410,11 @@ func (s *Shard) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (b
if s.info.Mode.NoMetabase() {
return false, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return false, err
}
defer release()
return s.pilorama.TreeExists(ctx, cid, treeID)
}
@ -378,6 +443,11 @@ func (s *Shard) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, tre
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeUpdateLastSyncHeight(ctx, cid, treeID, height)
}
@ -402,6 +472,11 @@ func (s *Shard) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID st
if s.info.Mode.NoMetabase() {
return 0, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
return s.pilorama.TreeLastSyncHeight(ctx, cid, treeID)
}
@ -423,6 +498,11 @@ func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeListTrees(ctx, prm)
}
@ -452,5 +532,10 @@ func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID strin
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source)
}

View file

@ -67,6 +67,12 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
}
@ -124,6 +130,13 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
close(started)
defer cleanup()
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.FailedToSealWritecacheAsync, zap.Error(err))
return
}
defer release()
s.log.Info(ctx, logs.StartedWritecacheSealAsync)
if err := s.writeCache.Seal(ctx, prm); err != nil {
s.log.Warn(ctx, logs.FailedToSealWritecacheAsync, zap.Error(err))
@ -138,5 +151,11 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
return nil
}
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.writeCache.Seal(ctx, prm)
}

View file

@ -3,6 +3,8 @@ package object
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
)
@ -120,13 +122,24 @@ type qosSendRecv[TReq qosVerificationHeader, TResp any] interface {
type qosWriteStream[TReq qosVerificationHeader, TResp any] struct {
s qosSendRecv[TReq, TResp]
adj AdjustIOTag
ioTag string
ioTagDefined bool
}
func (q *qosWriteStream[TReq, TResp]) CloseAndRecv(ctx context.Context) (TResp, error) {
if q.ioTagDefined {
ctx = tagging.ContextWithIOTag(ctx, q.ioTag)
}
return q.s.CloseAndRecv(ctx)
}
func (q *qosWriteStream[TReq, TResp]) Send(ctx context.Context, req TReq) error {
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
if !q.ioTagDefined {
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
q.ioTag, q.ioTagDefined = tagging.IOTagFromContext(ctx)
}
assert.True(q.ioTagDefined, "io tag undefined after incoming tag adjustment")
ctx = tagging.ContextWithIOTag(ctx, q.ioTag)
return q.s.Send(ctx, req)
}