forked from TrueCloudLab/frostfs-node
Compare commits
8 commits
d2f36e7726
...
744c3947a7
Author | SHA1 | Date | |
---|---|---|---|
744c3947a7 | |||
a238ed879c | |||
29708b78d7 | |||
b9284604d9 | |||
65a4320c75 | |||
9a260c2e64 | |||
6f798b9c4b | |||
e515dd4582 |
28 changed files with 488 additions and 78 deletions
|
@ -58,6 +58,7 @@ func GetSDKClient(ctx context.Context, cmd *cobra.Command, key *ecdsa.PrivateKey
|
|||
GRPCDialOptions: []grpc.DialOption{
|
||||
grpc.WithChainUnaryInterceptor(tracing.NewUnaryClientInteceptor()),
|
||||
grpc.WithChainStreamInterceptor(tracing.NewStreamClientInterceptor()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
},
|
||||
}
|
||||
if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 {
|
||||
|
|
|
@ -30,8 +30,6 @@ func initAddCmd() {
|
|||
ff := addCmd.Flags()
|
||||
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
||||
ff.Uint64(parentIDFlagKey, 0, "Parent node ID")
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func add(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -36,7 +36,6 @@ func initAddByPathCmd() {
|
|||
ff.String(pathFlagKey, "", "Path to a node")
|
||||
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
_ = cobra.MarkFlagRequired(ff, pathFlagKey)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package tree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
||||
|
@ -20,7 +21,13 @@ import (
|
|||
// after making Tree API public.
|
||||
func _client() (tree.TreeServiceClient, error) {
|
||||
var netAddr network.Address
|
||||
err := netAddr.FromString(viper.GetString(commonflags.RPC))
|
||||
|
||||
rpcEndpoint := viper.GetString(commonflags.RPC)
|
||||
if rpcEndpoint == "" {
|
||||
return nil, fmt.Errorf("%s is not defined", commonflags.RPC)
|
||||
}
|
||||
|
||||
err := netAddr.FromString(rpcEndpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -34,6 +41,7 @@ func _client() (tree.TreeServiceClient, error) {
|
|||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {
|
||||
|
|
|
@ -36,8 +36,6 @@ func initGetByPathCmd() {
|
|||
ff.String(pathFlagKey, "", "Path to a node")
|
||||
|
||||
ff.Bool(latestOnlyFlagKey, false, "Look only for the latest version of a node")
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func getByPath(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -30,8 +30,6 @@ func initGetOpLogCmd() {
|
|||
ff := getOpLogCmd.Flags()
|
||||
ff.Uint64(heightFlagKey, 0, "Height to start with")
|
||||
ff.Uint64(countFlagKey, 10, "Logged operations count")
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func getOpLog(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -20,8 +20,6 @@ var healthcheckCmd = &cobra.Command{
|
|||
|
||||
func initHealthcheckCmd() {
|
||||
commonflags.Init(healthcheckCmd)
|
||||
ff := healthcheckCmd.Flags()
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func healthcheck(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -26,8 +26,6 @@ func initListCmd() {
|
|||
ff := listCmd.Flags()
|
||||
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||
_ = listCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func list(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -33,8 +33,6 @@ func initMoveCmd() {
|
|||
|
||||
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
||||
_ = getSubtreeCmd.MarkFlagRequired(parentIDFlagKey)
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func move(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -29,8 +29,6 @@ func initRemoveCmd() {
|
|||
ff.Uint64(nodeIDFlagKey, 0, "Node ID.")
|
||||
|
||||
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func remove(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -34,8 +34,6 @@ func initGetSubtreeCmd() {
|
|||
|
||||
_ = getSubtreeCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||
_ = getSubtreeCmd.MarkFlagRequired(treeIDFlagKey)
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func getSubTree(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -13,9 +13,7 @@ import (
|
|||
)
|
||||
|
||||
func initAccountingService(ctx context.Context, c *cfg) {
|
||||
if c.cfgMorph.client == nil {
|
||||
initMorphComponents(ctx, c)
|
||||
}
|
||||
c.initMorphComponents(ctx)
|
||||
|
||||
balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0)
|
||||
fatalOnErr(err)
|
||||
|
|
|
@ -58,6 +58,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
||||
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
|
@ -109,6 +110,7 @@ type applicationConfiguration struct {
|
|||
|
||||
ObjectCfg struct {
|
||||
tombstoneLifetime uint64
|
||||
priorityMetrics []placement.Metric
|
||||
}
|
||||
|
||||
EngineCfg struct {
|
||||
|
@ -232,6 +234,15 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
|||
// Object
|
||||
|
||||
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
|
||||
var pm []placement.Metric
|
||||
for _, raw := range objectconfig.Get(c).Priority() {
|
||||
m, err := placement.ParseMetric(raw)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pm = append(pm, m)
|
||||
}
|
||||
a.ObjectCfg.priorityMetrics = pm
|
||||
|
||||
// Storage Engine
|
||||
|
||||
|
@ -575,6 +586,9 @@ func (c *cfgGRPC) dropConnection(endpoint string) {
|
|||
}
|
||||
|
||||
type cfgMorph struct {
|
||||
initialized bool
|
||||
guard sync.Mutex
|
||||
|
||||
client *client.Client
|
||||
|
||||
notaryEnabled bool
|
||||
|
@ -1176,17 +1190,15 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
|||
return pool
|
||||
}
|
||||
|
||||
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
|
||||
var res netmapV2.NodeInfo
|
||||
|
||||
func (c *cfg) LocalNodeInfo() *netmap.NodeInfo {
|
||||
var res netmap.NodeInfo
|
||||
ni, ok := c.cfgNetmap.state.getNodeInfo()
|
||||
if ok {
|
||||
ni.WriteToV2(&res)
|
||||
res = ni
|
||||
} else {
|
||||
c.cfgNodeInfo.localInfo.WriteToV2(&res)
|
||||
res = c.cfgNodeInfo.localInfo
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
return &res
|
||||
}
|
||||
|
||||
// setContractNodeInfo rewrites local node info from the FrostFS network map.
|
||||
|
@ -1455,10 +1467,7 @@ func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
|||
|
||||
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
|
||||
return container.NewInfoProvider(func() (container.Source, error) {
|
||||
// threadsafe: called on init or on sighup when morph initialized
|
||||
if c.cfgMorph.client == nil {
|
||||
initMorphComponents(ctx, c)
|
||||
}
|
||||
c.initMorphComponents(ctx)
|
||||
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -10,10 +10,17 @@ type PutConfig struct {
|
|||
cfg *config.Config
|
||||
}
|
||||
|
||||
// GetConfig is a wrapper over "get" config section which provides access
|
||||
// to object get pipeline configuration of object service.
|
||||
type GetConfig struct {
|
||||
cfg *config.Config
|
||||
}
|
||||
|
||||
const (
|
||||
subsection = "object"
|
||||
|
||||
putSubsection = "put"
|
||||
getSubsection = "get"
|
||||
|
||||
// PutPoolSizeDefault is a default value of routine pool size to
|
||||
// process object.Put requests in object service.
|
||||
|
@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
|
|||
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
|
||||
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
|
||||
}
|
||||
|
||||
// Get returns structure that provides access to "get" subsection of
|
||||
// "object" section.
|
||||
func Get(c *config.Config) GetConfig {
|
||||
return GetConfig{
|
||||
c.Sub(subsection).Sub(getSubsection),
|
||||
}
|
||||
}
|
||||
|
||||
// Priority returns the value of "priority" config parameter.
|
||||
func (g GetConfig) Priority() []string {
|
||||
return config.StringSliceSafe(g.cfg, "priority")
|
||||
}
|
||||
|
|
|
@ -28,7 +28,12 @@ const (
|
|||
notaryDepositRetriesAmount = 300
|
||||
)
|
||||
|
||||
func initMorphComponents(ctx context.Context, c *cfg) {
|
||||
func (c *cfg) initMorphComponents(ctx context.Context) {
|
||||
c.cfgMorph.guard.Lock()
|
||||
defer c.cfgMorph.guard.Unlock()
|
||||
if c.cfgMorph.initialized {
|
||||
return
|
||||
}
|
||||
initMorphClient(ctx, c)
|
||||
|
||||
lookupScriptHashesInNNS(c) // smart contract auto negotiation
|
||||
|
@ -70,6 +75,7 @@ func initMorphComponents(ctx context.Context, c *cfg) {
|
|||
|
||||
c.netMapSource = netmapSource
|
||||
c.cfgNetmap.wrapper = wrap
|
||||
c.cfgMorph.initialized = true
|
||||
}
|
||||
|
||||
func initMorphClient(ctx context.Context, c *cfg) {
|
||||
|
|
|
@ -143,9 +143,7 @@ func initNetmapService(ctx context.Context, c *cfg) {
|
|||
parseAttributes(c)
|
||||
c.cfgNodeInfo.localInfo.SetStatus(netmapSDK.Offline)
|
||||
|
||||
if c.cfgMorph.client == nil {
|
||||
initMorphComponents(ctx, c)
|
||||
}
|
||||
c.initMorphComponents(ctx)
|
||||
|
||||
initNetmapState(c)
|
||||
|
||||
|
|
|
@ -178,7 +178,8 @@ func initObjectService(c *cfg) {
|
|||
|
||||
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
||||
|
||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
|
||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
|
||||
c.ObjectCfg.priorityMetrics)
|
||||
|
||||
*c.cfgObject.getSvc = *sGet // need smth better
|
||||
|
||||
|
@ -389,6 +390,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
|
|||
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
||||
coreConstructor *cache.ClientCache,
|
||||
containerSource containercore.Source,
|
||||
priorityMetrics []placement.Metric,
|
||||
) *getsvc.Service {
|
||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||
|
||||
|
@ -398,6 +400,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
|||
ls,
|
||||
traverseGen.WithTraverseOptions(
|
||||
placement.SuccessAfter(1),
|
||||
placement.WithPriorityMetrics(priorityMetrics),
|
||||
placement.WithNodeState(c),
|
||||
),
|
||||
coreConstructor,
|
||||
containerSource,
|
||||
|
|
|
@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
|
|||
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
||||
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
|
||||
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
||||
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
|
||||
|
||||
# Storage engine section
|
||||
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
||||
|
|
|
@ -131,6 +131,9 @@
|
|||
"remote_pool_size": 100,
|
||||
"local_pool_size": 200,
|
||||
"skip_session_token_issuer_verification": true
|
||||
},
|
||||
"get": {
|
||||
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
|
||||
}
|
||||
},
|
||||
"storage": {
|
||||
|
|
|
@ -114,6 +114,10 @@ object:
|
|||
remote_pool_size: 100 # number of async workers for remote PUT operations
|
||||
local_pool_size: 200 # number of async workers for local PUT operations
|
||||
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
|
||||
get:
|
||||
priority: # list of metrics of nodes for prioritization
|
||||
- $attribute:ClusterName
|
||||
- $attribute:UN-LOCODE
|
||||
|
||||
storage:
|
||||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||
|
|
|
@ -407,13 +407,17 @@ Contains object-service related parameters.
|
|||
object:
|
||||
put:
|
||||
remote_pool_size: 100
|
||||
get:
|
||||
priority:
|
||||
- $attribute:ClusterName
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
|
||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
|
||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
|
||||
|
||||
# `runtime` section
|
||||
Contains runtime parameters.
|
||||
|
|
1
pkg/network/cache/multi.go
vendored
1
pkg/network/cache/multi.go
vendored
|
@ -70,6 +70,7 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
|||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
}
|
||||
|
||||
prmDial := client.PrmDial{
|
||||
|
|
|
@ -28,7 +28,7 @@ type executorSvc struct {
|
|||
type NodeState interface {
|
||||
// LocalNodeInfo must return current node state
|
||||
// in FrostFS API v2 NodeInfo structure.
|
||||
LocalNodeInfo() (*netmap.NodeInfo, error)
|
||||
LocalNodeInfo() *netmapSDK.NodeInfo
|
||||
|
||||
// ReadCurrentNetMap reads current local network map of the storage node
|
||||
// into the given parameter. Returns any error encountered which prevented
|
||||
|
@ -64,39 +64,15 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo,
|
|||
|
||||
func (s *executorSvc) LocalNodeInfo(
|
||||
_ context.Context,
|
||||
req *netmap.LocalNodeInfoRequest,
|
||||
_ *netmap.LocalNodeInfoRequest,
|
||||
) (*netmap.LocalNodeInfoResponse, error) {
|
||||
verV2 := req.GetMetaHeader().GetVersion()
|
||||
if verV2 == nil {
|
||||
return nil, errors.New("missing version")
|
||||
}
|
||||
|
||||
var ver versionsdk.Version
|
||||
if err := ver.ReadFromV2(*verV2); err != nil {
|
||||
return nil, fmt.Errorf("can't read version: %w", err)
|
||||
}
|
||||
|
||||
ni, err := s.state.LocalNodeInfo()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if addrNum := ni.NumberOfAddresses(); addrNum > 0 && ver.Minor() <= 7 {
|
||||
ni2 := new(netmap.NodeInfo)
|
||||
ni2.SetPublicKey(ni.GetPublicKey())
|
||||
ni2.SetState(ni.GetState())
|
||||
ni2.SetAttributes(ni.GetAttributes())
|
||||
ni.IterateAddresses(func(s string) bool {
|
||||
ni2.SetAddresses(s)
|
||||
return true
|
||||
})
|
||||
|
||||
ni = ni2
|
||||
}
|
||||
ni := s.state.LocalNodeInfo()
|
||||
var nodeInfo netmap.NodeInfo
|
||||
ni.WriteToV2(&nodeInfo)
|
||||
|
||||
body := new(netmap.LocalNodeInfoResponseBody)
|
||||
body.SetVersion(&s.version)
|
||||
body.SetNodeInfo(ni)
|
||||
body.SetNodeInfo(&nodeInfo)
|
||||
|
||||
resp := new(netmap.LocalNodeInfoResponse)
|
||||
resp.SetBody(body)
|
||||
|
|
43
pkg/services/object_manager/placement/metrics.go
Normal file
43
pkg/services/object_manager/placement/metrics.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
package placement
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
)
|
||||
|
||||
const (
|
||||
attrPrefix = "$attribute:"
|
||||
)
|
||||
|
||||
type Metric interface {
|
||||
CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) int
|
||||
}
|
||||
|
||||
func ParseMetric(raw string) (Metric, error) {
|
||||
if attr, found := strings.CutPrefix(raw, attrPrefix); found {
|
||||
return NewAttributeMetric(attr), nil
|
||||
}
|
||||
return nil, errors.New("unsupported priority metric")
|
||||
}
|
||||
|
||||
// attributeMetric describes priority metric based on attribute.
|
||||
type attributeMetric struct {
|
||||
attribute string
|
||||
}
|
||||
|
||||
// CalculateValue return [0] if from and to contains attribute attributeMetric.attribute and
|
||||
// the value of attribute is the same. In other case return [1].
|
||||
func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) int {
|
||||
fromAttr := from.Attribute(am.attribute)
|
||||
toAttr := to.Attribute(am.attribute)
|
||||
if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
|
||||
return 0
|
||||
}
|
||||
return 1
|
||||
}
|
||||
|
||||
func NewAttributeMetric(attr string) Metric {
|
||||
return &attributeMetric{attribute: attr}
|
||||
}
|
|
@ -3,6 +3,7 @@ package placement
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
|
@ -23,6 +24,11 @@ type Builder interface {
|
|||
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
||||
}
|
||||
|
||||
type NodeState interface {
|
||||
// LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure.
|
||||
LocalNodeInfo() *netmap.NodeInfo
|
||||
}
|
||||
|
||||
// Option represents placement traverser option.
|
||||
type Option func(*cfg)
|
||||
|
||||
|
@ -50,6 +56,10 @@ type cfg struct {
|
|||
policy netmap.PlacementPolicy
|
||||
|
||||
builder Builder
|
||||
|
||||
metrics []Metric
|
||||
|
||||
nodeState NodeState
|
||||
}
|
||||
|
||||
const invalidOptsMsg = "invalid traverser options"
|
||||
|
@ -99,7 +109,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
|
|||
}
|
||||
|
||||
var rem []int
|
||||
if cfg.flatSuccess != nil {
|
||||
if len(cfg.metrics) > 0 && cfg.nodeState != nil {
|
||||
rem = defaultCopiesVector(cfg.policy)
|
||||
var unsortedVector []netmap.NodeInfo
|
||||
var regularVector []netmap.NodeInfo
|
||||
for i := range rem {
|
||||
unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
|
||||
regularVector = append(regularVector, ns[i][rem[i]:]...)
|
||||
}
|
||||
rem = []int{-1, -1}
|
||||
|
||||
sortedVector, err := sortVector(cfg, unsortedVector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ns = [][]netmap.NodeInfo{sortedVector, regularVector}
|
||||
} else if cfg.flatSuccess != nil {
|
||||
ns = flatNodes(ns)
|
||||
rem = []int{int(*cfg.flatSuccess)}
|
||||
} else {
|
||||
|
@ -157,6 +182,35 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
|
|||
return [][]netmap.NodeInfo{flat}
|
||||
}
|
||||
|
||||
type nodeMetrics struct {
|
||||
index int
|
||||
metrics []int
|
||||
}
|
||||
|
||||
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
|
||||
nm := make([]nodeMetrics, len(unsortedVector))
|
||||
node := cfg.nodeState.LocalNodeInfo()
|
||||
|
||||
for i := range unsortedVector {
|
||||
m := make([]int, len(cfg.metrics))
|
||||
for j, pm := range cfg.metrics {
|
||||
m[j] = pm.CalculateValue(node, &unsortedVector[i])
|
||||
}
|
||||
nm[i] = nodeMetrics{
|
||||
index: i,
|
||||
metrics: m,
|
||||
}
|
||||
}
|
||||
slices.SortFunc(nm, func(a, b nodeMetrics) int {
|
||||
return slices.Compare(a.metrics, b.metrics)
|
||||
})
|
||||
sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
|
||||
for i := range unsortedVector {
|
||||
sortedVector[i] = unsortedVector[nm[i].index]
|
||||
}
|
||||
return sortedVector, nil
|
||||
}
|
||||
|
||||
// Node is a descriptor of storage node with information required for intra-container communication.
|
||||
type Node struct {
|
||||
addresses network.AddressGroup
|
||||
|
@ -322,3 +376,17 @@ func WithCopyNumbers(v []uint32) Option {
|
|||
c.copyNumbers = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithPriorityMetrics use provided priority metrics to sort nodes.
|
||||
func WithPriorityMetrics(m []Metric) Option {
|
||||
return func(c *cfg) {
|
||||
c.metrics = m
|
||||
}
|
||||
}
|
||||
|
||||
// WithNodeState provide state of the current node.
|
||||
func WithNodeState(s NodeState) Option {
|
||||
return func(c *cfg) {
|
||||
c.nodeState = s
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
|
|||
}
|
||||
|
||||
func testNode(v uint32) (n netmap.NodeInfo) {
|
||||
n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v)))
|
||||
ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
|
||||
n.SetNetworkEndpoints(ip)
|
||||
n.SetPublicKey([]byte(ip))
|
||||
|
||||
return n
|
||||
}
|
||||
|
@ -40,7 +42,15 @@ func copyVectors(v [][]netmap.NodeInfo) [][]netmap.NodeInfo {
|
|||
return vc
|
||||
}
|
||||
|
||||
func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
|
||||
func testPlacement(ss []int, rs []int) ([][]netmap.NodeInfo, container.Container) {
|
||||
return placement(ss, rs, nil)
|
||||
}
|
||||
|
||||
func testECPlacement(ss []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
|
||||
return placement(ss, nil, ec)
|
||||
}
|
||||
|
||||
func placement(ss []int, rs []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
|
||||
nodes := make([][]netmap.NodeInfo, 0, len(rs))
|
||||
replicas := make([]netmap.ReplicaDescriptor, 0, len(rs))
|
||||
num := uint32(0)
|
||||
|
@ -56,7 +66,12 @@ func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
|
|||
nodes = append(nodes, ns)
|
||||
|
||||
var rd netmap.ReplicaDescriptor
|
||||
rd.SetNumberOfObjects(uint32(rs[i]))
|
||||
if len(rs) > 0 {
|
||||
rd.SetNumberOfObjects(uint32(rs[i]))
|
||||
} else {
|
||||
rd.SetECDataCount(uint32(ec[i][0]))
|
||||
rd.SetECParityCount(uint32(ec[i][1]))
|
||||
}
|
||||
|
||||
replicas = append(replicas, rd)
|
||||
}
|
||||
|
@ -134,7 +149,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
|
|||
err = n.FromIterator(netmapcore.Node(nodes[1][0]))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, []Node{{addresses: n}}, tr.Next())
|
||||
require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
|
||||
})
|
||||
|
||||
t.Run("put scenario", func(t *testing.T) {
|
||||
|
@ -275,3 +290,268 @@ func TestTraverserRemValues(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
type nodeState struct {
|
||||
node *netmap.NodeInfo
|
||||
}
|
||||
|
||||
func (n *nodeState) LocalNodeInfo() *netmap.NodeInfo {
|
||||
return n.node
|
||||
}
|
||||
|
||||
func TestTraverserPriorityMetrics(t *testing.T) {
|
||||
t.Run("one rep one metric", func(t *testing.T) {
|
||||
selectors := []int{4}
|
||||
replicas := []int{3}
|
||||
|
||||
nodes, cnr := testPlacement(selectors, replicas)
|
||||
|
||||
// Node_0, PK - ip4/0.0.0.0/tcp/0
|
||||
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||
// Node_1, PK - ip4/0.0.0.0/tcp/1
|
||||
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||
// Node_2, PK - ip4/0.0.0.0/tcp/2
|
||||
nodes[0][2].SetAttribute("ClusterName", "B")
|
||||
// Node_3, PK - ip4/0.0.0.0/tcp/3
|
||||
nodes[0][3].SetAttribute("ClusterName", "B")
|
||||
|
||||
sdkNode := testNode(5)
|
||||
sdkNode.SetAttribute("ClusterName", "B")
|
||||
|
||||
nodesCopy := copyVectors(nodes)
|
||||
|
||||
m := []Metric{NewAttributeMetric("ClusterName")}
|
||||
|
||||
tr, err := NewTraverser(
|
||||
ForContainer(cnr),
|
||||
UseBuilder(&testBuilder{
|
||||
vectors: nodesCopy,
|
||||
}),
|
||||
WithoutSuccessTracking(),
|
||||
WithPriorityMetrics(m),
|
||||
WithNodeState(&nodeState{
|
||||
node: &sdkNode,
|
||||
}),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Without priority metric `ClusterName` the order will be:
|
||||
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
|
||||
// With priority metric `ClusterName` and current node in cluster B
|
||||
// the order should be:
|
||||
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
|
||||
next := tr.Next()
|
||||
require.NotNil(t, next)
|
||||
require.Equal(t, 3, len(next))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
|
||||
|
||||
next = tr.Next()
|
||||
// The last node is
|
||||
require.Equal(t, 1, len(next))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||
|
||||
next = tr.Next()
|
||||
require.Nil(t, next)
|
||||
})
|
||||
|
||||
t.Run("two reps two metrics", func(t *testing.T) {
|
||||
selectors := []int{3, 3}
|
||||
replicas := []int{2, 2}
|
||||
|
||||
nodes, cnr := testPlacement(selectors, replicas)
|
||||
|
||||
// REPLICA #1
|
||||
// Node_0, PK - ip4/0.0.0.0/tcp/0
|
||||
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||
nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
|
||||
|
||||
// Node_1, PK - ip4/0.0.0.0/tcp/1
|
||||
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||
nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
|
||||
|
||||
// Node_2, PK - ip4/0.0.0.0/tcp/2
|
||||
nodes[0][2].SetAttribute("ClusterName", "A")
|
||||
nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
|
||||
|
||||
// REPLICA #2
|
||||
// Node_3 ip4/0.0.0.0/tcp/3
|
||||
nodes[1][0].SetAttribute("ClusterName", "B")
|
||||
nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
|
||||
|
||||
// Node_4, PK - ip4/0.0.0.0/tcp/4
|
||||
nodes[1][1].SetAttribute("ClusterName", "B")
|
||||
nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
|
||||
|
||||
// Node_5, PK - ip4/0.0.0.0/tcp/5
|
||||
nodes[1][2].SetAttribute("ClusterName", "B")
|
||||
nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
|
||||
|
||||
sdkNode := testNode(9)
|
||||
sdkNode.SetAttribute("ClusterName", "B")
|
||||
sdkNode.SetAttribute("UN-LOCODE", "RU DME")
|
||||
|
||||
nodesCopy := copyVectors(nodes)
|
||||
|
||||
m := []Metric{
|
||||
NewAttributeMetric("ClusterName"),
|
||||
NewAttributeMetric("UN-LOCODE"),
|
||||
}
|
||||
|
||||
tr, err := NewTraverser(
|
||||
ForContainer(cnr),
|
||||
UseBuilder(&testBuilder{
|
||||
vectors: nodesCopy,
|
||||
}),
|
||||
WithoutSuccessTracking(),
|
||||
WithPriorityMetrics(m),
|
||||
WithNodeState(&nodeState{
|
||||
node: &sdkNode,
|
||||
}),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check that nodes in the same cluster and
|
||||
// in the same location should be the first in slice.
|
||||
// Nodes which are follow criteria but stay outside the replica
|
||||
// should be in the next slice.
|
||||
|
||||
next := tr.Next()
|
||||
require.Equal(t, 4, len(next))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
|
||||
|
||||
next = tr.Next()
|
||||
require.Equal(t, 2, len(next))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||
|
||||
next = tr.Next()
|
||||
require.Nil(t, next)
|
||||
|
||||
sdkNode.SetAttribute("ClusterName", "B")
|
||||
sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
|
||||
|
||||
nodesCopy = copyVectors(nodes)
|
||||
|
||||
tr, err = NewTraverser(
|
||||
ForContainer(cnr),
|
||||
UseBuilder(&testBuilder{
|
||||
vectors: nodesCopy,
|
||||
}),
|
||||
WithoutSuccessTracking(),
|
||||
WithPriorityMetrics(m),
|
||||
WithNodeState(&nodeState{
|
||||
node: &sdkNode,
|
||||
}),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
next = tr.Next()
|
||||
require.Equal(t, 4, len(next))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
|
||||
|
||||
next = tr.Next()
|
||||
require.Equal(t, 2, len(next))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||
|
||||
next = tr.Next()
|
||||
require.Nil(t, next)
|
||||
|
||||
sdkNode.SetAttribute("ClusterName", "A")
|
||||
sdkNode.SetAttribute("UN-LOCODE", "RU LED")
|
||||
|
||||
nodesCopy = copyVectors(nodes)
|
||||
|
||||
tr, err = NewTraverser(
|
||||
ForContainer(cnr),
|
||||
UseBuilder(&testBuilder{
|
||||
vectors: nodesCopy,
|
||||
}),
|
||||
WithoutSuccessTracking(),
|
||||
WithPriorityMetrics(m),
|
||||
WithNodeState(&nodeState{
|
||||
node: &sdkNode,
|
||||
}),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
next = tr.Next()
|
||||
require.Equal(t, 4, len(next))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
|
||||
|
||||
next = tr.Next()
|
||||
require.Equal(t, 2, len(next))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||
|
||||
next = tr.Next()
|
||||
require.Nil(t, next)
|
||||
})
|
||||
|
||||
t.Run("ec container", func(t *testing.T) {
|
||||
selectors := []int{4}
|
||||
ec := [][]int{{2, 1}}
|
||||
|
||||
nodes, cnr := testECPlacement(selectors, ec)
|
||||
|
||||
// Node_0, PK - ip4/0.0.0.0/tcp/0
|
||||
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||
// Node_1, PK - ip4/0.0.0.0/tcp/1
|
||||
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||
// Node_2, PK - ip4/0.0.0.0/tcp/2
|
||||
nodes[0][2].SetAttribute("ClusterName", "B")
|
||||
// Node_3, PK - ip4/0.0.0.0/tcp/3
|
||||
nodes[0][3].SetAttribute("ClusterName", "B")
|
||||
|
||||
sdkNode := testNode(5)
|
||||
sdkNode.SetAttribute("ClusterName", "B")
|
||||
|
||||
nodesCopy := copyVectors(nodes)
|
||||
|
||||
m := []Metric{NewAttributeMetric("ClusterName")}
|
||||
|
||||
tr, err := NewTraverser(
|
||||
ForContainer(cnr),
|
||||
UseBuilder(&testBuilder{
|
||||
vectors: nodesCopy,
|
||||
}),
|
||||
WithoutSuccessTracking(),
|
||||
WithPriorityMetrics(m),
|
||||
WithNodeState(&nodeState{
|
||||
node: &sdkNode,
|
||||
}),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Without priority metric `ClusterName` the order will be:
|
||||
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
|
||||
// With priority metric `ClusterName` and current node in cluster B
|
||||
// the order should be:
|
||||
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
|
||||
next := tr.Next()
|
||||
require.NotNil(t, next)
|
||||
require.Equal(t, 3, len(next))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
|
||||
|
||||
next = tr.Next()
|
||||
// The last node is
|
||||
require.Equal(t, 1, len(next))
|
||||
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||
|
||||
next = tr.Next()
|
||||
require.Nil(t, next)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -103,6 +103,7 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
|||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
}
|
||||
|
||||
if !netAddr.IsTLSEnabled() {
|
||||
|
|
|
@ -342,7 +342,9 @@ func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
|
|||
metrics.NewStreamClientInterceptor(),
|
||||
tracing_grpc.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
)
|
||||
}
|
||||
|
||||
// ErrAlreadySyncing is returned when a service synchronization has already
|
||||
|
|
Loading…
Reference in a new issue