forked from TrueCloudLab/frostfs-node
Compare commits
10 commits
74db735265
...
b53afce771
Author | SHA1 | Date | |
---|---|---|---|
b53afce771 | |||
c0a2f20eee | |||
2d064d0bd8 | |||
ef38420623 | |||
f7caef355b | |||
fbdfd503e4 | |||
67798bb50e | |||
5b653aa65f | |||
e314f328c4 | |||
6c96cc2af6 |
33 changed files with 651 additions and 117 deletions
1
Makefile
1
Makefile
|
@ -72,6 +72,7 @@ $(BINS): $(DIRS) dep
|
||||||
@echo "⇒ Build $@"
|
@echo "⇒ Build $@"
|
||||||
CGO_ENABLED=0 \
|
CGO_ENABLED=0 \
|
||||||
go build -v -trimpath \
|
go build -v -trimpath \
|
||||||
|
-gcflags "all=-N -l" \
|
||||||
-ldflags "-X $(REPO)/misc.Version=$(VERSION)" \
|
-ldflags "-X $(REPO)/misc.Version=$(VERSION)" \
|
||||||
-o $@ ./cmd/$(notdir $@)
|
-o $@ ./cmd/$(notdir $@)
|
||||||
|
|
||||||
|
|
|
@ -48,6 +48,8 @@ func defaultConfiguration(cfg *viper.Viper) {
|
||||||
cfg.SetDefault("node.kludge_compatibility_mode", false)
|
cfg.SetDefault("node.kludge_compatibility_mode", false)
|
||||||
|
|
||||||
cfg.SetDefault("audit.enabled", false)
|
cfg.SetDefault("audit.enabled", false)
|
||||||
|
|
||||||
|
setMultinetDefaults(cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func setControlDefaults(cfg *viper.Viper) {
|
func setControlDefaults(cfg *viper.Viper) {
|
||||||
|
@ -131,3 +133,11 @@ func setMorphDefaults(cfg *viper.Viper) {
|
||||||
cfg.SetDefault("morph.validators", []string{})
|
cfg.SetDefault("morph.validators", []string{})
|
||||||
cfg.SetDefault("morph.switch_interval", 2*time.Minute)
|
cfg.SetDefault("morph.switch_interval", 2*time.Minute)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setMultinetDefaults(cfg *viper.Viper) {
|
||||||
|
cfg.SetDefault("multinet.enabled", false)
|
||||||
|
cfg.SetDefault("multinet.balancer", "")
|
||||||
|
cfg.SetDefault("multinet.restrict", false)
|
||||||
|
cfg.SetDefault("multinet.fallback_delay", "0s")
|
||||||
|
cfg.SetDefault("multinet.subnets", "")
|
||||||
|
}
|
||||||
|
|
|
@ -58,6 +58,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
||||||
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
|
@ -109,6 +110,7 @@ type applicationConfiguration struct {
|
||||||
|
|
||||||
ObjectCfg struct {
|
ObjectCfg struct {
|
||||||
tombstoneLifetime uint64
|
tombstoneLifetime uint64
|
||||||
|
priorityMetrics []placement.Metric
|
||||||
}
|
}
|
||||||
|
|
||||||
EngineCfg struct {
|
EngineCfg struct {
|
||||||
|
@ -232,6 +234,11 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
||||||
// Object
|
// Object
|
||||||
|
|
||||||
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
|
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
|
||||||
|
var pm []placement.Metric
|
||||||
|
for _, raw := range objectconfig.Get(c).Priority() {
|
||||||
|
pm = append(pm, placement.ParseMetric(raw))
|
||||||
|
}
|
||||||
|
a.ObjectCfg.priorityMetrics = pm
|
||||||
|
|
||||||
// Storage Engine
|
// Storage Engine
|
||||||
|
|
||||||
|
@ -764,7 +771,9 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
|
||||||
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
|
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg))
|
nodeMetrics := metrics.NewNodeMetrics()
|
||||||
|
|
||||||
|
ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg, nodeMetrics.MultinetMetrics()))
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
cacheOpts := cache.ClientCacheOpts{
|
cacheOpts := cache.ClientCacheOpts{
|
||||||
|
@ -773,6 +782,7 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
|
||||||
Key: &key.PrivateKey,
|
Key: &key.PrivateKey,
|
||||||
AllowExternal: apiclientconfig.AllowExternal(appCfg),
|
AllowExternal: apiclientconfig.AllowExternal(appCfg),
|
||||||
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
|
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
|
||||||
|
DialerSource: ds,
|
||||||
}
|
}
|
||||||
|
|
||||||
return shared{
|
return shared{
|
||||||
|
@ -784,17 +794,18 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
|
||||||
bgClientCache: cache.NewSDKClientCache(cacheOpts),
|
bgClientCache: cache.NewSDKClientCache(cacheOpts),
|
||||||
putClientCache: cache.NewSDKClientCache(cacheOpts),
|
putClientCache: cache.NewSDKClientCache(cacheOpts),
|
||||||
persistate: persistate,
|
persistate: persistate,
|
||||||
metricsCollector: metrics.NewNodeMetrics(),
|
metricsCollector: nodeMetrics,
|
||||||
dialerSource: ds,
|
dialerSource: ds,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func internalNetConfig(appCfg *config.Config) internalNet.Config {
|
func internalNetConfig(appCfg *config.Config, m metrics.MultinetMetrics) internalNet.Config {
|
||||||
result := internalNet.Config{
|
result := internalNet.Config{
|
||||||
Enabled: multinet.Enabled(appCfg),
|
Enabled: multinet.Enabled(appCfg),
|
||||||
Balancer: multinet.Balancer(appCfg),
|
Balancer: multinet.Balancer(appCfg),
|
||||||
Restrict: multinet.Restrict(appCfg),
|
Restrict: multinet.Restrict(appCfg),
|
||||||
FallbackDelay: multinet.FallbackDelay(appCfg),
|
FallbackDelay: multinet.FallbackDelay(appCfg),
|
||||||
|
Metrics: m,
|
||||||
}
|
}
|
||||||
sn := multinet.Subnets(appCfg)
|
sn := multinet.Subnets(appCfg)
|
||||||
for _, s := range sn {
|
for _, s := range sn {
|
||||||
|
@ -1361,7 +1372,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.dialerSource.Update(internalNetConfig(c.appCfg)); err != nil {
|
if err := c.dialerSource.Update(internalNetConfig(c.appCfg, c.metricsCollector.MultinetMetrics())); err != nil {
|
||||||
c.log.Error(logs.FailedToUpdateMultinetConfiguration, zap.Error(err))
|
c.log.Error(logs.FailedToUpdateMultinetConfiguration, zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,10 +10,17 @@ type PutConfig struct {
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetConfig is a wrapper over "get" config section which provides access
|
||||||
|
// to object get pipeline configuration of object service.
|
||||||
|
type GetConfig struct {
|
||||||
|
cfg *config.Config
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
subsection = "object"
|
subsection = "object"
|
||||||
|
|
||||||
putSubsection = "put"
|
putSubsection = "put"
|
||||||
|
getSubsection = "get"
|
||||||
|
|
||||||
// PutPoolSizeDefault is a default value of routine pool size to
|
// PutPoolSizeDefault is a default value of routine pool size to
|
||||||
// process object.Put requests in object service.
|
// process object.Put requests in object service.
|
||||||
|
@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
|
||||||
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
|
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
|
||||||
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
|
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get returns structure that provides access to "get" subsection of
|
||||||
|
// "object" section.
|
||||||
|
func Get(c *config.Config) GetConfig {
|
||||||
|
return GetConfig{
|
||||||
|
c.Sub(subsection).Sub(getSubsection),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Priority returns the value of "priority" config parameter.
|
||||||
|
func (g GetConfig) Priority() []string {
|
||||||
|
return config.StringSliceSafe(g.cfg, "priority")
|
||||||
|
}
|
||||||
|
|
|
@ -29,51 +29,12 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func initMorphComponents(ctx context.Context, c *cfg) {
|
func initMorphComponents(ctx context.Context, c *cfg) {
|
||||||
addresses := morphconfig.RPCEndpoint(c.appCfg)
|
initMorphClient(ctx, c)
|
||||||
|
|
||||||
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
|
|
||||||
// order of endpoints with the same priority.
|
|
||||||
rand.Shuffle(len(addresses), func(i, j int) {
|
|
||||||
addresses[i], addresses[j] = addresses[j], addresses[i]
|
|
||||||
})
|
|
||||||
|
|
||||||
cli, err := client.New(ctx,
|
|
||||||
c.key,
|
|
||||||
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
|
|
||||||
client.WithLogger(c.log),
|
|
||||||
client.WithMetrics(c.metricsCollector.MorphClientMetrics()),
|
|
||||||
client.WithEndpoints(addresses...),
|
|
||||||
client.WithConnLostCallback(func() {
|
|
||||||
c.internalErr <- errors.New("morph connection has been lost")
|
|
||||||
}),
|
|
||||||
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
|
|
||||||
client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient,
|
|
||||||
zap.Any("endpoints", addresses),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
fatalOnErr(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.onShutdown(func() {
|
|
||||||
c.log.Info(logs.FrostFSNodeClosingMorphComponents)
|
|
||||||
cli.Close()
|
|
||||||
})
|
|
||||||
|
|
||||||
if err := cli.SetGroupSignerScope(); err != nil {
|
|
||||||
c.log.Info(logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
c.cfgMorph.client = cli
|
|
||||||
c.cfgMorph.notaryEnabled = cli.ProbeNotary()
|
|
||||||
|
|
||||||
lookupScriptHashesInNNS(c) // smart contract auto negotiation
|
lookupScriptHashesInNNS(c) // smart contract auto negotiation
|
||||||
|
|
||||||
if c.cfgMorph.notaryEnabled {
|
if c.cfgMorph.notaryEnabled {
|
||||||
err = c.cfgMorph.client.EnableNotarySupport(
|
err := c.cfgMorph.client.EnableNotarySupport(
|
||||||
client.WithProxyContract(
|
client.WithProxyContract(
|
||||||
c.cfgMorph.proxyScriptHash,
|
c.cfgMorph.proxyScriptHash,
|
||||||
),
|
),
|
||||||
|
@ -111,6 +72,50 @@ func initMorphComponents(ctx context.Context, c *cfg) {
|
||||||
c.cfgNetmap.wrapper = wrap
|
c.cfgNetmap.wrapper = wrap
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func initMorphClient(ctx context.Context, c *cfg) {
|
||||||
|
addresses := morphconfig.RPCEndpoint(c.appCfg)
|
||||||
|
|
||||||
|
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
|
||||||
|
// order of endpoints with the same priority.
|
||||||
|
rand.Shuffle(len(addresses), func(i, j int) {
|
||||||
|
addresses[i], addresses[j] = addresses[j], addresses[i]
|
||||||
|
})
|
||||||
|
|
||||||
|
cli, err := client.New(ctx,
|
||||||
|
c.key,
|
||||||
|
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
|
||||||
|
client.WithLogger(c.log),
|
||||||
|
client.WithMetrics(c.metricsCollector.MorphClientMetrics()),
|
||||||
|
client.WithEndpoints(addresses...),
|
||||||
|
client.WithConnLostCallback(func() {
|
||||||
|
c.internalErr <- errors.New("morph connection has been lost")
|
||||||
|
}),
|
||||||
|
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
|
||||||
|
client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()),
|
||||||
|
client.WithDialerSource(c.dialerSource),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient,
|
||||||
|
zap.Any("endpoints", addresses),
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
|
||||||
|
fatalOnErr(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.onShutdown(func() {
|
||||||
|
c.log.Info(logs.FrostFSNodeClosingMorphComponents)
|
||||||
|
cli.Close()
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := cli.SetGroupSignerScope(); err != nil {
|
||||||
|
c.log.Info(logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
c.cfgMorph.client = cli
|
||||||
|
c.cfgMorph.notaryEnabled = cli.ProbeNotary()
|
||||||
|
}
|
||||||
|
|
||||||
func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
|
func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
|
||||||
// skip notary deposit in non-notary environments
|
// skip notary deposit in non-notary environments
|
||||||
if !c.cfgMorph.notaryEnabled {
|
if !c.cfgMorph.notaryEnabled {
|
||||||
|
|
|
@ -174,11 +174,13 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sPutV2 := createPutSvcV2(sPut, keyStorage)
|
sPutV2 := createPutSvcV2(sPut, keyStorage)
|
||||||
|
|
||||||
sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
|
sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
|
||||||
|
c.ObjectCfg.priorityMetrics)
|
||||||
|
|
||||||
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
||||||
|
|
||||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
|
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
|
||||||
|
c.ObjectCfg.priorityMetrics)
|
||||||
|
|
||||||
*c.cfgObject.getSvc = *sGet // need smth better
|
*c.cfgObject.getSvc = *sGet // need smth better
|
||||||
|
|
||||||
|
@ -366,7 +368,10 @@ func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service) *patchsvc.Servic
|
||||||
return patchsvc.NewService(sPut.Config, sGet)
|
return patchsvc.NewService(sPut.Config, sGet)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache, containerSource containercore.Source) *searchsvc.Service {
|
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache,
|
||||||
|
containerSource containercore.Source,
|
||||||
|
priorityMetrics []placement.Metric,
|
||||||
|
) *searchsvc.Service {
|
||||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
return searchsvc.New(
|
return searchsvc.New(
|
||||||
|
@ -374,6 +379,8 @@ func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Trav
|
||||||
coreConstructor,
|
coreConstructor,
|
||||||
traverseGen.WithTraverseOptions(
|
traverseGen.WithTraverseOptions(
|
||||||
placement.WithoutSuccessTracking(),
|
placement.WithoutSuccessTracking(),
|
||||||
|
placement.WithPriorityMetrics(priorityMetrics),
|
||||||
|
placement.WithNodeState(c),
|
||||||
),
|
),
|
||||||
c.netMapSource,
|
c.netMapSource,
|
||||||
keyStorage,
|
keyStorage,
|
||||||
|
@ -389,6 +396,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
|
||||||
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
||||||
coreConstructor *cache.ClientCache,
|
coreConstructor *cache.ClientCache,
|
||||||
containerSource containercore.Source,
|
containerSource containercore.Source,
|
||||||
|
priorityMetrics []placement.Metric,
|
||||||
) *getsvc.Service {
|
) *getsvc.Service {
|
||||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
|
@ -398,6 +406,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
||||||
ls,
|
ls,
|
||||||
traverseGen.WithTraverseOptions(
|
traverseGen.WithTraverseOptions(
|
||||||
placement.SuccessAfter(1),
|
placement.SuccessAfter(1),
|
||||||
|
placement.WithPriorityMetrics(priorityMetrics),
|
||||||
|
placement.WithNodeState(c),
|
||||||
),
|
),
|
||||||
coreConstructor,
|
coreConstructor,
|
||||||
containerSource,
|
containerSource,
|
||||||
|
|
|
@ -67,6 +67,7 @@ func initTreeService(c *cfg) {
|
||||||
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
|
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
|
||||||
tree.WithAPEMorphRuleStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage()),
|
tree.WithAPEMorphRuleStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage()),
|
||||||
tree.WithNetmapState(c.cfgNetmap.state),
|
tree.WithNetmapState(c.cfgNetmap.state),
|
||||||
|
tree.WithDialerSource(c.dialerSource),
|
||||||
)
|
)
|
||||||
|
|
||||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||||
|
|
|
@ -8,9 +8,11 @@ import (
|
||||||
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
||||||
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||||
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
||||||
|
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
||||||
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
|
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -30,6 +32,13 @@ func validateConfig(c *config.Config) error {
|
||||||
return fmt.Errorf("invalid logger destination: %w", err)
|
return fmt.Errorf("invalid logger destination: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validate priority metrics for GET and SEARCH requests
|
||||||
|
for _, raw := range objectconfig.Get(c).Priority() {
|
||||||
|
if err := placement.ValidateMetric(raw); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// shard configuration validation
|
// shard configuration validation
|
||||||
|
|
||||||
shardNum := 0
|
shardNum := 0
|
||||||
|
|
|
@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
|
||||||
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
||||||
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
|
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
|
||||||
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
||||||
|
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
|
||||||
|
|
||||||
# Storage engine section
|
# Storage engine section
|
||||||
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
||||||
|
|
|
@ -131,6 +131,9 @@
|
||||||
"remote_pool_size": 100,
|
"remote_pool_size": 100,
|
||||||
"local_pool_size": 200,
|
"local_pool_size": 200,
|
||||||
"skip_session_token_issuer_verification": true
|
"skip_session_token_issuer_verification": true
|
||||||
|
},
|
||||||
|
"get": {
|
||||||
|
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"storage": {
|
"storage": {
|
||||||
|
|
|
@ -114,6 +114,10 @@ object:
|
||||||
remote_pool_size: 100 # number of async workers for remote PUT operations
|
remote_pool_size: 100 # number of async workers for remote PUT operations
|
||||||
local_pool_size: 200 # number of async workers for local PUT operations
|
local_pool_size: 200 # number of async workers for local PUT operations
|
||||||
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
|
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
|
||||||
|
get:
|
||||||
|
priority: # list of metrics of nodes for prioritization
|
||||||
|
- $attribute:ClusterName
|
||||||
|
- $attribute:UN-LOCODE
|
||||||
|
|
||||||
storage:
|
storage:
|
||||||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||||
|
|
|
@ -407,13 +407,17 @@ Contains object-service related parameters.
|
||||||
object:
|
object:
|
||||||
put:
|
put:
|
||||||
remote_pool_size: 100
|
remote_pool_size: 100
|
||||||
|
get:
|
||||||
|
priority:
|
||||||
|
- $attribute:ClusterName
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
|
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
|
||||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||||
|
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
|
||||||
|
|
||||||
# `runtime` section
|
# `runtime` section
|
||||||
Contains runtime parameters.
|
Contains runtime parameters.
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -133,4 +133,4 @@ require (
|
||||||
rsc.io/tmplfunc v0.0.3 // indirect
|
rsc.io/tmplfunc v0.0.3 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928
|
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -16,8 +16,8 @@ git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8l
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
|
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
|
||||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI=
|
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI=
|
||||||
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928 h1:LK3mCkNZkY48eBA9jnk1N0eQZLsZhOG+XYw4EBoKUjM=
|
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07 h1:gPaqGsk6gSWQyNVjaStydfUz6Z/loHc9XyvGrJ5qSPY=
|
||||||
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg=
|
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg=
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 h1:vgbfkcnIexZUm3vREBBSa/Gv1Whjd1SFCUd0A+IaGPQ=
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 h1:vgbfkcnIexZUm3vREBBSa/Gv1Whjd1SFCUd0A+IaGPQ=
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA=
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA=
|
||||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
||||||
|
|
|
@ -22,6 +22,7 @@ const (
|
||||||
grpcServerSubsystem = "grpc_server"
|
grpcServerSubsystem = "grpc_server"
|
||||||
policerSubsystem = "policer"
|
policerSubsystem = "policer"
|
||||||
commonCacheSubsystem = "common_cache"
|
commonCacheSubsystem = "common_cache"
|
||||||
|
multinetSubsystem = "multinet"
|
||||||
|
|
||||||
successLabel = "success"
|
successLabel = "success"
|
||||||
shardIDLabel = "shard_id"
|
shardIDLabel = "shard_id"
|
||||||
|
@ -41,6 +42,7 @@ const (
|
||||||
endpointLabel = "endpoint"
|
endpointLabel = "endpoint"
|
||||||
hitLabel = "hit"
|
hitLabel = "hit"
|
||||||
cacheLabel = "cache"
|
cacheLabel = "cache"
|
||||||
|
sourceIPLabel = "source_ip"
|
||||||
|
|
||||||
readWriteMode = "READ_WRITE"
|
readWriteMode = "READ_WRITE"
|
||||||
readOnlyMode = "READ_ONLY"
|
readOnlyMode = "READ_ONLY"
|
||||||
|
|
|
@ -17,6 +17,7 @@ type InnerRingServiceMetrics struct {
|
||||||
eventDuration *prometheus.HistogramVec
|
eventDuration *prometheus.HistogramVec
|
||||||
morphCacheMetrics *morphCacheMetrics
|
morphCacheMetrics *morphCacheMetrics
|
||||||
logMetrics logger.LogMetrics
|
logMetrics logger.LogMetrics
|
||||||
|
multinet *multinetMetrics
|
||||||
// nolint: unused
|
// nolint: unused
|
||||||
appInfo *ApplicationInfo
|
appInfo *ApplicationInfo
|
||||||
}
|
}
|
||||||
|
@ -51,6 +52,7 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
|
||||||
morphCacheMetrics: newMorphCacheMetrics(innerRingNamespace),
|
morphCacheMetrics: newMorphCacheMetrics(innerRingNamespace),
|
||||||
appInfo: NewApplicationInfo(misc.Version),
|
appInfo: NewApplicationInfo(misc.Version),
|
||||||
logMetrics: logger.NewLogMetrics(innerRingNamespace),
|
logMetrics: logger.NewLogMetrics(innerRingNamespace),
|
||||||
|
multinet: newMultinetMetrics(innerRingNamespace),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,3 +80,7 @@ func (m *InnerRingServiceMetrics) MorphCacheMetrics() MorphCacheMetrics {
|
||||||
func (m *InnerRingServiceMetrics) LogMetrics() logger.LogMetrics {
|
func (m *InnerRingServiceMetrics) LogMetrics() logger.LogMetrics {
|
||||||
return m.logMetrics
|
return m.logMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *InnerRingServiceMetrics) Multinet() MultinetMetrics {
|
||||||
|
return m.multinet
|
||||||
|
}
|
||||||
|
|
35
internal/metrics/multinet.go
Normal file
35
internal/metrics/multinet.go
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type multinetMetrics struct {
|
||||||
|
dials *prometheus.GaugeVec
|
||||||
|
}
|
||||||
|
|
||||||
|
type MultinetMetrics interface {
|
||||||
|
Dial(sourceIP string, success bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMultinetMetrics(ns string) *multinetMetrics {
|
||||||
|
return &multinetMetrics{
|
||||||
|
dials: metrics.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: ns,
|
||||||
|
Subsystem: multinetSubsystem,
|
||||||
|
Name: "dial_count",
|
||||||
|
Help: "Dials count performed by multinet",
|
||||||
|
}, []string{sourceIPLabel, successLabel}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *multinetMetrics) Dial(sourceIP string, success bool) {
|
||||||
|
m.dials.With(prometheus.Labels{
|
||||||
|
sourceIPLabel: sourceIP,
|
||||||
|
successLabel: strconv.FormatBool(success),
|
||||||
|
}).Inc()
|
||||||
|
}
|
|
@ -25,6 +25,7 @@ type NodeMetrics struct {
|
||||||
morphClient *morphClientMetrics
|
morphClient *morphClientMetrics
|
||||||
morphCache *morphCacheMetrics
|
morphCache *morphCacheMetrics
|
||||||
log logger.LogMetrics
|
log logger.LogMetrics
|
||||||
|
multinet *multinetMetrics
|
||||||
// nolint: unused
|
// nolint: unused
|
||||||
appInfo *ApplicationInfo
|
appInfo *ApplicationInfo
|
||||||
}
|
}
|
||||||
|
@ -53,6 +54,7 @@ func NewNodeMetrics() *NodeMetrics {
|
||||||
morphCache: newMorphCacheMetrics(namespace),
|
morphCache: newMorphCacheMetrics(namespace),
|
||||||
log: logger.NewLogMetrics(namespace),
|
log: logger.NewLogMetrics(namespace),
|
||||||
appInfo: NewApplicationInfo(misc.Version),
|
appInfo: NewApplicationInfo(misc.Version),
|
||||||
|
multinet: newMultinetMetrics(namespace),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,3 +122,7 @@ func (m *NodeMetrics) MorphCacheMetrics() MorphCacheMetrics {
|
||||||
func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
|
func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
|
||||||
return m.log
|
return m.log
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
|
||||||
|
return m.multinet
|
||||||
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"slices"
|
"slices"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/multinet"
|
"git.frostfs.info/TrueCloudLab/multinet"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -23,6 +24,7 @@ type Config struct {
|
||||||
Balancer string
|
Balancer string
|
||||||
Restrict bool
|
Restrict bool
|
||||||
FallbackDelay time.Duration
|
FallbackDelay time.Duration
|
||||||
|
Metrics metrics.MultinetMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Config) toMultinetConfig() (multinet.Config, error) {
|
func (c Config) toMultinetConfig() (multinet.Config, error) {
|
||||||
|
@ -52,6 +54,7 @@ func (c Config) toMultinetConfig() (multinet.Config, error) {
|
||||||
Restrict: c.Restrict,
|
Restrict: c.Restrict,
|
||||||
FallbackDelay: c.FallbackDelay,
|
FallbackDelay: c.FallbackDelay,
|
||||||
Dialer: newDefaulDialer(),
|
Dialer: newDefaulDialer(),
|
||||||
|
EventHandler: newEventHandler(c.Metrics),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,10 @@ type Dialer interface {
|
||||||
DialContext(ctx context.Context, network, address string) (net.Conn, error)
|
DialContext(ctx context.Context, network, address string) (net.Conn, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func DialContextTCP(ctx context.Context, address string, d Dialer) (net.Conn, error) {
|
||||||
|
return d.DialContext(ctx, "tcp", address)
|
||||||
|
}
|
||||||
|
|
||||||
func newDefaulDialer() net.Dialer {
|
func newDefaulDialer() net.Dialer {
|
||||||
// From `grpc.WithContextDialer` comment:
|
// From `grpc.WithContextDialer` comment:
|
||||||
//
|
//
|
||||||
|
@ -28,7 +32,7 @@ func newDefaulDialer() net.Dialer {
|
||||||
KeepAlive: time.Duration(-1),
|
KeepAlive: time.Duration(-1),
|
||||||
Control: func(_, _ string, c syscall.RawConn) error {
|
Control: func(_, _ string, c syscall.RawConn) error {
|
||||||
return c.Control(func(fd uintptr) {
|
return c.Control(func(fd uintptr) {
|
||||||
unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)
|
_ = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,6 +58,20 @@ func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Co
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NetContextDialer returns net.DialContext dial function.
|
||||||
|
// Returns nil if multinet disabled.
|
||||||
|
func (s *DialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
|
||||||
|
s.guard.RLock()
|
||||||
|
defer s.guard.RUnlock()
|
||||||
|
|
||||||
|
if s.c.Enabled {
|
||||||
|
return func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||||
|
return s.md.DialContext(ctx, network, address)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *DialerSource) Update(c Config) error {
|
func (s *DialerSource) Update(c Config) error {
|
||||||
s.guard.Lock()
|
s.guard.Lock()
|
||||||
defer s.guard.Unlock()
|
defer s.guard.Unlock()
|
||||||
|
|
29
internal/net/event_handler.go
Normal file
29
internal/net/event_handler.go
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
package net
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
|
"git.frostfs.info/TrueCloudLab/multinet"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ multinet.EventHandler = (*metricsEventHandler)(nil)
|
||||||
|
|
||||||
|
type metricsEventHandler struct {
|
||||||
|
m metrics.MultinetMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricsEventHandler) DialPerformed(sourceIP net.Addr, _ string, _ string, err error) {
|
||||||
|
sourceIPString := "undefined"
|
||||||
|
if sourceIP != nil {
|
||||||
|
sourceIPString = sourceIP.Network() + "://" + sourceIP.String()
|
||||||
|
}
|
||||||
|
m.m.Dial(sourceIPString, err == nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEventHandler(m metrics.MultinetMetrics) multinet.EventHandler {
|
||||||
|
if m == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &metricsEventHandler{m: m}
|
||||||
|
}
|
|
@ -463,6 +463,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
|
||||||
name: morphPrefix,
|
name: morphPrefix,
|
||||||
from: fromSideChainBlock,
|
from: fromSideChainBlock,
|
||||||
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
|
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
|
||||||
|
multinetMetrics: s.irMetrics.Multinet(),
|
||||||
}
|
}
|
||||||
|
|
||||||
// create morph client
|
// create morph client
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
|
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
||||||
|
@ -116,6 +117,7 @@ type (
|
||||||
sgn *transaction.Signer
|
sgn *transaction.Signer
|
||||||
from uint32 // block height
|
from uint32 // block height
|
||||||
morphCacheMetric metrics.MorphCacheMetrics
|
morphCacheMetric metrics.MorphCacheMetrics
|
||||||
|
multinetMetrics metrics.MultinetMetrics
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -486,6 +488,12 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
|
||||||
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
|
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nc := parseMultinetConfig(p.cfg, p.multinetMetrics)
|
||||||
|
ds, err := internalNet.NewDialerSource(nc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("dialer source: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return client.New(
|
return client.New(
|
||||||
ctx,
|
ctx,
|
||||||
p.key,
|
p.key,
|
||||||
|
@ -498,6 +506,7 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
|
||||||
}),
|
}),
|
||||||
client.WithSwitchInterval(p.cfg.GetDuration(p.name+".switch_interval")),
|
client.WithSwitchInterval(p.cfg.GetDuration(p.name+".switch_interval")),
|
||||||
client.WithMorphCacheMetrics(p.morphCacheMetric),
|
client.WithMorphCacheMetrics(p.morphCacheMetric),
|
||||||
|
client.WithDialerSource(ds),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -542,6 +551,28 @@ func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) {
|
||||||
return extraWallets, nil
|
return extraWallets, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNet.Config {
|
||||||
|
nc := internalNet.Config{
|
||||||
|
Enabled: cfg.GetBool("multinet.enabled"),
|
||||||
|
Balancer: cfg.GetString("multinet.balancer"),
|
||||||
|
Restrict: cfg.GetBool("multinet.restrict"),
|
||||||
|
FallbackDelay: cfg.GetDuration("multinet.fallback_delay"),
|
||||||
|
Metrics: m,
|
||||||
|
}
|
||||||
|
for i := 0; ; i++ {
|
||||||
|
mask := cfg.GetString(fmt.Sprintf("multinet.subnets.%d.mask", i))
|
||||||
|
if mask == "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
sourceIPs := cfg.GetStringSlice(fmt.Sprintf("multinet.subnets.%d.source_ips", i))
|
||||||
|
nc.Subnets = append(nc.Subnets, internalNet.Subnet{
|
||||||
|
Prefix: mask,
|
||||||
|
SourceIPs: sourceIPs,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return nc
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) initConfigFromBlockchain() error {
|
func (s *Server) initConfigFromBlockchain() error {
|
||||||
// get current epoch
|
// get current epoch
|
||||||
epoch, err := s.netmapClient.Epoch()
|
epoch, err := s.netmapClient.Epoch()
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
|
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||||
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
@ -41,13 +42,13 @@ type cfg struct {
|
||||||
|
|
||||||
endpoints []Endpoint
|
endpoints []Endpoint
|
||||||
|
|
||||||
singleCli *rpcclient.WSClient // neo-go client for single client mode
|
|
||||||
|
|
||||||
inactiveModeCb Callback
|
inactiveModeCb Callback
|
||||||
|
|
||||||
switchInterval time.Duration
|
switchInterval time.Duration
|
||||||
|
|
||||||
morphCacheMetrics metrics.MorphCacheMetrics
|
morphCacheMetrics metrics.MorphCacheMetrics
|
||||||
|
|
||||||
|
dialerSource *internalNet.DialerSource
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -124,40 +125,24 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var act *actor.Actor
|
var act *actor.Actor
|
||||||
if cfg.singleCli != nil {
|
var endpoint Endpoint
|
||||||
// return client in single RPC node mode that uses
|
for cli.endpoints.curr, endpoint = range cli.endpoints.list {
|
||||||
// predefined WS client
|
cli.client, act, err = cli.newCli(ctx, endpoint)
|
||||||
//
|
|
||||||
// in case of the closing web socket connection:
|
|
||||||
// if extra endpoints were provided via options,
|
|
||||||
// they will be used in switch process, otherwise
|
|
||||||
// inactive mode will be enabled
|
|
||||||
cli.client = cfg.singleCli
|
|
||||||
|
|
||||||
act, err = newActor(cfg.singleCli, acc, *cfg)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not create RPC actor: %w", err)
|
cli.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint,
|
||||||
}
|
zap.Error(err), zap.String("endpoint", endpoint.Address))
|
||||||
} else {
|
} else {
|
||||||
var endpoint Endpoint
|
cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
|
||||||
for cli.endpoints.curr, endpoint = range cli.endpoints.list {
|
zap.String("endpoint", endpoint.Address))
|
||||||
cli.client, act, err = cli.newCli(ctx, endpoint)
|
if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 {
|
||||||
if err != nil {
|
cli.switchIsActive.Store(true)
|
||||||
cli.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint,
|
go cli.switchToMostPrioritized(ctx)
|
||||||
zap.Error(err), zap.String("endpoint", endpoint.Address))
|
|
||||||
} else {
|
|
||||||
cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
|
|
||||||
zap.String("endpoint", endpoint.Address))
|
|
||||||
if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 {
|
|
||||||
cli.switchIsActive.Store(true)
|
|
||||||
go cli.switchToMostPrioritized(ctx)
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
break
|
||||||
}
|
}
|
||||||
if cli.client == nil {
|
}
|
||||||
return nil, ErrNoHealthyEndpoint
|
if cli.client == nil {
|
||||||
}
|
return nil, ErrNoHealthyEndpoint
|
||||||
}
|
}
|
||||||
cli.setActor(act)
|
cli.setActor(act)
|
||||||
|
|
||||||
|
@ -175,6 +160,7 @@ func (c *Client) newCli(ctx context.Context, endpoint Endpoint) (*rpcclient.WSCl
|
||||||
Options: rpcclient.Options{
|
Options: rpcclient.Options{
|
||||||
DialTimeout: c.cfg.dialTimeout,
|
DialTimeout: c.cfg.dialTimeout,
|
||||||
TLSClientConfig: cfg,
|
TLSClientConfig: cfg,
|
||||||
|
NetDialContext: c.cfg.dialerSource.NetContextDialer(),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -285,17 +271,6 @@ func WithEndpoints(endpoints ...Endpoint) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithSingleClient returns a client constructor option
|
|
||||||
// that specifies single neo-go client and forces Client
|
|
||||||
// to use it for requests.
|
|
||||||
//
|
|
||||||
// Passed client must already be initialized.
|
|
||||||
func WithSingleClient(cli *rpcclient.WSClient) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.singleCli = cli
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithConnLostCallback return a client constructor option
|
// WithConnLostCallback return a client constructor option
|
||||||
// that specifies a callback that is called when Client
|
// that specifies a callback that is called when Client
|
||||||
// unsuccessfully tried to connect to all the specified
|
// unsuccessfully tried to connect to all the specified
|
||||||
|
@ -320,3 +295,9 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
|
||||||
c.morphCacheMetrics = morphCacheMetrics
|
c.morphCacheMetrics = morphCacheMetrics
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithDialerSource(ds *internalNet.DialerSource) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.dialerSource = ds
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
2
pkg/network/cache/client.go
vendored
2
pkg/network/cache/client.go
vendored
|
@ -5,6 +5,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
)
|
)
|
||||||
|
@ -25,6 +26,7 @@ type (
|
||||||
Key *ecdsa.PrivateKey
|
Key *ecdsa.PrivateKey
|
||||||
ResponseCallback func(client.ResponseMetaInfo) error
|
ResponseCallback func(client.ResponseMetaInfo) error
|
||||||
AllowExternal bool
|
AllowExternal bool
|
||||||
|
DialerSource *net.DialerSource
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
25
pkg/network/cache/multi.go
vendored
25
pkg/network/cache/multi.go
vendored
|
@ -60,18 +60,21 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
||||||
prmInit.Key = *x.opts.Key
|
prmInit.Key = *x.opts.Key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
grpcOpts := []grpc.DialOption{
|
||||||
|
grpc.WithChainUnaryInterceptor(
|
||||||
|
metrics.NewUnaryClientInterceptor(),
|
||||||
|
tracing.NewUnaryClientInteceptor(),
|
||||||
|
),
|
||||||
|
grpc.WithChainStreamInterceptor(
|
||||||
|
metrics.NewStreamClientInterceptor(),
|
||||||
|
tracing.NewStreamClientInterceptor(),
|
||||||
|
),
|
||||||
|
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
||||||
|
}
|
||||||
|
|
||||||
prmDial := client.PrmDial{
|
prmDial := client.PrmDial{
|
||||||
Endpoint: addr.URIAddr(),
|
Endpoint: addr.URIAddr(),
|
||||||
GRPCDialOptions: []grpc.DialOption{
|
GRPCDialOptions: grpcOpts,
|
||||||
grpc.WithChainUnaryInterceptor(
|
|
||||||
metrics.NewUnaryClientInterceptor(),
|
|
||||||
tracing.NewUnaryClientInteceptor(),
|
|
||||||
),
|
|
||||||
grpc.WithChainStreamInterceptor(
|
|
||||||
metrics.NewStreamClientInterceptor(),
|
|
||||||
tracing.NewStreamClientInterceptor(),
|
|
||||||
),
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
if x.opts.DialTimeout > 0 {
|
if x.opts.DialTimeout > 0 {
|
||||||
prmDial.DialTimeout = x.opts.DialTimeout
|
prmDial.DialTimeout = x.opts.DialTimeout
|
||||||
|
|
51
pkg/services/object_manager/placement/metrics.go
Normal file
51
pkg/services/object_manager/placement/metrics.go
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
package placement
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
attrPrefix = "$attribute:"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Metric interface {
|
||||||
|
CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func ValidateMetric(raw string) error {
|
||||||
|
if strings.HasPrefix(raw, attrPrefix) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return errors.New("unsupported priority metric")
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseMetric(raw string) Metric {
|
||||||
|
if attr, found := strings.CutPrefix(raw, attrPrefix); found {
|
||||||
|
return NewAttributeMetric(attr)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// attributeMetric describes priority metric based on attribute.
|
||||||
|
type attributeMetric struct {
|
||||||
|
attribute string
|
||||||
|
}
|
||||||
|
|
||||||
|
// CalculateValue return [0] if from and to contains attribute attributeMetric.attribute and
|
||||||
|
// the value of attribute is the same. In other case return [1].
|
||||||
|
func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) []byte {
|
||||||
|
fromAttr := from.Attribute(am.attribute)
|
||||||
|
toAttr := to.Attribute(am.attribute)
|
||||||
|
if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
|
||||||
|
return []byte{0}
|
||||||
|
}
|
||||||
|
return []byte{1}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAttributeMetric(raw string) Metric {
|
||||||
|
attr, _ := strings.CutPrefix(raw, attrPrefix)
|
||||||
|
return &attributeMetric{attribute: attr}
|
||||||
|
}
|
|
@ -1,10 +1,14 @@
|
||||||
package placement
|
package placement
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
netmapAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
@ -23,6 +27,12 @@ type Builder interface {
|
||||||
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NodeState encapsulates information about current node state.
|
||||||
|
type NodeState interface {
|
||||||
|
// LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure.
|
||||||
|
LocalNodeInfo() (*netmapAPI.NodeInfo, error)
|
||||||
|
}
|
||||||
|
|
||||||
// Option represents placement traverser option.
|
// Option represents placement traverser option.
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
|
@ -50,6 +60,10 @@ type cfg struct {
|
||||||
policy netmap.PlacementPolicy
|
policy netmap.PlacementPolicy
|
||||||
|
|
||||||
builder Builder
|
builder Builder
|
||||||
|
|
||||||
|
metrics []Metric
|
||||||
|
|
||||||
|
nodeState NodeState
|
||||||
}
|
}
|
||||||
|
|
||||||
const invalidOptsMsg = "invalid traverser options"
|
const invalidOptsMsg = "invalid traverser options"
|
||||||
|
@ -99,7 +113,26 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var rem []int
|
var rem []int
|
||||||
if cfg.flatSuccess != nil {
|
if len(cfg.metrics) > 0 {
|
||||||
|
rem = defaultCopiesVector(cfg.policy)
|
||||||
|
var unsortedVector []netmap.NodeInfo
|
||||||
|
var regularVector []netmap.NodeInfo
|
||||||
|
for i := range rem {
|
||||||
|
unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
|
||||||
|
regularVector = append(regularVector, ns[i][rem[i]:]...)
|
||||||
|
}
|
||||||
|
rem = make([]int, 2)
|
||||||
|
rem[0] = -1
|
||||||
|
rem[1] = -1
|
||||||
|
|
||||||
|
sortedVector, err := sortVector(cfg, unsortedVector)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ns = make([][]netmap.NodeInfo, 2)
|
||||||
|
ns[0] = sortedVector
|
||||||
|
ns[1] = regularVector
|
||||||
|
} else if cfg.flatSuccess != nil {
|
||||||
ns = flatNodes(ns)
|
ns = flatNodes(ns)
|
||||||
rem = []int{int(*cfg.flatSuccess)}
|
rem = []int{int(*cfg.flatSuccess)}
|
||||||
} else {
|
} else {
|
||||||
|
@ -157,6 +190,37 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
|
||||||
return [][]netmap.NodeInfo{flat}
|
return [][]netmap.NodeInfo{flat}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
|
||||||
|
metrics := make([][]byte, len(unsortedVector))
|
||||||
|
var node netmap.NodeInfo
|
||||||
|
nodeV2, err := cfg.nodeState.LocalNodeInfo()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = node.ReadFromV2(*nodeV2)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
b := make([]byte, 2)
|
||||||
|
for i := range unsortedVector {
|
||||||
|
for _, m := range cfg.metrics {
|
||||||
|
metrics[i] = append(metrics[i], m.CalculateValue(&node, &unsortedVector[i])...)
|
||||||
|
}
|
||||||
|
binary.LittleEndian.PutUint16(b, uint16(i))
|
||||||
|
metrics[i] = append(metrics[i], b...)
|
||||||
|
}
|
||||||
|
count := len(metrics[0]) - 2
|
||||||
|
slices.SortFunc(metrics, func(a, b []byte) int {
|
||||||
|
return bytes.Compare(a[:count], b[:count])
|
||||||
|
})
|
||||||
|
sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
|
||||||
|
for i := range unsortedVector {
|
||||||
|
index := binary.LittleEndian.Uint16(metrics[i][count:])
|
||||||
|
sortedVector[i] = unsortedVector[index]
|
||||||
|
}
|
||||||
|
return sortedVector, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Node is a descriptor of storage node with information required for intra-container communication.
|
// Node is a descriptor of storage node with information required for intra-container communication.
|
||||||
type Node struct {
|
type Node struct {
|
||||||
addresses network.AddressGroup
|
addresses network.AddressGroup
|
||||||
|
@ -322,3 +386,17 @@ func WithCopyNumbers(v []uint32) Option {
|
||||||
c.copyNumbers = v
|
c.copyNumbers = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithPriorityMetrics use provided priority metrics to sort nodes.
|
||||||
|
func WithPriorityMetrics(m []Metric) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.metrics = m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithNodeState provide state of the current node.
|
||||||
|
func WithNodeState(s NodeState) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.nodeState = s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
netmapAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
|
||||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
@ -22,7 +23,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
|
||||||
}
|
}
|
||||||
|
|
||||||
func testNode(v uint32) (n netmap.NodeInfo) {
|
func testNode(v uint32) (n netmap.NodeInfo) {
|
||||||
n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v)))
|
ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
|
||||||
|
n.SetNetworkEndpoints(ip)
|
||||||
|
n.SetPublicKey([]byte(ip))
|
||||||
|
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
@ -134,7 +137,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
|
||||||
err = n.FromIterator(netmapcore.Node(nodes[1][0]))
|
err = n.FromIterator(netmapcore.Node(nodes[1][0]))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, []Node{{addresses: n}}, tr.Next())
|
require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put scenario", func(t *testing.T) {
|
t.Run("put scenario", func(t *testing.T) {
|
||||||
|
@ -275,3 +278,197 @@ func TestTraverserRemValues(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type nodeState struct {
|
||||||
|
node *netmapAPI.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *nodeState) LocalNodeInfo() (*netmapAPI.NodeInfo, error) {
|
||||||
|
return n.node, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTraverserPriorityMetrics(t *testing.T) {
|
||||||
|
t.Run("one rep one metric", func(t *testing.T) {
|
||||||
|
selectors := []int{4}
|
||||||
|
replicas := []int{3}
|
||||||
|
|
||||||
|
nodes, cnr := testPlacement(selectors, replicas)
|
||||||
|
|
||||||
|
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||||
|
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||||
|
nodes[0][2].SetAttribute("ClusterName", "B")
|
||||||
|
nodes[0][3].SetAttribute("ClusterName", "B")
|
||||||
|
|
||||||
|
sdkNode := testNode(5)
|
||||||
|
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
|
nodeAPI := &netmapAPI.NodeInfo{}
|
||||||
|
sdkNode.WriteToV2(nodeAPI)
|
||||||
|
|
||||||
|
nodesCopy := copyVectors(nodes)
|
||||||
|
|
||||||
|
m := []Metric{NewAttributeMetric("$attribute:ClusterName")}
|
||||||
|
|
||||||
|
tr, err := NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: nodeAPI,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
next := tr.Next()
|
||||||
|
require.NotNil(t, next)
|
||||||
|
require.Equal(t, 3, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 1, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("two reps two metrics", func(t *testing.T) {
|
||||||
|
selectors := []int{3, 3}
|
||||||
|
replicas := []int{2, 2}
|
||||||
|
|
||||||
|
nodes, cnr := testPlacement(selectors, replicas)
|
||||||
|
|
||||||
|
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||||
|
nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
|
||||||
|
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||||
|
nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
|
||||||
|
|
||||||
|
nodes[0][2].SetAttribute("ClusterName", "A")
|
||||||
|
nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
|
||||||
|
nodes[1][0].SetAttribute("ClusterName", "B")
|
||||||
|
nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
|
||||||
|
nodes[1][1].SetAttribute("ClusterName", "B")
|
||||||
|
nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
|
||||||
|
|
||||||
|
nodes[1][2].SetAttribute("ClusterName", "B")
|
||||||
|
nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
|
||||||
|
sdkNode := testNode(9)
|
||||||
|
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
|
sdkNode.SetAttribute("UN-LOCODE", "RU DME")
|
||||||
|
nodeAPI := &netmapAPI.NodeInfo{}
|
||||||
|
sdkNode.WriteToV2(nodeAPI)
|
||||||
|
|
||||||
|
nodesCopy := copyVectors(nodes)
|
||||||
|
|
||||||
|
m := []Metric{
|
||||||
|
NewAttributeMetric("$attribute:ClusterName"),
|
||||||
|
NewAttributeMetric("$attribute:UN-LOCODE"),
|
||||||
|
}
|
||||||
|
|
||||||
|
tr, err := NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: nodeAPI,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
next := tr.Next()
|
||||||
|
require.Equal(t, 4, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 2, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
|
||||||
|
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
|
sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
nodeAPI = &netmapAPI.NodeInfo{}
|
||||||
|
sdkNode.WriteToV2(nodeAPI)
|
||||||
|
|
||||||
|
nodesCopy = copyVectors(nodes)
|
||||||
|
|
||||||
|
tr, err = NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: nodeAPI,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 4, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 2, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
|
||||||
|
sdkNode.SetAttribute("ClusterName", "A")
|
||||||
|
sdkNode.SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
nodeAPI = &netmapAPI.NodeInfo{}
|
||||||
|
sdkNode.WriteToV2(nodeAPI)
|
||||||
|
|
||||||
|
nodesCopy = copyVectors(nodes)
|
||||||
|
|
||||||
|
tr, err = NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: nodeAPI,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 4, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 2, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
|
@ -21,6 +22,7 @@ type clientCache struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
simplelru.LRU[string, cacheItem]
|
simplelru.LRU[string, cacheItem]
|
||||||
key *ecdsa.PrivateKey
|
key *ecdsa.PrivateKey
|
||||||
|
ds *internalNet.DialerSource
|
||||||
}
|
}
|
||||||
|
|
||||||
type cacheItem struct {
|
type cacheItem struct {
|
||||||
|
@ -36,7 +38,7 @@ const (
|
||||||
|
|
||||||
var errRecentlyFailed = errors.New("client has recently failed")
|
var errRecentlyFailed = errors.New("client has recently failed")
|
||||||
|
|
||||||
func (c *clientCache) init(pk *ecdsa.PrivateKey) {
|
func (c *clientCache) init(pk *ecdsa.PrivateKey, ds *internalNet.DialerSource) {
|
||||||
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) {
|
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) {
|
||||||
if conn := value.cc; conn != nil {
|
if conn := value.cc; conn != nil {
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
|
@ -44,6 +46,7 @@ func (c *clientCache) init(pk *ecdsa.PrivateKey) {
|
||||||
})
|
})
|
||||||
c.LRU = *l
|
c.LRU = *l
|
||||||
c.key = pk
|
c.key = pk
|
||||||
|
c.ds = ds
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
|
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
|
||||||
|
@ -99,6 +102,7 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
||||||
metrics.NewStreamClientInterceptor(),
|
metrics.NewStreamClientInterceptor(),
|
||||||
tracing.NewStreamClientInterceptor(),
|
tracing.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
|
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
||||||
}
|
}
|
||||||
|
|
||||||
if !netAddr.IsTLSEnabled() {
|
if !netAddr.IsTLSEnabled() {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
|
@ -45,6 +46,7 @@ type cfg struct {
|
||||||
morphChainStorage policyengine.MorphRuleChainStorageReader
|
morphChainStorage policyengine.MorphRuleChainStorageReader
|
||||||
|
|
||||||
metrics MetricsRegister
|
metrics MetricsRegister
|
||||||
|
ds *net.DialerSource
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option represents configuration option for a tree service.
|
// Option represents configuration option for a tree service.
|
||||||
|
@ -161,3 +163,9 @@ func WithNetmapState(state netmap.State) Option {
|
||||||
c.state = state
|
c.state = state
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithDialerSource(ds *net.DialerSource) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.ds = ds
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ func New(opts ...Option) *Service {
|
||||||
s.log = &logger.Logger{Logger: zap.NewNop()}
|
s.log = &logger.Logger{Logger: zap.NewNop()}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.cache.init(s.key)
|
s.cache.init(s.key, s.ds)
|
||||||
s.closeCh = make(chan struct{})
|
s.closeCh = make(chan struct{})
|
||||||
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
|
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
|
||||||
s.replicateLocalCh = make(chan applyOp)
|
s.replicateLocalCh = make(chan applyOp)
|
||||||
|
|
Loading…
Add table
Reference in a new issue