Compare commits

...

10 commits

Author SHA1 Message Date
b53afce771 [#xx] object: Sort nodes by priority metrics to compute GET/SEARCH requests
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-10-18 16:51:49 +03:00
c0a2f20eee [#1422] multinet: Add metrics
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-17 13:15:08 +00:00
2d064d0bd8 [#1422] morph: Resolve funlen linter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-17 13:15:08 +00:00
ef38420623 [#1422] ir: Add dialer source
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-17 13:15:08 +00:00
f7caef355b [#1422] node: Use dialer source for morph
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-17 13:15:08 +00:00
fbdfd503e4 [#1422] morph: Add dialer source support
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-17 13:15:08 +00:00
67798bb50e [#1422] mod: Bump neoneo-go version
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-17 13:15:08 +00:00
5b653aa65f [#1422] morph: Drop single client as not used
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-17 13:15:08 +00:00
e314f328c4 [#1422] tree: Use dialer source for tree service connections
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-17 13:15:08 +00:00
6c96cc2af6 [#1422] node: Use dialer source for SDK cache
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-17 13:15:08 +00:00
33 changed files with 651 additions and 117 deletions

View file

@ -72,6 +72,7 @@ $(BINS): $(DIRS) dep
@echo "⇒ Build $@" @echo "⇒ Build $@"
CGO_ENABLED=0 \ CGO_ENABLED=0 \
go build -v -trimpath \ go build -v -trimpath \
-gcflags "all=-N -l" \
-ldflags "-X $(REPO)/misc.Version=$(VERSION)" \ -ldflags "-X $(REPO)/misc.Version=$(VERSION)" \
-o $@ ./cmd/$(notdir $@) -o $@ ./cmd/$(notdir $@)

View file

@ -48,6 +48,8 @@ func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("node.kludge_compatibility_mode", false) cfg.SetDefault("node.kludge_compatibility_mode", false)
cfg.SetDefault("audit.enabled", false) cfg.SetDefault("audit.enabled", false)
setMultinetDefaults(cfg)
} }
func setControlDefaults(cfg *viper.Viper) { func setControlDefaults(cfg *viper.Viper) {
@ -131,3 +133,11 @@ func setMorphDefaults(cfg *viper.Viper) {
cfg.SetDefault("morph.validators", []string{}) cfg.SetDefault("morph.validators", []string{})
cfg.SetDefault("morph.switch_interval", 2*time.Minute) cfg.SetDefault("morph.switch_interval", 2*time.Minute)
} }
func setMultinetDefaults(cfg *viper.Viper) {
cfg.SetDefault("multinet.enabled", false)
cfg.SetDefault("multinet.balancer", "")
cfg.SetDefault("multinet.restrict", false)
cfg.SetDefault("multinet.fallback_delay", "0s")
cfg.SetDefault("multinet.subnets", "")
}

View file

@ -58,6 +58,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source" tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
@ -109,6 +110,7 @@ type applicationConfiguration struct {
ObjectCfg struct { ObjectCfg struct {
tombstoneLifetime uint64 tombstoneLifetime uint64
priorityMetrics []placement.Metric
} }
EngineCfg struct { EngineCfg struct {
@ -232,6 +234,11 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
// Object // Object
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c) a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
var pm []placement.Metric
for _, raw := range objectconfig.Get(c).Priority() {
pm = append(pm, placement.ParseMetric(raw))
}
a.ObjectCfg.priorityMetrics = pm
// Storage Engine // Storage Engine
@ -764,7 +771,9 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path()) persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
fatalOnErr(err) fatalOnErr(err)
ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg)) nodeMetrics := metrics.NewNodeMetrics()
ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg, nodeMetrics.MultinetMetrics()))
fatalOnErr(err) fatalOnErr(err)
cacheOpts := cache.ClientCacheOpts{ cacheOpts := cache.ClientCacheOpts{
@ -773,6 +782,7 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
Key: &key.PrivateKey, Key: &key.PrivateKey,
AllowExternal: apiclientconfig.AllowExternal(appCfg), AllowExternal: apiclientconfig.AllowExternal(appCfg),
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg), ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
DialerSource: ds,
} }
return shared{ return shared{
@ -784,17 +794,18 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
bgClientCache: cache.NewSDKClientCache(cacheOpts), bgClientCache: cache.NewSDKClientCache(cacheOpts),
putClientCache: cache.NewSDKClientCache(cacheOpts), putClientCache: cache.NewSDKClientCache(cacheOpts),
persistate: persistate, persistate: persistate,
metricsCollector: metrics.NewNodeMetrics(), metricsCollector: nodeMetrics,
dialerSource: ds, dialerSource: ds,
} }
} }
func internalNetConfig(appCfg *config.Config) internalNet.Config { func internalNetConfig(appCfg *config.Config, m metrics.MultinetMetrics) internalNet.Config {
result := internalNet.Config{ result := internalNet.Config{
Enabled: multinet.Enabled(appCfg), Enabled: multinet.Enabled(appCfg),
Balancer: multinet.Balancer(appCfg), Balancer: multinet.Balancer(appCfg),
Restrict: multinet.Restrict(appCfg), Restrict: multinet.Restrict(appCfg),
FallbackDelay: multinet.FallbackDelay(appCfg), FallbackDelay: multinet.FallbackDelay(appCfg),
Metrics: m,
} }
sn := multinet.Subnets(appCfg) sn := multinet.Subnets(appCfg)
for _, s := range sn { for _, s := range sn {
@ -1361,7 +1372,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
} }
} }
if err := c.dialerSource.Update(internalNetConfig(c.appCfg)); err != nil { if err := c.dialerSource.Update(internalNetConfig(c.appCfg, c.metricsCollector.MultinetMetrics())); err != nil {
c.log.Error(logs.FailedToUpdateMultinetConfiguration, zap.Error(err)) c.log.Error(logs.FailedToUpdateMultinetConfiguration, zap.Error(err))
return return
} }

View file

@ -10,10 +10,17 @@ type PutConfig struct {
cfg *config.Config cfg *config.Config
} }
// GetConfig is a wrapper over "get" config section which provides access
// to object get pipeline configuration of object service.
type GetConfig struct {
cfg *config.Config
}
const ( const (
subsection = "object" subsection = "object"
putSubsection = "put" putSubsection = "put"
getSubsection = "get"
// PutPoolSizeDefault is a default value of routine pool size to // PutPoolSizeDefault is a default value of routine pool size to
// process object.Put requests in object service. // process object.Put requests in object service.
@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
func (g PutConfig) SkipSessionTokenIssuerVerification() bool { func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification") return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
} }
// Get returns structure that provides access to "get" subsection of
// "object" section.
func Get(c *config.Config) GetConfig {
return GetConfig{
c.Sub(subsection).Sub(getSubsection),
}
}
// Priority returns the value of "priority" config parameter.
func (g GetConfig) Priority() []string {
return config.StringSliceSafe(g.cfg, "priority")
}

View file

@ -29,51 +29,12 @@ const (
) )
func initMorphComponents(ctx context.Context, c *cfg) { func initMorphComponents(ctx context.Context, c *cfg) {
addresses := morphconfig.RPCEndpoint(c.appCfg) initMorphClient(ctx, c)
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
// order of endpoints with the same priority.
rand.Shuffle(len(addresses), func(i, j int) {
addresses[i], addresses[j] = addresses[j], addresses[i]
})
cli, err := client.New(ctx,
c.key,
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
client.WithLogger(c.log),
client.WithMetrics(c.metricsCollector.MorphClientMetrics()),
client.WithEndpoints(addresses...),
client.WithConnLostCallback(func() {
c.internalErr <- errors.New("morph connection has been lost")
}),
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()),
)
if err != nil {
c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient,
zap.Any("endpoints", addresses),
zap.String("error", err.Error()),
)
fatalOnErr(err)
}
c.onShutdown(func() {
c.log.Info(logs.FrostFSNodeClosingMorphComponents)
cli.Close()
})
if err := cli.SetGroupSignerScope(); err != nil {
c.log.Info(logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err))
}
c.cfgMorph.client = cli
c.cfgMorph.notaryEnabled = cli.ProbeNotary()
lookupScriptHashesInNNS(c) // smart contract auto negotiation lookupScriptHashesInNNS(c) // smart contract auto negotiation
if c.cfgMorph.notaryEnabled { if c.cfgMorph.notaryEnabled {
err = c.cfgMorph.client.EnableNotarySupport( err := c.cfgMorph.client.EnableNotarySupport(
client.WithProxyContract( client.WithProxyContract(
c.cfgMorph.proxyScriptHash, c.cfgMorph.proxyScriptHash,
), ),
@ -111,6 +72,50 @@ func initMorphComponents(ctx context.Context, c *cfg) {
c.cfgNetmap.wrapper = wrap c.cfgNetmap.wrapper = wrap
} }
func initMorphClient(ctx context.Context, c *cfg) {
addresses := morphconfig.RPCEndpoint(c.appCfg)
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
// order of endpoints with the same priority.
rand.Shuffle(len(addresses), func(i, j int) {
addresses[i], addresses[j] = addresses[j], addresses[i]
})
cli, err := client.New(ctx,
c.key,
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
client.WithLogger(c.log),
client.WithMetrics(c.metricsCollector.MorphClientMetrics()),
client.WithEndpoints(addresses...),
client.WithConnLostCallback(func() {
c.internalErr <- errors.New("morph connection has been lost")
}),
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()),
client.WithDialerSource(c.dialerSource),
)
if err != nil {
c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient,
zap.Any("endpoints", addresses),
zap.String("error", err.Error()),
)
fatalOnErr(err)
}
c.onShutdown(func() {
c.log.Info(logs.FrostFSNodeClosingMorphComponents)
cli.Close()
})
if err := cli.SetGroupSignerScope(); err != nil {
c.log.Info(logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err))
}
c.cfgMorph.client = cli
c.cfgMorph.notaryEnabled = cli.ProbeNotary()
}
func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) { func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
// skip notary deposit in non-notary environments // skip notary deposit in non-notary environments
if !c.cfgMorph.notaryEnabled { if !c.cfgMorph.notaryEnabled {

View file

@ -174,11 +174,13 @@ func initObjectService(c *cfg) {
sPutV2 := createPutSvcV2(sPut, keyStorage) sPutV2 := createPutSvcV2(sPut, keyStorage)
sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource) sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
c.ObjectCfg.priorityMetrics)
sSearchV2 := createSearchSvcV2(sSearch, keyStorage) sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource) sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
c.ObjectCfg.priorityMetrics)
*c.cfgObject.getSvc = *sGet // need smth better *c.cfgObject.getSvc = *sGet // need smth better
@ -366,7 +368,10 @@ func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service) *patchsvc.Servic
return patchsvc.NewService(sPut.Config, sGet) return patchsvc.NewService(sPut.Config, sGet)
} }
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache, containerSource containercore.Source) *searchsvc.Service { func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache,
containerSource containercore.Source,
priorityMetrics []placement.Metric,
) *searchsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage ls := c.cfgObject.cfgLocalStorage.localStorage
return searchsvc.New( return searchsvc.New(
@ -374,6 +379,8 @@ func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Trav
coreConstructor, coreConstructor,
traverseGen.WithTraverseOptions( traverseGen.WithTraverseOptions(
placement.WithoutSuccessTracking(), placement.WithoutSuccessTracking(),
placement.WithPriorityMetrics(priorityMetrics),
placement.WithNodeState(c),
), ),
c.netMapSource, c.netMapSource,
keyStorage, keyStorage,
@ -389,6 +396,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
coreConstructor *cache.ClientCache, coreConstructor *cache.ClientCache,
containerSource containercore.Source, containerSource containercore.Source,
priorityMetrics []placement.Metric,
) *getsvc.Service { ) *getsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage ls := c.cfgObject.cfgLocalStorage.localStorage
@ -398,6 +406,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
ls, ls,
traverseGen.WithTraverseOptions( traverseGen.WithTraverseOptions(
placement.SuccessAfter(1), placement.SuccessAfter(1),
placement.WithPriorityMetrics(priorityMetrics),
placement.WithNodeState(c),
), ),
coreConstructor, coreConstructor,
containerSource, containerSource,

View file

@ -67,6 +67,7 @@ func initTreeService(c *cfg) {
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()), tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
tree.WithAPEMorphRuleStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage()), tree.WithAPEMorphRuleStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage()),
tree.WithNetmapState(c.cfgNetmap.state), tree.WithNetmapState(c.cfgNetmap.state),
tree.WithDialerSource(c.dialerSource),
) )
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {

View file

@ -8,9 +8,11 @@ import (
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine" engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger" loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree" treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
) )
@ -30,6 +32,13 @@ func validateConfig(c *config.Config) error {
return fmt.Errorf("invalid logger destination: %w", err) return fmt.Errorf("invalid logger destination: %w", err)
} }
// validate priority metrics for GET and SEARCH requests
for _, raw := range objectconfig.Get(c).Priority() {
if err := placement.ValidateMetric(raw); err != nil {
return err
}
}
// shard configuration validation // shard configuration validation
shardNum := 0 shardNum := 0

View file

@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200 FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10 FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
# Storage engine section # Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15 FROSTFS_STORAGE_SHARD_POOL_SIZE=15

View file

@ -131,6 +131,9 @@
"remote_pool_size": 100, "remote_pool_size": 100,
"local_pool_size": 200, "local_pool_size": 200,
"skip_session_token_issuer_verification": true "skip_session_token_issuer_verification": true
},
"get": {
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
} }
}, },
"storage": { "storage": {

View file

@ -114,6 +114,10 @@ object:
remote_pool_size: 100 # number of async workers for remote PUT operations remote_pool_size: 100 # number of async workers for remote PUT operations
local_pool_size: 200 # number of async workers for local PUT operations local_pool_size: 200 # number of async workers for local PUT operations
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
get:
priority: # list of metrics of nodes for prioritization
- $attribute:ClusterName
- $attribute:UN-LOCODE
storage: storage:
# note: shard configuration can be omitted for relay node (see `node.relay`) # note: shard configuration can be omitted for relay node (see `node.relay`)

View file

@ -407,13 +407,17 @@ Contains object-service related parameters.
object: object:
put: put:
remote_pool_size: 100 remote_pool_size: 100
get:
priority:
- $attribute:ClusterName
``` ```
| Parameter | Type | Default value | Description | | Parameter | Type | Default value | Description |
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------| |-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. | | `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. | | `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. | | `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
# `runtime` section # `runtime` section
Contains runtime parameters. Contains runtime parameters.

2
go.mod
View file

@ -133,4 +133,4 @@ require (
rsc.io/tmplfunc v0.0.3 // indirect rsc.io/tmplfunc v0.0.3 // indirect
) )
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928 replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07

4
go.sum
View file

@ -16,8 +16,8 @@ git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8l
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM= git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8= git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI= git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI=
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928 h1:LK3mCkNZkY48eBA9jnk1N0eQZLsZhOG+XYw4EBoKUjM= git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07 h1:gPaqGsk6gSWQyNVjaStydfUz6Z/loHc9XyvGrJ5qSPY=
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg= git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 h1:vgbfkcnIexZUm3vREBBSa/Gv1Whjd1SFCUd0A+IaGPQ= git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 h1:vgbfkcnIexZUm3vREBBSa/Gv1Whjd1SFCUd0A+IaGPQ=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA= git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA=
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA= git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=

View file

@ -22,6 +22,7 @@ const (
grpcServerSubsystem = "grpc_server" grpcServerSubsystem = "grpc_server"
policerSubsystem = "policer" policerSubsystem = "policer"
commonCacheSubsystem = "common_cache" commonCacheSubsystem = "common_cache"
multinetSubsystem = "multinet"
successLabel = "success" successLabel = "success"
shardIDLabel = "shard_id" shardIDLabel = "shard_id"
@ -41,6 +42,7 @@ const (
endpointLabel = "endpoint" endpointLabel = "endpoint"
hitLabel = "hit" hitLabel = "hit"
cacheLabel = "cache" cacheLabel = "cache"
sourceIPLabel = "source_ip"
readWriteMode = "READ_WRITE" readWriteMode = "READ_WRITE"
readOnlyMode = "READ_ONLY" readOnlyMode = "READ_ONLY"

View file

@ -17,6 +17,7 @@ type InnerRingServiceMetrics struct {
eventDuration *prometheus.HistogramVec eventDuration *prometheus.HistogramVec
morphCacheMetrics *morphCacheMetrics morphCacheMetrics *morphCacheMetrics
logMetrics logger.LogMetrics logMetrics logger.LogMetrics
multinet *multinetMetrics
// nolint: unused // nolint: unused
appInfo *ApplicationInfo appInfo *ApplicationInfo
} }
@ -51,6 +52,7 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
morphCacheMetrics: newMorphCacheMetrics(innerRingNamespace), morphCacheMetrics: newMorphCacheMetrics(innerRingNamespace),
appInfo: NewApplicationInfo(misc.Version), appInfo: NewApplicationInfo(misc.Version),
logMetrics: logger.NewLogMetrics(innerRingNamespace), logMetrics: logger.NewLogMetrics(innerRingNamespace),
multinet: newMultinetMetrics(innerRingNamespace),
} }
} }
@ -78,3 +80,7 @@ func (m *InnerRingServiceMetrics) MorphCacheMetrics() MorphCacheMetrics {
func (m *InnerRingServiceMetrics) LogMetrics() logger.LogMetrics { func (m *InnerRingServiceMetrics) LogMetrics() logger.LogMetrics {
return m.logMetrics return m.logMetrics
} }
func (m *InnerRingServiceMetrics) Multinet() MultinetMetrics {
return m.multinet
}

View file

@ -0,0 +1,35 @@
package metrics
import (
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type multinetMetrics struct {
dials *prometheus.GaugeVec
}
type MultinetMetrics interface {
Dial(sourceIP string, success bool)
}
func newMultinetMetrics(ns string) *multinetMetrics {
return &multinetMetrics{
dials: metrics.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: ns,
Subsystem: multinetSubsystem,
Name: "dial_count",
Help: "Dials count performed by multinet",
}, []string{sourceIPLabel, successLabel}),
}
}
func (m *multinetMetrics) Dial(sourceIP string, success bool) {
m.dials.With(prometheus.Labels{
sourceIPLabel: sourceIP,
successLabel: strconv.FormatBool(success),
}).Inc()
}

View file

@ -25,6 +25,7 @@ type NodeMetrics struct {
morphClient *morphClientMetrics morphClient *morphClientMetrics
morphCache *morphCacheMetrics morphCache *morphCacheMetrics
log logger.LogMetrics log logger.LogMetrics
multinet *multinetMetrics
// nolint: unused // nolint: unused
appInfo *ApplicationInfo appInfo *ApplicationInfo
} }
@ -53,6 +54,7 @@ func NewNodeMetrics() *NodeMetrics {
morphCache: newMorphCacheMetrics(namespace), morphCache: newMorphCacheMetrics(namespace),
log: logger.NewLogMetrics(namespace), log: logger.NewLogMetrics(namespace),
appInfo: NewApplicationInfo(misc.Version), appInfo: NewApplicationInfo(misc.Version),
multinet: newMultinetMetrics(namespace),
} }
} }
@ -120,3 +122,7 @@ func (m *NodeMetrics) MorphCacheMetrics() MorphCacheMetrics {
func (m *NodeMetrics) LogMetrics() logger.LogMetrics { func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
return m.log return m.log
} }
func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
return m.multinet
}

View file

@ -7,6 +7,7 @@ import (
"slices" "slices"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
"git.frostfs.info/TrueCloudLab/multinet" "git.frostfs.info/TrueCloudLab/multinet"
) )
@ -23,6 +24,7 @@ type Config struct {
Balancer string Balancer string
Restrict bool Restrict bool
FallbackDelay time.Duration FallbackDelay time.Duration
Metrics metrics.MultinetMetrics
} }
func (c Config) toMultinetConfig() (multinet.Config, error) { func (c Config) toMultinetConfig() (multinet.Config, error) {
@ -52,6 +54,7 @@ func (c Config) toMultinetConfig() (multinet.Config, error) {
Restrict: c.Restrict, Restrict: c.Restrict,
FallbackDelay: c.FallbackDelay, FallbackDelay: c.FallbackDelay,
Dialer: newDefaulDialer(), Dialer: newDefaulDialer(),
EventHandler: newEventHandler(c.Metrics),
}, nil }, nil
} }

View file

@ -13,6 +13,10 @@ type Dialer interface {
DialContext(ctx context.Context, network, address string) (net.Conn, error) DialContext(ctx context.Context, network, address string) (net.Conn, error)
} }
func DialContextTCP(ctx context.Context, address string, d Dialer) (net.Conn, error) {
return d.DialContext(ctx, "tcp", address)
}
func newDefaulDialer() net.Dialer { func newDefaulDialer() net.Dialer {
// From `grpc.WithContextDialer` comment: // From `grpc.WithContextDialer` comment:
// //
@ -28,7 +32,7 @@ func newDefaulDialer() net.Dialer {
KeepAlive: time.Duration(-1), KeepAlive: time.Duration(-1),
Control: func(_, _ string, c syscall.RawConn) error { Control: func(_, _ string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) { return c.Control(func(fd uintptr) {
unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1) _ = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)
}) })
}, },
} }

View file

@ -58,6 +58,20 @@ func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Co
return nil return nil
} }
// NetContextDialer returns net.DialContext dial function.
// Returns nil if multinet disabled.
func (s *DialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
s.guard.RLock()
defer s.guard.RUnlock()
if s.c.Enabled {
return func(ctx context.Context, network, address string) (net.Conn, error) {
return s.md.DialContext(ctx, network, address)
}
}
return nil
}
func (s *DialerSource) Update(c Config) error { func (s *DialerSource) Update(c Config) error {
s.guard.Lock() s.guard.Lock()
defer s.guard.Unlock() defer s.guard.Unlock()

View file

@ -0,0 +1,29 @@
package net
import (
"net"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
"git.frostfs.info/TrueCloudLab/multinet"
)
var _ multinet.EventHandler = (*metricsEventHandler)(nil)
type metricsEventHandler struct {
m metrics.MultinetMetrics
}
func (m *metricsEventHandler) DialPerformed(sourceIP net.Addr, _ string, _ string, err error) {
sourceIPString := "undefined"
if sourceIP != nil {
sourceIPString = sourceIP.Network() + "://" + sourceIP.String()
}
m.m.Dial(sourceIPString, err == nil)
}
func newEventHandler(m metrics.MultinetMetrics) multinet.EventHandler {
if m == nil {
return nil
}
return &metricsEventHandler{m: m}
}

View file

@ -463,6 +463,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
name: morphPrefix, name: morphPrefix,
from: fromSideChainBlock, from: fromSideChainBlock,
morphCacheMetric: s.irMetrics.MorphCacheMetrics(), morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
multinetMetrics: s.irMetrics.Multinet(),
} }
// create morph client // create morph client

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
@ -116,6 +117,7 @@ type (
sgn *transaction.Signer sgn *transaction.Signer
from uint32 // block height from uint32 // block height
morphCacheMetric metrics.MorphCacheMetrics morphCacheMetric metrics.MorphCacheMetrics
multinetMetrics metrics.MultinetMetrics
} }
) )
@ -486,6 +488,12 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name) return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
} }
nc := parseMultinetConfig(p.cfg, p.multinetMetrics)
ds, err := internalNet.NewDialerSource(nc)
if err != nil {
return nil, fmt.Errorf("dialer source: %w", err)
}
return client.New( return client.New(
ctx, ctx,
p.key, p.key,
@ -498,6 +506,7 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
}), }),
client.WithSwitchInterval(p.cfg.GetDuration(p.name+".switch_interval")), client.WithSwitchInterval(p.cfg.GetDuration(p.name+".switch_interval")),
client.WithMorphCacheMetrics(p.morphCacheMetric), client.WithMorphCacheMetrics(p.morphCacheMetric),
client.WithDialerSource(ds),
) )
} }
@ -542,6 +551,28 @@ func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) {
return extraWallets, nil return extraWallets, nil
} }
func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNet.Config {
nc := internalNet.Config{
Enabled: cfg.GetBool("multinet.enabled"),
Balancer: cfg.GetString("multinet.balancer"),
Restrict: cfg.GetBool("multinet.restrict"),
FallbackDelay: cfg.GetDuration("multinet.fallback_delay"),
Metrics: m,
}
for i := 0; ; i++ {
mask := cfg.GetString(fmt.Sprintf("multinet.subnets.%d.mask", i))
if mask == "" {
break
}
sourceIPs := cfg.GetStringSlice(fmt.Sprintf("multinet.subnets.%d.source_ips", i))
nc.Subnets = append(nc.Subnets, internalNet.Subnet{
Prefix: mask,
SourceIPs: sourceIPs,
})
}
return nc
}
func (s *Server) initConfigFromBlockchain() error { func (s *Server) initConfigFromBlockchain() error {
// get current epoch // get current epoch
epoch, err := s.netmapClient.Epoch() epoch, err := s.netmapClient.Epoch()

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics" morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
@ -41,13 +42,13 @@ type cfg struct {
endpoints []Endpoint endpoints []Endpoint
singleCli *rpcclient.WSClient // neo-go client for single client mode
inactiveModeCb Callback inactiveModeCb Callback
switchInterval time.Duration switchInterval time.Duration
morphCacheMetrics metrics.MorphCacheMetrics morphCacheMetrics metrics.MorphCacheMetrics
dialerSource *internalNet.DialerSource
} }
const ( const (
@ -124,40 +125,24 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
var err error var err error
var act *actor.Actor var act *actor.Actor
if cfg.singleCli != nil { var endpoint Endpoint
// return client in single RPC node mode that uses for cli.endpoints.curr, endpoint = range cli.endpoints.list {
// predefined WS client cli.client, act, err = cli.newCli(ctx, endpoint)
//
// in case of the closing web socket connection:
// if extra endpoints were provided via options,
// they will be used in switch process, otherwise
// inactive mode will be enabled
cli.client = cfg.singleCli
act, err = newActor(cfg.singleCli, acc, *cfg)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not create RPC actor: %w", err) cli.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint,
} zap.Error(err), zap.String("endpoint", endpoint.Address))
} else { } else {
var endpoint Endpoint cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
for cli.endpoints.curr, endpoint = range cli.endpoints.list { zap.String("endpoint", endpoint.Address))
cli.client, act, err = cli.newCli(ctx, endpoint) if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 {
if err != nil { cli.switchIsActive.Store(true)
cli.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint, go cli.switchToMostPrioritized(ctx)
zap.Error(err), zap.String("endpoint", endpoint.Address))
} else {
cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
zap.String("endpoint", endpoint.Address))
if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 {
cli.switchIsActive.Store(true)
go cli.switchToMostPrioritized(ctx)
}
break
} }
break
} }
if cli.client == nil { }
return nil, ErrNoHealthyEndpoint if cli.client == nil {
} return nil, ErrNoHealthyEndpoint
} }
cli.setActor(act) cli.setActor(act)
@ -175,6 +160,7 @@ func (c *Client) newCli(ctx context.Context, endpoint Endpoint) (*rpcclient.WSCl
Options: rpcclient.Options{ Options: rpcclient.Options{
DialTimeout: c.cfg.dialTimeout, DialTimeout: c.cfg.dialTimeout,
TLSClientConfig: cfg, TLSClientConfig: cfg,
NetDialContext: c.cfg.dialerSource.NetContextDialer(),
}, },
}) })
if err != nil { if err != nil {
@ -285,17 +271,6 @@ func WithEndpoints(endpoints ...Endpoint) Option {
} }
} }
// WithSingleClient returns a client constructor option
// that specifies single neo-go client and forces Client
// to use it for requests.
//
// Passed client must already be initialized.
func WithSingleClient(cli *rpcclient.WSClient) Option {
return func(c *cfg) {
c.singleCli = cli
}
}
// WithConnLostCallback return a client constructor option // WithConnLostCallback return a client constructor option
// that specifies a callback that is called when Client // that specifies a callback that is called when Client
// unsuccessfully tried to connect to all the specified // unsuccessfully tried to connect to all the specified
@ -320,3 +295,9 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
c.morphCacheMetrics = morphCacheMetrics c.morphCacheMetrics = morphCacheMetrics
} }
} }
func WithDialerSource(ds *internalNet.DialerSource) Option {
return func(c *cfg) {
c.dialerSource = ds
}
}

View file

@ -5,6 +5,7 @@ import (
"sync" "sync"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
) )
@ -25,6 +26,7 @@ type (
Key *ecdsa.PrivateKey Key *ecdsa.PrivateKey
ResponseCallback func(client.ResponseMetaInfo) error ResponseCallback func(client.ResponseMetaInfo) error
AllowExternal bool AllowExternal bool
DialerSource *net.DialerSource
} }
) )

View file

@ -60,18 +60,21 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
prmInit.Key = *x.opts.Key prmInit.Key = *x.opts.Key
} }
grpcOpts := []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing.NewStreamClientInterceptor(),
),
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
}
prmDial := client.PrmDial{ prmDial := client.PrmDial{
Endpoint: addr.URIAddr(), Endpoint: addr.URIAddr(),
GRPCDialOptions: []grpc.DialOption{ GRPCDialOptions: grpcOpts,
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing.NewStreamClientInterceptor(),
),
},
} }
if x.opts.DialTimeout > 0 { if x.opts.DialTimeout > 0 {
prmDial.DialTimeout = x.opts.DialTimeout prmDial.DialTimeout = x.opts.DialTimeout

View file

@ -0,0 +1,51 @@
package placement
import (
"errors"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
const (
attrPrefix = "$attribute:"
)
type Metric interface {
CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) []byte
}
func ValidateMetric(raw string) error {
if strings.HasPrefix(raw, attrPrefix) {
return nil
}
return errors.New("unsupported priority metric")
}
func ParseMetric(raw string) Metric {
if attr, found := strings.CutPrefix(raw, attrPrefix); found {
return NewAttributeMetric(attr)
}
return nil
}
// attributeMetric describes priority metric based on attribute.
type attributeMetric struct {
attribute string
}
// CalculateValue return [0] if from and to contains attribute attributeMetric.attribute and
// the value of attribute is the same. In other case return [1].
func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) []byte {
fromAttr := from.Attribute(am.attribute)
toAttr := to.Attribute(am.attribute)
if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
return []byte{0}
}
return []byte{1}
}
func NewAttributeMetric(raw string) Metric {
attr, _ := strings.CutPrefix(raw, attrPrefix)
return &attributeMetric{attribute: attr}
}

View file

@ -1,10 +1,14 @@
package placement package placement
import ( import (
"bytes"
"encoding/binary"
"errors" "errors"
"fmt" "fmt"
"slices"
"sync" "sync"
netmapAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -23,6 +27,12 @@ type Builder interface {
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
} }
// NodeState encapsulates information about current node state.
type NodeState interface {
// LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure.
LocalNodeInfo() (*netmapAPI.NodeInfo, error)
}
// Option represents placement traverser option. // Option represents placement traverser option.
type Option func(*cfg) type Option func(*cfg)
@ -50,6 +60,10 @@ type cfg struct {
policy netmap.PlacementPolicy policy netmap.PlacementPolicy
builder Builder builder Builder
metrics []Metric
nodeState NodeState
} }
const invalidOptsMsg = "invalid traverser options" const invalidOptsMsg = "invalid traverser options"
@ -99,7 +113,26 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
} }
var rem []int var rem []int
if cfg.flatSuccess != nil { if len(cfg.metrics) > 0 {
rem = defaultCopiesVector(cfg.policy)
var unsortedVector []netmap.NodeInfo
var regularVector []netmap.NodeInfo
for i := range rem {
unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
regularVector = append(regularVector, ns[i][rem[i]:]...)
}
rem = make([]int, 2)
rem[0] = -1
rem[1] = -1
sortedVector, err := sortVector(cfg, unsortedVector)
if err != nil {
return nil, err
}
ns = make([][]netmap.NodeInfo, 2)
ns[0] = sortedVector
ns[1] = regularVector
} else if cfg.flatSuccess != nil {
ns = flatNodes(ns) ns = flatNodes(ns)
rem = []int{int(*cfg.flatSuccess)} rem = []int{int(*cfg.flatSuccess)}
} else { } else {
@ -157,6 +190,37 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
return [][]netmap.NodeInfo{flat} return [][]netmap.NodeInfo{flat}
} }
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
metrics := make([][]byte, len(unsortedVector))
var node netmap.NodeInfo
nodeV2, err := cfg.nodeState.LocalNodeInfo()
if err != nil {
return nil, err
}
err = node.ReadFromV2(*nodeV2)
if err != nil {
return nil, err
}
b := make([]byte, 2)
for i := range unsortedVector {
for _, m := range cfg.metrics {
metrics[i] = append(metrics[i], m.CalculateValue(&node, &unsortedVector[i])...)
}
binary.LittleEndian.PutUint16(b, uint16(i))
metrics[i] = append(metrics[i], b...)
}
count := len(metrics[0]) - 2
slices.SortFunc(metrics, func(a, b []byte) int {
return bytes.Compare(a[:count], b[:count])
})
sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
for i := range unsortedVector {
index := binary.LittleEndian.Uint16(metrics[i][count:])
sortedVector[i] = unsortedVector[index]
}
return sortedVector, nil
}
// Node is a descriptor of storage node with information required for intra-container communication. // Node is a descriptor of storage node with information required for intra-container communication.
type Node struct { type Node struct {
addresses network.AddressGroup addresses network.AddressGroup
@ -322,3 +386,17 @@ func WithCopyNumbers(v []uint32) Option {
c.copyNumbers = v c.copyNumbers = v
} }
} }
// WithPriorityMetrics use provided priority metrics to sort nodes.
func WithPriorityMetrics(m []Metric) Option {
return func(c *cfg) {
c.metrics = m
}
}
// WithNodeState provide state of the current node.
func WithNodeState(s NodeState) Option {
return func(c *cfg) {
c.nodeState = s
}
}

View file

@ -4,6 +4,7 @@ import (
"strconv" "strconv"
"testing" "testing"
netmapAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
@ -22,7 +23,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
} }
func testNode(v uint32) (n netmap.NodeInfo) { func testNode(v uint32) (n netmap.NodeInfo) {
n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))) ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
n.SetNetworkEndpoints(ip)
n.SetPublicKey([]byte(ip))
return n return n
} }
@ -134,7 +137,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
err = n.FromIterator(netmapcore.Node(nodes[1][0])) err = n.FromIterator(netmapcore.Node(nodes[1][0]))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []Node{{addresses: n}}, tr.Next()) require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
}) })
t.Run("put scenario", func(t *testing.T) { t.Run("put scenario", func(t *testing.T) {
@ -275,3 +278,197 @@ func TestTraverserRemValues(t *testing.T) {
}) })
} }
} }
type nodeState struct {
node *netmapAPI.NodeInfo
}
func (n *nodeState) LocalNodeInfo() (*netmapAPI.NodeInfo, error) {
return n.node, nil
}
func TestTraverserPriorityMetrics(t *testing.T) {
t.Run("one rep one metric", func(t *testing.T) {
selectors := []int{4}
replicas := []int{3}
nodes, cnr := testPlacement(selectors, replicas)
nodes[0][0].SetAttribute("ClusterName", "A")
nodes[0][1].SetAttribute("ClusterName", "A")
nodes[0][2].SetAttribute("ClusterName", "B")
nodes[0][3].SetAttribute("ClusterName", "B")
sdkNode := testNode(5)
sdkNode.SetAttribute("ClusterName", "B")
nodeAPI := &netmapAPI.NodeInfo{}
sdkNode.WriteToV2(nodeAPI)
nodesCopy := copyVectors(nodes)
m := []Metric{NewAttributeMetric("$attribute:ClusterName")}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: nodeAPI,
}),
)
require.NoError(t, err)
next := tr.Next()
require.NotNil(t, next)
require.Equal(t, 3, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
next = tr.Next()
require.Equal(t, 1, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
t.Run("two reps two metrics", func(t *testing.T) {
selectors := []int{3, 3}
replicas := []int{2, 2}
nodes, cnr := testPlacement(selectors, replicas)
nodes[0][0].SetAttribute("ClusterName", "A")
nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
nodes[0][1].SetAttribute("ClusterName", "A")
nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
nodes[0][2].SetAttribute("ClusterName", "A")
nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
nodes[1][0].SetAttribute("ClusterName", "B")
nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
nodes[1][1].SetAttribute("ClusterName", "B")
nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
nodes[1][2].SetAttribute("ClusterName", "B")
nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
sdkNode := testNode(9)
sdkNode.SetAttribute("ClusterName", "B")
sdkNode.SetAttribute("UN-LOCODE", "RU DME")
nodeAPI := &netmapAPI.NodeInfo{}
sdkNode.WriteToV2(nodeAPI)
nodesCopy := copyVectors(nodes)
m := []Metric{
NewAttributeMetric("$attribute:ClusterName"),
NewAttributeMetric("$attribute:UN-LOCODE"),
}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: nodeAPI,
}),
)
require.NoError(t, err)
next := tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
sdkNode.SetAttribute("ClusterName", "B")
sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
nodeAPI = &netmapAPI.NodeInfo{}
sdkNode.WriteToV2(nodeAPI)
nodesCopy = copyVectors(nodes)
tr, err = NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: nodeAPI,
}),
)
require.NoError(t, err)
next = tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
sdkNode.SetAttribute("ClusterName", "A")
sdkNode.SetAttribute("UN-LOCODE", "RU LED")
nodeAPI = &netmapAPI.NodeInfo{}
sdkNode.WriteToV2(nodeAPI)
nodesCopy = copyVectors(nodes)
tr, err = NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: nodeAPI,
}),
)
require.NoError(t, err)
next = tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
}

View file

@ -8,6 +8,7 @@ import (
"sync" "sync"
"time" "time"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc" metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
@ -21,6 +22,7 @@ type clientCache struct {
sync.Mutex sync.Mutex
simplelru.LRU[string, cacheItem] simplelru.LRU[string, cacheItem]
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
ds *internalNet.DialerSource
} }
type cacheItem struct { type cacheItem struct {
@ -36,7 +38,7 @@ const (
var errRecentlyFailed = errors.New("client has recently failed") var errRecentlyFailed = errors.New("client has recently failed")
func (c *clientCache) init(pk *ecdsa.PrivateKey) { func (c *clientCache) init(pk *ecdsa.PrivateKey, ds *internalNet.DialerSource) {
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) { l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) {
if conn := value.cc; conn != nil { if conn := value.cc; conn != nil {
_ = conn.Close() _ = conn.Close()
@ -44,6 +46,7 @@ func (c *clientCache) init(pk *ecdsa.PrivateKey) {
}) })
c.LRU = *l c.LRU = *l
c.key = pk c.key = pk
c.ds = ds
} }
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) { func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
@ -99,6 +102,7 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
metrics.NewStreamClientInterceptor(), metrics.NewStreamClientInterceptor(),
tracing.NewStreamClientInterceptor(), tracing.NewStreamClientInterceptor(),
), ),
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
} }
if !netAddr.IsTLSEnabled() { if !netAddr.IsTLSEnabled() {

View file

@ -4,6 +4,7 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid" frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
@ -45,6 +46,7 @@ type cfg struct {
morphChainStorage policyengine.MorphRuleChainStorageReader morphChainStorage policyengine.MorphRuleChainStorageReader
metrics MetricsRegister metrics MetricsRegister
ds *net.DialerSource
} }
// Option represents configuration option for a tree service. // Option represents configuration option for a tree service.
@ -161,3 +163,9 @@ func WithNetmapState(state netmap.State) Option {
c.state = state c.state = state
} }
} }
func WithDialerSource(ds *net.DialerSource) Option {
return func(c *cfg) {
c.ds = ds
}
}

View file

@ -65,7 +65,7 @@ func New(opts ...Option) *Service {
s.log = &logger.Logger{Logger: zap.NewNop()} s.log = &logger.Logger{Logger: zap.NewNop()}
} }
s.cache.init(s.key) s.cache.init(s.key, s.ds)
s.closeCh = make(chan struct{}) s.closeCh = make(chan struct{})
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity) s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
s.replicateLocalCh = make(chan applyOp) s.replicateLocalCh = make(chan applyOp)