Support active RPC limiting #1639
26 changed files with 277 additions and 155 deletions
|
@ -29,6 +29,7 @@ import (
|
||||||
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||||
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
||||||
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
||||||
|
rpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/rpc"
|
||||||
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
|
@ -69,6 +70,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
|
||||||
netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
|
netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -528,6 +530,8 @@ type cfgGRPC struct {
|
||||||
maxChunkSize uint64
|
maxChunkSize uint64
|
||||||
maxAddrAmount uint64
|
maxAddrAmount uint64
|
||||||
reconnectTimeout time.Duration
|
reconnectTimeout time.Duration
|
||||||
|
|
||||||
|
limiter atomic.Pointer[limiting.SemaphoreLimiter]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
|
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
|
||||||
|
@ -664,10 +668,6 @@ type cfgAccessPolicyEngine struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgObjectRoutines struct {
|
type cfgObjectRoutines struct {
|
||||||
putRemote *ants.Pool
|
|
||||||
|
|
||||||
putLocal *ants.Pool
|
|
||||||
|
|
||||||
replication *ants.Pool
|
replication *ants.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -721,7 +721,7 @@ func initCfg(appCfg *config.Config) *cfg {
|
||||||
|
|
||||||
c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)
|
c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)
|
||||||
|
|
||||||
c.cfgGRPC = initCfgGRPC()
|
c.cfgGRPC = initCfgGRPC(appCfg)
|
||||||
|
|
||||||
c.cfgMorph = cfgMorph{
|
c.cfgMorph = cfgMorph{
|
||||||
proxyScriptHash: contractsconfig.Proxy(appCfg),
|
proxyScriptHash: contractsconfig.Proxy(appCfg),
|
||||||
|
@ -852,14 +852,23 @@ func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func initCfgGRPC() cfgGRPC {
|
func initCfgGRPC(appCfg *config.Config) (cfg cfgGRPC) {
|
||||||
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
|
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
|
||||||
maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes
|
maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes
|
||||||
|
|
||||||
return cfgGRPC{
|
var limits []limiting.KeyLimit
|
||||||
maxChunkSize: maxChunkSize,
|
for _, l := range rpcconfig.Limits(appCfg) {
|
||||||
maxAddrAmount: maxAddrAmount,
|
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
limiter, err := limiting.NewSemaphoreLimiter(limits)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
cfg.maxChunkSize = maxChunkSize
|
||||||
|
cfg.maxAddrAmount = maxAddrAmount
|
||||||
|
cfg.limiter.Store(limiter)
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func initCfgObject(appCfg *config.Config) cfgObject {
|
func initCfgObject(appCfg *config.Config) cfgObject {
|
||||||
|
@ -1166,21 +1175,7 @@ func initAccessPolicyEngine(ctx context.Context, c *cfg) {
|
||||||
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
optNonBlocking := ants.WithNonblocking(true)
|
|
||||||
|
|
||||||
putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote()
|
|
||||||
pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking)
|
|
||||||
fatalOnErr(err)
|
|
||||||
|
|
||||||
putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal()
|
|
||||||
pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking)
|
|
||||||
fatalOnErr(err)
|
|
||||||
|
|
||||||
replicatorPoolSize := replicatorconfig.PoolSize(cfg)
|
replicatorPoolSize := replicatorconfig.PoolSize(cfg)
|
||||||
if replicatorPoolSize <= 0 {
|
|
||||||
replicatorPoolSize = putRemoteCapacity
|
|
||||||
}
|
|
||||||
|
|
||||||
pool.replication, err = ants.NewPool(replicatorPoolSize)
|
pool.replication, err = ants.NewPool(replicatorPoolSize)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
@ -1410,17 +1405,28 @@ func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp {
|
||||||
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
|
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
components = append(components, dCmp{"rpc_limiter", c.reloadLimits})
|
||||||
|
|
||||||
return components
|
return components
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cfg) reloadLimits() error {
|
||||||
|
var limits []limiting.KeyLimit
|
||||||
|
for _, l := range rpcconfig.Limits(c.appCfg) {
|
||||||
|
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
|
||||||
|
}
|
||||||
|
|
||||||
|
limiter, err := limiting.NewSemaphoreLimiter(limits)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.cfgGRPC.limiter.Store(limiter)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *cfg) reloadPools() error {
|
func (c *cfg) reloadPools() error {
|
||||||
newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
|
newSize := replicatorconfig.PoolSize(c.appCfg)
|
||||||
c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")
|
|
||||||
|
|
||||||
newSize = objectconfig.Put(c.appCfg).PoolSizeRemote()
|
|
||||||
c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size")
|
|
||||||
|
|
||||||
newSize = replicatorconfig.PoolSize(c.appCfg)
|
|
||||||
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
|
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -21,10 +21,6 @@ const (
|
||||||
|
|
||||||
putSubsection = "put"
|
putSubsection = "put"
|
||||||
getSubsection = "get"
|
getSubsection = "get"
|
||||||
|
|
||||||
// PutPoolSizeDefault is a default value of routine pool size to
|
|
||||||
// process object.Put requests in object service.
|
|
||||||
PutPoolSizeDefault = 10
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Put returns structure that provides access to "put" subsection of
|
// Put returns structure that provides access to "put" subsection of
|
||||||
|
@ -35,30 +31,6 @@ func Put(c *config.Config) PutConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PoolSizeRemote returns the value of "remote_pool_size" config parameter.
|
|
||||||
//
|
|
||||||
// Returns PutPoolSizeDefault if the value is not a positive number.
|
|
||||||
func (g PutConfig) PoolSizeRemote() int {
|
|
||||||
v := config.Int(g.cfg, "remote_pool_size")
|
|
||||||
if v > 0 {
|
|
||||||
return int(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
return PutPoolSizeDefault
|
|
||||||
}
|
|
||||||
|
|
||||||
// PoolSizeLocal returns the value of "local_pool_size" config parameter.
|
|
||||||
//
|
|
||||||
// Returns PutPoolSizeDefault if the value is not a positive number.
|
|
||||||
func (g PutConfig) PoolSizeLocal() int {
|
|
||||||
v := config.Int(g.cfg, "local_pool_size")
|
|
||||||
if v > 0 {
|
|
||||||
return int(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
return PutPoolSizeDefault
|
|
||||||
}
|
|
||||||
|
|
||||||
// SkipSessionTokenIssuerVerification returns the value of "skip_session_token_issuer_verification" config parameter or `false“ if is not defined.
|
// SkipSessionTokenIssuerVerification returns the value of "skip_session_token_issuer_verification" config parameter or `false“ if is not defined.
|
||||||
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
|
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
|
||||||
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
|
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
|
||||||
|
|
|
@ -13,8 +13,6 @@ func TestObjectSection(t *testing.T) {
|
||||||
t.Run("defaults", func(t *testing.T) {
|
t.Run("defaults", func(t *testing.T) {
|
||||||
empty := configtest.EmptyConfig()
|
empty := configtest.EmptyConfig()
|
||||||
|
|
||||||
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote())
|
|
||||||
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeLocal())
|
|
||||||
require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty))
|
require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty))
|
||||||
require.False(t, objectconfig.Put(empty).SkipSessionTokenIssuerVerification())
|
require.False(t, objectconfig.Put(empty).SkipSessionTokenIssuerVerification())
|
||||||
})
|
})
|
||||||
|
@ -22,8 +20,6 @@ func TestObjectSection(t *testing.T) {
|
||||||
const path = "../../../../config/example/node"
|
const path = "../../../../config/example/node"
|
||||||
|
|
||||||
fileConfigTest := func(c *config.Config) {
|
fileConfigTest := func(c *config.Config) {
|
||||||
require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote())
|
|
||||||
require.Equal(t, 200, objectconfig.Put(c).PoolSizeLocal())
|
|
||||||
require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c))
|
require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c))
|
||||||
require.True(t, objectconfig.Put(c).SkipSessionTokenIssuerVerification())
|
require.True(t, objectconfig.Put(c).SkipSessionTokenIssuerVerification())
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,8 @@ const (
|
||||||
|
|
||||||
// PutTimeoutDefault is a default timeout of object put request in replicator.
|
// PutTimeoutDefault is a default timeout of object put request in replicator.
|
||||||
PutTimeoutDefault = 5 * time.Second
|
PutTimeoutDefault = 5 * time.Second
|
||||||
|
// PoolSizeDefault is a default pool size for put request in replicator.
|
||||||
|
PoolSizeDefault = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
// PutTimeout returns the value of "put_timeout" config parameter
|
// PutTimeout returns the value of "put_timeout" config parameter
|
||||||
|
@ -28,6 +30,13 @@ func PutTimeout(c *config.Config) time.Duration {
|
||||||
|
|
||||||
// PoolSize returns the value of "pool_size" config parameter
|
// PoolSize returns the value of "pool_size" config parameter
|
||||||
// from "replicator" section.
|
// from "replicator" section.
|
||||||
|
//
|
||||||
|
// Returns PoolSizeDefault if the value is non-positive integer.
|
||||||
func PoolSize(c *config.Config) int {
|
func PoolSize(c *config.Config) int {
|
||||||
return int(config.IntSafe(c.Sub(subsection), "pool_size"))
|
v := int(config.IntSafe(c.Sub(subsection), "pool_size"))
|
||||||
|
if v > 0 {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
return PoolSizeDefault
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@ func TestReplicatorSection(t *testing.T) {
|
||||||
empty := configtest.EmptyConfig()
|
empty := configtest.EmptyConfig()
|
||||||
|
|
||||||
require.Equal(t, replicatorconfig.PutTimeoutDefault, replicatorconfig.PutTimeout(empty))
|
require.Equal(t, replicatorconfig.PutTimeoutDefault, replicatorconfig.PutTimeout(empty))
|
||||||
require.Equal(t, 0, replicatorconfig.PoolSize(empty))
|
require.Equal(t, replicatorconfig.PoolSizeDefault, replicatorconfig.PoolSize(empty))
|
||||||
})
|
})
|
||||||
|
|
||||||
const path = "../../../../config/example/node"
|
const path = "../../../../config/example/node"
|
||||||
|
|
43
cmd/frostfs-node/config/rpc/config.go
Normal file
43
cmd/frostfs-node/config/rpc/config.go
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
package rpcconfig
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
subsection = "rpc"
|
||||||
|
limitsSubsection = "limits"
|
||||||
|
)
|
||||||
|
|
||||||
|
type LimitConfig struct {
|
||||||
|
Methods []string
|
||||||
|
MaxOps int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Limits returns the "limits" config from "rpc" section.
|
||||||
|
func Limits(c *config.Config) []LimitConfig {
|
||||||
|
c = c.Sub(subsection).Sub(limitsSubsection)
|
||||||
|
|
||||||
|
var limits []LimitConfig
|
||||||
|
|
||||||
|
for i := uint64(0); ; i++ {
|
||||||
|
si := strconv.FormatUint(i, 10)
|
||||||
|
sc := c.Sub(si)
|
||||||
|
|
||||||
|
methods := config.StringSliceSafe(sc, "methods")
|
||||||
|
if len(methods) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
maxOps := config.IntSafe(sc, "max_ops")
|
||||||
|
if maxOps == 0 {
|
||||||
|
panic("no max operations for method group")
|
||||||
|
}
|
||||||
|
|
||||||
|
limits = append(limits, LimitConfig{methods, maxOps})
|
||||||
|
}
|
||||||
|
|
||||||
|
return limits
|
||||||
|
}
|
53
cmd/frostfs-node/config/rpc/config_test.go
Normal file
53
cmd/frostfs-node/config/rpc/config_test.go
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
package rpcconfig
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRPCSection(t *testing.T) {
|
||||||
|
t.Run("defaults", func(t *testing.T) {
|
||||||
|
require.Empty(t, Limits(configtest.EmptyConfig()))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("correct config", func(t *testing.T) {
|
||||||
|
const path = "../../../../config/example/node"
|
||||||
|
|
||||||
|
fileConfigTest := func(c *config.Config) {
|
||||||
|
limits := Limits(c)
|
||||||
|
require.Len(t, limits, 2)
|
||||||
|
|
||||||
|
limit0 := limits[0]
|
||||||
|
limit1 := limits[1]
|
||||||
|
|
||||||
|
require.ElementsMatch(t, limit0.Methods, []string{"/neo.fs.v2.object.ObjectService/PutSingle", "/neo.fs.v2.object.ObjectService/Put"})
|
||||||
|
require.Equal(t, limit0.MaxOps, int64(1000))
|
||||||
|
|
||||||
|
require.ElementsMatch(t, limit1.Methods, []string{"/neo.fs.v2.object.ObjectService/Get"})
|
||||||
|
require.Equal(t, limit1.MaxOps, int64(10000))
|
||||||
|
}
|
||||||
|
|
||||||
|
configtest.ForEachFileType(path, fileConfigTest)
|
||||||
|
|
||||||
|
t.Run("ENV", func(t *testing.T) {
|
||||||
|
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("no max operations", func(t *testing.T) {
|
||||||
|
const path = "testdata/node"
|
||||||
|
|
||||||
|
fileConfigTest := func(c *config.Config) {
|
||||||
|
require.Panics(t, func() { _ = Limits(c) })
|
||||||
|
}
|
||||||
|
|
||||||
|
configtest.ForEachFileType(path, fileConfigTest)
|
||||||
|
|
||||||
|
t.Run("ENV", func(t *testing.T) {
|
||||||
|
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
3
cmd/frostfs-node/config/rpc/testdata/node.env
vendored
Normal file
3
cmd/frostfs-node/config/rpc/testdata/node.env
vendored
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
FROSTFS_RPC_LIMITS_0_METHODS="/neo.fs.v2.object.ObjectService/PutSingle /neo.fs.v2.object.ObjectService/Put"
|
||||||
|
FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get"
|
||||||
|
FROSTFS_RPC_LIMITS_1_MAX_OPS=10000
|
18
cmd/frostfs-node/config/rpc/testdata/node.json
vendored
Normal file
18
cmd/frostfs-node/config/rpc/testdata/node.json
vendored
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
{
|
||||||
|
"rpc": {
|
||||||
|
"limits": [
|
||||||
|
{
|
||||||
|
"methods": [
|
||||||
|
"/neo.fs.v2.object.ObjectService/PutSingle",
|
||||||
|
"/neo.fs.v2.object.ObjectService/Put"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"methods": [
|
||||||
|
"/neo.fs.v2.object.ObjectService/Get"
|
||||||
|
],
|
||||||
|
"max_ops": 10000
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
8
cmd/frostfs-node/config/rpc/testdata/node.yaml
vendored
Normal file
8
cmd/frostfs-node/config/rpc/testdata/node.yaml
vendored
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
rpc:
|
||||||
|
limits:
|
||||||
|
- methods:
|
||||||
|
- /neo.fs.v2.object.ObjectService/PutSingle
|
||||||
|
- /neo.fs.v2.object.ObjectService/Put
|
||||||
|
- methods:
|
||||||
|
- /neo.fs.v2.object.ObjectService/Get
|
||||||
|
max_ops: 10000
|
|
@ -9,9 +9,11 @@ import (
|
||||||
|
|
||||||
grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc"
|
grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
qosInternal "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
|
||||||
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
@ -134,11 +136,13 @@ func getGrpcServerOpts(ctx context.Context, c *cfg, sc *grpcconfig.Config) ([]gr
|
||||||
qos.NewUnaryServerInterceptor(),
|
qos.NewUnaryServerInterceptor(),
|
||||||
metrics.NewUnaryServerInterceptor(),
|
metrics.NewUnaryServerInterceptor(),
|
||||||
tracing.NewUnaryServerInterceptor(),
|
tracing.NewUnaryServerInterceptor(),
|
||||||
|
qosInternal.NewMaxActiveRPCLimiterUnaryServerInterceptor(func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }),
|
||||||
),
|
),
|
||||||
grpc.ChainStreamInterceptor(
|
grpc.ChainStreamInterceptor(
|
||||||
qos.NewStreamServerInterceptor(),
|
qos.NewStreamServerInterceptor(),
|
||||||
metrics.NewStreamServerInterceptor(),
|
metrics.NewStreamServerInterceptor(),
|
||||||
tracing.NewStreamServerInterceptor(),
|
tracing.NewStreamServerInterceptor(),
|
||||||
|
qosInternal.NewMaxActiveRPCLimiterStreamServerInterceptor(func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -326,7 +326,6 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
|
||||||
c,
|
c,
|
||||||
c.cfgNetmap.state,
|
c.cfgNetmap.state,
|
||||||
irFetcher,
|
irFetcher,
|
||||||
objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
|
||||||
objectwriter.WithLogger(c.log),
|
objectwriter.WithLogger(c.log),
|
||||||
objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
|
objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
|
||||||
)
|
)
|
||||||
|
|
|
@ -87,12 +87,15 @@ FROSTFS_REPLICATOR_POOL_SIZE=10
|
||||||
FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=500
|
FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=500
|
||||||
|
|
||||||
# Object service section
|
# Object service section
|
||||||
FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
|
|
||||||
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
|
||||||
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
|
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
|
||||||
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
||||||
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
|
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
|
||||||
|
|
||||||
|
FROSTFS_RPC_LIMITS_0_METHODS="/neo.fs.v2.object.ObjectService/PutSingle /neo.fs.v2.object.ObjectService/Put"
|
||||||
|
FROSTFS_RPC_LIMITS_0_MAX_OPS=1000
|
||||||
|
FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get"
|
||||||
|
FROSTFS_RPC_LIMITS_1_MAX_OPS=10000
|
||||||
|
|
||||||
# Storage engine section
|
# Storage engine section
|
||||||
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
||||||
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
|
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
|
||||||
|
|
|
@ -134,14 +134,29 @@
|
||||||
"tombstone_lifetime": 10
|
"tombstone_lifetime": 10
|
||||||
},
|
},
|
||||||
"put": {
|
"put": {
|
||||||
"remote_pool_size": 100,
|
|
||||||
"local_pool_size": 200,
|
|
||||||
"skip_session_token_issuer_verification": true
|
"skip_session_token_issuer_verification": true
|
||||||
},
|
},
|
||||||
"get": {
|
"get": {
|
||||||
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
|
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"rpc": {
|
||||||
|
"limits": [
|
||||||
|
{
|
||||||
|
"methods": [
|
||||||
|
"/neo.fs.v2.object.ObjectService/PutSingle",
|
||||||
|
"/neo.fs.v2.object.ObjectService/Put"
|
||||||
|
],
|
||||||
|
"max_ops": 1000
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"methods": [
|
||||||
|
"/neo.fs.v2.object.ObjectService/Get"
|
||||||
|
],
|
||||||
|
"max_ops": 10000
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
"storage": {
|
"storage": {
|
||||||
"shard_pool_size": 15,
|
"shard_pool_size": 15,
|
||||||
"shard_ro_error_threshold": 100,
|
"shard_ro_error_threshold": 100,
|
||||||
|
|
|
@ -117,14 +117,22 @@ object:
|
||||||
delete:
|
delete:
|
||||||
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
|
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
|
||||||
put:
|
put:
|
||||||
remote_pool_size: 100 # number of async workers for remote PUT operations
|
|
||||||
local_pool_size: 200 # number of async workers for local PUT operations
|
|
||||||
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
|
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
|
||||||
get:
|
get:
|
||||||
priority: # list of metrics of nodes for prioritization
|
priority: # list of metrics of nodes for prioritization
|
||||||
- $attribute:ClusterName
|
- $attribute:ClusterName
|
||||||
- $attribute:UN-LOCODE
|
- $attribute:UN-LOCODE
|
||||||
|
|
||||||
|
rpc:
|
||||||
|
limits:
|
||||||
|
- methods:
|
||||||
|
- /neo.fs.v2.object.ObjectService/PutSingle
|
||||||
|
- /neo.fs.v2.object.ObjectService/Put
|
||||||
|
max_ops: 1000
|
||||||
|
- methods:
|
||||||
|
- /neo.fs.v2.object.ObjectService/Get
|
||||||
|
max_ops: 10000
|
||||||
|
|
||||||
storage:
|
storage:
|
||||||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||||
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
||||||
|
|
|
@ -396,18 +396,16 @@ replicator:
|
||||||
pool_size: 10
|
pool_size: 10
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|---------------|------------|----------------------------------------|---------------------------------------------|
|
|---------------|------------|---------------|---------------------------------------------|
|
||||||
| `put_timeout` | `duration` | `5s` | Timeout for performing the `PUT` operation. |
|
| `put_timeout` | `duration` | `5s` | Timeout for performing the `PUT` operation. |
|
||||||
| `pool_size` | `int` | Equal to `object.put.remote_pool_size` | Maximum amount of concurrent replications. |
|
| `pool_size` | `int` | `10` | Maximum amount of concurrent replications. |
|
||||||
|
|
||||||
# `object` section
|
# `object` section
|
||||||
Contains object-service related parameters.
|
Contains object-service related parameters.
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
object:
|
object:
|
||||||
put:
|
|
||||||
remote_pool_size: 100
|
|
||||||
get:
|
get:
|
||||||
priority:
|
priority:
|
||||||
- $attribute:ClusterName
|
- $attribute:ClusterName
|
||||||
|
@ -416,10 +414,29 @@ object:
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------|
|
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------|
|
||||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
|
||||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
|
||||||
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET requests. |
|
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET requests. |
|
||||||
|
|
||||||
|
|
||||||
|
# `rpc` section
|
||||||
|
Contains limits on the number of active RPC for specified method(s).
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
rpc:
|
||||||
|
limits:
|
||||||
|
- methods:
|
||||||
|
- /neo.fs.v2.object.ObjectService/PutSingle
|
||||||
|
- /neo.fs.v2.object.ObjectService/Put
|
||||||
|
max_ops: 1000
|
||||||
|
- methods:
|
||||||
|
- /neo.fs.v2.object.ObjectService/Get
|
||||||
|
max_ops: 10000
|
||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | Default value | Description |
|
||||||
|
|------------------|------------|---------------|--------------------------------------------------------------|
|
||||||
|
| `limits.max_ops` | `int` | | Maximum number of active RPC allowed for the given method(s) |
|
||||||
|
| `limits.methods` | `[]string` | | List of RPC methods sharing the given limit |
|
||||||
|
|
||||||
# `runtime` section
|
# `runtime` section
|
||||||
Contains runtime parameters.
|
Contains runtime parameters.
|
||||||
|
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -8,7 +8,7 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe h1:81gDNdWNLP24oMQukRiCE9R1wGSh0l0dRq3F1W+Oesc=
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf h1:ik2aMBpTJJpoZe2ffcGShXRkrvny65NEPLVt67KmH/A=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 h1:dOZHuOywvH1ms8U38lDCWpysgkCCeJ02RLI7zDhPcyw=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 h1:dOZHuOywvH1ms8U38lDCWpysgkCCeJ02RLI7zDhPcyw=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||||
|
|
|
@ -125,7 +125,6 @@ const (
|
||||||
SearchCouldNotWriteObjectIdentifiers = "could not write object identifiers"
|
SearchCouldNotWriteObjectIdentifiers = "could not write object identifiers"
|
||||||
SearchLocalOperationFailed = "local operation failed"
|
SearchLocalOperationFailed = "local operation failed"
|
||||||
UtilObjectServiceError = "object service error"
|
UtilObjectServiceError = "object service error"
|
||||||
UtilCouldNotPushTaskToWorkerPool = "could not push task to worker pool"
|
|
||||||
V2CantCheckIfRequestFromInnerRing = "can't check if request from inner ring"
|
V2CantCheckIfRequestFromInnerRing = "can't check if request from inner ring"
|
||||||
V2CantCheckIfRequestFromContainerNode = "can't check if request from container node"
|
V2CantCheckIfRequestFromContainerNode = "can't check if request from container node"
|
||||||
ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch"
|
ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch"
|
||||||
|
|
|
@ -3,7 +3,9 @@ package qos
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -49,3 +51,36 @@ func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientIntercepto
|
||||||
return streamer(ctx, desc, cc, method, opts...)
|
return streamer(ctx, desc, cc, method, opts...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewMaxActiveRPCLimiterUnaryServerInterceptor(getLimiter func() limiting.Limiter) grpc.UnaryServerInterceptor {
|
||||||
|
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
|
||||||
|
if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == IOTagCritical.String() {
|
||||||
|
return handler(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
release, ok := getLimiter().Acquire(info.FullMethod)
|
||||||
|
if !ok {
|
||||||
|
return nil, new(apistatus.ResourceExhausted)
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
|
|
||||||
|
return handler(ctx, req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//nolint:contextcheck (grpc.ServerStream manages the context itself)
|
||||||
|
func NewMaxActiveRPCLimiterStreamServerInterceptor(getLimiter func() limiting.Limiter) grpc.StreamServerInterceptor {
|
||||||
|
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||||
|
if tag, ok := tagging.IOTagFromContext(ss.Context()); ok && tag == IOTagCritical.String() {
|
||||||
|
return handler(srv, ss)
|
||||||
|
}
|
||||||
|
|
||||||
|
release, ok := getLimiter().Acquire(info.FullMethod)
|
||||||
|
if !ok {
|
||||||
|
return new(apistatus.ResourceExhausted)
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
|
|
||||||
|
return handler(srv, ss)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -79,11 +79,11 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
workerPool, isLocal := n.cfg.getWorkerPool(addr.PublicKey())
|
isLocal := n.cfg.NetmapKeys.IsLocalKey(addr.PublicKey())
|
||||||
|
|
||||||
item := new(bool)
|
item := new(bool)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
if err := workerPool.Submit(func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})
|
err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})
|
||||||
|
@ -95,11 +95,7 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
||||||
|
|
||||||
traverser.SubmitSuccess()
|
traverser.SubmitSuccess()
|
||||||
*item = true
|
*item = true
|
||||||
}); err != nil {
|
}()
|
||||||
wg.Done()
|
|
||||||
svcutil.LogWorkerPoolError(ctx, n.cfg.Logger, "PUT", err)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark the container node as processed in order to exclude it
|
// Mark the container node as processed in order to exclude it
|
||||||
// in subsequent container broadcast. Note that we don't
|
// in subsequent container broadcast. Note that we don't
|
||||||
|
|
|
@ -149,17 +149,7 @@ func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
|
||||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
completed := make(chan interface{})
|
err = e.Relay(ctx, info, c)
|
||||||
if poolErr := e.Config.RemotePool.Submit(func() {
|
|
||||||
defer close(completed)
|
|
||||||
err = e.Relay(ctx, info, c)
|
|
||||||
}); poolErr != nil {
|
|
||||||
close(completed)
|
|
||||||
svcutil.LogWorkerPoolError(ctx, e.Config.Logger, "PUT", poolErr)
|
|
||||||
return poolErr
|
|
||||||
}
|
|
||||||
<-completed
|
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -343,21 +333,11 @@ func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, n
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
var err error
|
|
||||||
localTarget := LocalTarget{
|
localTarget := LocalTarget{
|
||||||
Storage: e.Config.LocalStore,
|
Storage: e.Config.LocalStore,
|
||||||
Container: e.Container,
|
Container: e.Container,
|
||||||
}
|
}
|
||||||
completed := make(chan interface{})
|
return localTarget.WriteObject(ctx, obj, e.ObjectMeta)
|
||||||
if poolErr := e.Config.LocalPool.Submit(func() {
|
|
||||||
defer close(completed)
|
|
||||||
err = localTarget.WriteObject(ctx, obj, e.ObjectMeta)
|
|
||||||
}); poolErr != nil {
|
|
||||||
close(completed)
|
|
||||||
return poolErr
|
|
||||||
}
|
|
||||||
<-completed
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||||
|
@ -371,15 +351,5 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n
|
||||||
nodeInfo: clientNodeInfo,
|
nodeInfo: clientNodeInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
return remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
|
||||||
completed := make(chan interface{})
|
|
||||||
if poolErr := e.Config.RemotePool.Submit(func() {
|
|
||||||
defer close(completed)
|
|
||||||
err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
|
|
||||||
}); poolErr != nil {
|
|
||||||
close(completed)
|
|
||||||
return poolErr
|
|
||||||
}
|
|
||||||
<-completed
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/panjf2000/ants/v2"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -131,9 +130,6 @@ func TestECWriter(t *testing.T) {
|
||||||
nodeKey, err := keys.NewPrivateKey()
|
nodeKey, err := keys.NewPrivateKey()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
log, err := logger.NewLogger(nil)
|
log, err := logger.NewLogger(nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -141,7 +137,6 @@ func TestECWriter(t *testing.T) {
|
||||||
ecw := ECWriter{
|
ecw := ECWriter{
|
||||||
Config: &Config{
|
Config: &Config{
|
||||||
NetmapKeys: n,
|
NetmapKeys: n,
|
||||||
RemotePool: pool,
|
|
||||||
Logger: log,
|
Logger: log,
|
||||||
ClientConstructor: clientConstructor{vectors: ns},
|
ClientConstructor: clientConstructor{vectors: ns},
|
||||||
KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
|
KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -52,8 +51,6 @@ type Config struct {
|
||||||
|
|
||||||
NetmapSource netmap.Source
|
NetmapSource netmap.Source
|
||||||
|
|
||||||
RemotePool, LocalPool util.WorkerPool
|
|
||||||
|
|
||||||
NetmapKeys netmap.AnnouncedKeys
|
NetmapKeys netmap.AnnouncedKeys
|
||||||
|
|
||||||
FormatValidator *object.FormatValidator
|
FormatValidator *object.FormatValidator
|
||||||
|
@ -69,12 +66,6 @@ type Config struct {
|
||||||
|
|
||||||
type Option func(*Config)
|
type Option func(*Config)
|
||||||
|
|
||||||
func WithWorkerPools(remote, local util.WorkerPool) Option {
|
|
||||||
return func(c *Config) {
|
|
||||||
c.RemotePool, c.LocalPool = remote, local
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithLogger(l *logger.Logger) Option {
|
func WithLogger(l *logger.Logger) Option {
|
||||||
return func(c *Config) {
|
return func(c *Config) {
|
||||||
c.Logger = l
|
c.Logger = l
|
||||||
|
@ -87,13 +78,6 @@ func WithVerifySessionTokenIssuer(v bool) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
|
|
||||||
if c.NetmapKeys.IsLocalKey(pub) {
|
|
||||||
return c.LocalPool, true
|
|
||||||
}
|
|
||||||
return c.RemotePool, false
|
|
||||||
}
|
|
||||||
|
|
||||||
type Params struct {
|
type Params struct {
|
||||||
Config *Config
|
Config *Config
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||||
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -27,8 +26,6 @@ func NewService(ks *objutil.KeyStorage,
|
||||||
opts ...objectwriter.Option,
|
opts ...objectwriter.Option,
|
||||||
) *Service {
|
) *Service {
|
||||||
c := &objectwriter.Config{
|
c := &objectwriter.Config{
|
||||||
RemotePool: util.NewPseudoWorkerPool(),
|
|
||||||
LocalPool: util.NewPseudoWorkerPool(),
|
|
||||||
Logger: logger.NewLoggerWrapper(zap.L()),
|
Logger: logger.NewLoggerWrapper(zap.L()),
|
||||||
KeyStorage: ks,
|
KeyStorage: ks,
|
||||||
ClientConstructor: cc,
|
ClientConstructor: cc,
|
||||||
|
|
|
@ -17,11 +17,3 @@ func LogServiceError(ctx context.Context, l *logger.Logger, req string, node net
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// LogWorkerPoolError writes debug error message of object worker pool to provided logger.
|
|
||||||
func LogWorkerPoolError(ctx context.Context, l *logger.Logger, req string, err error) {
|
|
||||||
l.Error(ctx, logs.UtilCouldNotPushTaskToWorkerPool,
|
|
||||||
zap.String("request", req),
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue