From 7410827db8354e296078f8be9c8d0ff26675bc47 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 18 Jul 2022 16:41:35 +0300 Subject: [PATCH] [#1609] config: Allow to prioritize N3 endpoints Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 6 ++ .../internal/modules/storagecfg/config.go | 2 +- cmd/neofs-node/config/morph/config.go | 26 +++++-- cmd/neofs-node/config/morph/config_test.go | 7 +- cmd/neofs-node/morph.go | 9 ++- config/example/ir.env | 3 +- config/example/ir.yaml | 4 +- config/example/node.env | 5 +- config/example/node.json | 10 ++- config/example/node.yaml | 6 +- docs/storage-node-configuration.md | 22 ++++-- pkg/innerring/innerring.go | 19 +++++- pkg/morph/client/audit/result_test.go | 2 +- pkg/morph/client/constructor.go | 20 +++--- pkg/morph/client/multi.go | 67 +++++++------------ 15 files changed, 123 insertions(+), 85 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 08b81de13..0e930e114 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Changelog for NeoFS Node - Require SG members to be unique (#1490) - `neofs-cli` now doesn't remove container with LOCK objects without `--force` flag (#1500) +- `morph` sections in IR and storage node configuration now accept an address and a priority of an endpoint (#1609) ### Fixed @@ -19,6 +20,11 @@ Changelog for NeoFS Node ### Updated +### Updating from v0.29.0 +Change morph endpoints from simple string to a pair of `address` and `priority`. The second can be omitted. +For inner ring node this resides in `morph.endpoint.client` section, +for storage node -- in `morph.rpc_endpoint` section. See `config/example` for an example. + ## [0.29.0] - 2022-07-07 Support WalletConnect signature scheme. diff --git a/cmd/neofs-adm/internal/modules/storagecfg/config.go b/cmd/neofs-adm/internal/modules/storagecfg/config.go index 9bdd87464..a2ddad333 100644 --- a/cmd/neofs-adm/internal/modules/storagecfg/config.go +++ b/cmd/neofs-adm/internal/modules/storagecfg/config.go @@ -40,7 +40,7 @@ morph: disable_cache: false # use TTL cache for side chain GET operations rpc_endpoint: # side chain N3 RPC endpoints {{- range .MorphRPC }} - - wss://{{.}}/ws{{end}} + - address: wss://{{.}}/ws{{end}} {{if not .Relay }} storage: shard_pool_size: 15 # size of per-shard worker pools used for PUT operations diff --git a/cmd/neofs-node/config/morph/config.go b/cmd/neofs-node/config/morph/config.go index 481b7791a..63774aba4 100644 --- a/cmd/neofs-node/config/morph/config.go +++ b/cmd/neofs-node/config/morph/config.go @@ -2,9 +2,11 @@ package morphconfig import ( "fmt" + "strconv" "time" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" + "github.com/nspcc-dev/neofs-node/pkg/morph/client" ) const ( @@ -28,13 +30,27 @@ const ( // from "morph" section. // // Throws panic if list is empty. -func RPCEndpoint(c *config.Config) []string { - v := config.StringSliceSafe(c.Sub(subsection), "rpc_endpoint") - if len(v) == 0 { - panic(fmt.Errorf("no morph chain RPC endpoints, see `morph.rpc_endpoint` section")) +func RPCEndpoint(c *config.Config) []client.Endpoint { + var es []client.Endpoint + + sub := c.Sub(subsection).Sub("rpc_endpoint") + for i := 0; ; i++ { + s := sub.Sub(strconv.FormatInt(int64(i), 10)) + addr := config.StringSafe(s, "address") + if addr == "" { + break + } + + es = append(es, client.Endpoint{ + Address: addr, + Priority: int(config.IntSafe(s, "priority")), + }) } - return v + if len(es) == 0 { + panic(fmt.Errorf("no morph chain RPC endpoints, see `morph.rpc_endpoint` section")) + } + return es } // DialTimeout returns the value of "dial_timeout" config parameter diff --git a/cmd/neofs-node/config/morph/config_test.go b/cmd/neofs-node/config/morph/config_test.go index 204fc7a5d..5fc82c9e1 100644 --- a/cmd/neofs-node/config/morph/config_test.go +++ b/cmd/neofs-node/config/morph/config_test.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" morphconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/morph" configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test" + "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/stretchr/testify/require" ) @@ -22,9 +23,9 @@ func TestMorphSection(t *testing.T) { const path = "../../../../config/example/node" var ( - rpcs = []string{ - "wss://rpc1.morph.fs.neo.org:40341/ws", - "wss://rpc2.morph.fs.neo.org:40341/ws", + rpcs = []client.Endpoint{ + {"wss://rpc1.morph.fs.neo.org:40341/ws", 2}, + {"wss://rpc2.morph.fs.neo.org:40341/ws", 1}, } ) diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index befd3a006..ce2690993 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -35,18 +35,17 @@ func initMorphComponents(c *cfg) { var err error addresses := morphconfig.RPCEndpoint(c.appCfg) - if len(addresses) == 0 { - fatalOnErr(errors.New("missing Neo RPC endpoints")) - } + // Morph client stable-sorts endpoints by priority. Shuffle here to randomize + // order of endpoints with the same priority. rand.Shuffle(len(addresses), func(i, j int) { addresses[i], addresses[j] = addresses[j], addresses[i] }) - cli, err := client.New(c.key, addresses[0], + cli, err := client.New(c.key, client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)), client.WithLogger(c.log), - client.WithExtraEndpoints(addresses[1:]), + client.WithEndpoints(addresses...), client.WithConnLostCallback(func() { c.internalErr <- errors.New("morph connection has been lost") }), diff --git a/config/example/ir.env b/config/example/ir.env index 524db837c..71fb11182 100644 --- a/config/example/ir.env +++ b/config/example/ir.env @@ -7,7 +7,8 @@ NEOFS_IR_WALLET_PASSWORD=secret NEOFS_IR_WITHOUT_MAINNET=false NEOFS_IR_MORPH_DIAL_TIMEOUT=5s -NEOFS_IR_MORPH_ENDPOINT_CLIENT="wss://sidechain1.fs.neo.org:30333/ws wss://sidechain2.fs.neo.org:30333/ws" +NEOFS_IR_MORPH_ENDPOINT_CLIENT_0_ADDRESS="wss://sidechain1.fs.neo.org:30333/ws" +NEOFS_IR_MORPH_ENDPOINT_CLIENT_1_ADDRESS="wss://sidechain2.fs.neo.org:30333/ws" NEOFS_IR_MORPH_VALIDATORS="0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170" NEOFS_IR_MAINNET_DIAL_TIMEOUT=5s diff --git a/config/example/ir.yaml b/config/example/ir.yaml index 44b91a513..2c765dbdc 100644 --- a/config/example/ir.yaml +++ b/config/example/ir.yaml @@ -14,8 +14,8 @@ morph: dial_timeout: 5s # Timeout for RPC client connection to sidechain endpoint: client: # List of websocket RPC endpoints in sidechain - - wss://sidechain1.fs.neo.org:30333/ws - - wss://sidechain2.fs.neo.org:30333/ws + - address: wss://sidechain1.fs.neo.org:30333/ws + - address: wss://sidechain2.fs.neo.org:30333/ws validators: # List of hex-encoded 33-byte public keys of sidechain validators to vote for at application startup - 0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170 diff --git a/config/example/node.env b/config/example/node.env index 1c5c3c99d..d9a2cd513 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -56,7 +56,10 @@ NEOFS_CONTRACTS_PROXY=ad7c6b55b737b696e5c82c85445040964a03e97f # Morph chain section NEOFS_MORPH_DIAL_TIMEOUT=30s NEOFS_MORPH_DISABLE_CACHE=true -NEOFS_MORPH_RPC_ENDPOINT="wss://rpc1.morph.fs.neo.org:40341/ws wss://rpc2.morph.fs.neo.org:40341/ws" +NEOFS_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.fs.neo.org:40341/ws" +NEOFS_MORPH_RPC_ENDPOINT_0_PRIORITY=2 +NEOFS_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.fs.neo.org:40341/ws" +NEOFS_MORPH_RPC_ENDPOINT_1_PRIORITY=1 # API Client section NEOFS_APICLIENT_DIAL_TIMEOUT=15s diff --git a/config/example/node.json b/config/example/node.json index 411b16666..ed0ffbb5d 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -95,8 +95,14 @@ "dial_timeout": "30s", "disable_cache": true, "rpc_endpoint": [ - "wss://rpc1.morph.fs.neo.org:40341/ws", - "wss://rpc2.morph.fs.neo.org:40341/ws" + { + "address": "wss://rpc1.morph.fs.neo.org:40341/ws", + "priority": 2 + }, + { + "address": "wss://rpc2.morph.fs.neo.org:40341/ws", + "priority": 1 + } ] }, "apiclient": { diff --git a/config/example/node.yaml b/config/example/node.yaml index 79421cc46..88c6ac475 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -82,8 +82,10 @@ morph: dial_timeout: 30s # timeout for side chain NEO RPC client connection disable_cache: true # do not use TTL cache for side chain GET operations rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success - - wss://rpc1.morph.fs.neo.org:40341/ws - - wss://rpc2.morph.fs.neo.org:40341/ws + - address: wss://rpc1.morph.fs.neo.org:40341/ws + priority: 2 + - address: wss://rpc2.morph.fs.neo.org:40341/ws + priority: 1 apiclient: dial_timeout: 15s # timeout for NEOFS API client connection diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index aec70acbc..57f28b189 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -133,15 +133,23 @@ morph: dial_timeout: 30s disable_cache: true rpc_endpoint: - - wss://rpc1.morph.fs.neo.org:40341/ws - - wss://rpc2.morph.fs.neo.org:40341/ws + - address: wss://rpc1.morph.fs.neo.org:40341/ws + priority: 2 + - address: wss://rpc2.morph.fs.neo.org:40341/ws + priority: 1 ``` -| Parameter | Type | Default value | Description | -|-----------------|------------|---------------|---------------------------------------------------------------------------------------------------------------------------------| -| `dial_timeout` | `duration` | `5s` | Timeout for dialing connections to N3 RPCs. | -| `disable_cache` | `bool` | `false` | Flag to disable TTL cache for some side-chain operations.
NOTE: Setting this to `true` can slow down the node considerably. | -| `rpc_endpoint` | `[]string` | | Array of _websocket_ N3 endpoints. | +| Parameter | Type | Default value | Description | +|-----------------|-----------------------------------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------| +| `dial_timeout` | `duration` | `5s` | Timeout for dialing connections to N3 RPCs. | +| `disable_cache` | `bool` | `false` | Flag to disable TTL cache for some side-chain operations.
NOTE: Setting this to `true` can slow down the node considerably. | +| `rpc_endpoint` | list of [endpoint descriptions](#rpc_endpoint-subsection) | | Array of endpoint descriptions. | + +## `rpc_endpoint` subsection +| Parameter | Type | Default value | Description | +|------------|----------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------| +| `address` | `string` | | _WebSocket_ N3 endpoint. | +| `priority` | `int` | `0` | Priority of an endpoint. Endpoint with a higher priority has more chance of being used. Endpoints with equal priority are iterated over randomly. | # `storage` section diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 85ff3d5b4..5e4f20567 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -956,19 +956,32 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) { // config name left unchanged for compatibility, may be its better to rename it to "endpoints" or "clients" - endpoints := p.cfg.GetStringSlice(p.name + ".endpoint.client") + var endpoints []client.Endpoint + + section := p.name + ".endpoint.client" + for i := 0; ; i++ { + addr := p.cfg.GetString(section + ".address") + if addr == "" { + break + } + + endpoints = append(endpoints, client.Endpoint{ + Address: addr, + Priority: p.cfg.GetInt(section + ".priority"), + }) + } + if len(endpoints) == 0 { return nil, fmt.Errorf("%s chain client endpoints not provided", p.name) } return client.New( p.key, - endpoints[0], client.WithContext(ctx), client.WithLogger(p.log), client.WithDialTimeout(p.cfg.GetDuration(p.name+".dial_timeout")), client.WithSigner(p.sgn), - client.WithExtraEndpoints(endpoints[1:]), + client.WithEndpoints(endpoints...), client.WithConnLostCallback(func() { errChan <- fmt.Errorf("%s chain connection has been lost", p.name) }), diff --git a/pkg/morph/client/audit/result_test.go b/pkg/morph/client/audit/result_test.go index 01f22a0e7..006bfba77 100644 --- a/pkg/morph/client/audit/result_test.go +++ b/pkg/morph/client/audit/result_test.go @@ -26,7 +26,7 @@ func TestAuditResults(t *testing.T) { auditHash, err := util.Uint160DecodeStringLE(sAuditHash) require.NoError(t, err) - morphClient, err := client.New(key, endpoint) + morphClient, err := client.New(key, client.WithEndpoints(client.Endpoint{Address: endpoint})) require.NoError(t, err) auditClientWrapper, err := NewFromMorph(morphClient, auditHash, 0) diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index 3d9151a98..5820e24ef 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -2,6 +2,7 @@ package client import ( "context" + "errors" "fmt" "sync" "time" @@ -35,7 +36,7 @@ type cfg struct { signer *transaction.Signer - extraEndpoints []string + endpoints []Endpoint singleCli *client.WSClient // neo-go client for single client mode @@ -76,7 +77,7 @@ func defaultConfig() *cfg { // If desired option satisfies the default value, it can be omitted. // If multiple options of the same config value are supplied, // the option with the highest index in the arguments will be used. -func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error) { +func New(key *keys.PrivateKey, opts ...Option) (*Client, error) { if key == nil { panic("empty private key") } @@ -89,6 +90,10 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error) opt(cfg) } + if len(cfg.endpoints) == 0 { + return nil, errors.New("no endpoints were provided") + } + cli := &Client{ cache: newClientCache(), logger: cfg.logger, @@ -111,9 +116,8 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error) // they will be used in switch process, otherwise // inactive mode will be enabled cli.client = cfg.singleCli - cli.endpoints.init(cfg.extraEndpoints) } else { - ws, err := newWSClient(*cfg, endpoint) + ws, err := newWSClient(*cfg, cfg.endpoints[0].Address) if err != nil { return nil, fmt.Errorf("could not create morph client: %w", err) } @@ -124,8 +128,8 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error) } cli.client = ws - cli.endpoints.init(append([]string{endpoint}, cfg.extraEndpoints...)) } + cli.endpoints.init(cfg.endpoints) go cli.notificationLoop() @@ -206,11 +210,11 @@ func WithSigner(signer *transaction.Signer) Option { } } -// WithExtraEndpoints returns a client constructor option +// WithEndpoints returns a client constructor option // that specifies additional Neo rpc endpoints. -func WithExtraEndpoints(endpoints []string) Option { +func WithEndpoints(endpoints ...Endpoint) Option { return func(c *cfg) { - c.extraEndpoints = append(c.extraEndpoints, endpoints...) + c.endpoints = append(c.endpoints, endpoints...) } } diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index e35ed3c4c..e72a3915b 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -1,69 +1,46 @@ package client import ( + "sort" + "go.uber.org/zap" ) +// Endpoint represents morph endpoint together with its priority. +type Endpoint struct { + Address string + Priority int +} + type endpoints struct { curr int - list []string + list []Endpoint } -func (e *endpoints) init(ee []string) { +func (e *endpoints) init(ee []Endpoint) { + sort.SliceStable(ee, func(i, j int) bool { + return ee[i].Priority > ee[j].Priority + }) + e.curr = 0 e.list = ee } -// next returns the next endpoint and its index -// to try to connect to. -// Returns -1 index if there is no known RPC endpoints. -func (e *endpoints) next() (string, int) { - if len(e.list) == 0 { - return "", -1 - } - - next := e.curr + 1 - if next == len(e.list) { - next = 0 - } - - e.curr = next - - return e.list[next], next -} - -// current returns an endpoint and its index the Client -// is connected to. -// Returns -1 index if there is no known RPC endpoints -func (e *endpoints) current() (string, int) { - if len(e.list) == 0 { - return "", -1 - } - - return e.list[e.curr], e.curr -} - func (c *Client) switchRPC() bool { c.switchLock.Lock() defer c.switchLock.Unlock() c.client.Close() - _, currEndpointIndex := c.endpoints.current() - if currEndpointIndex == -1 { - // there are no known RPC endpoints to try - // to connect to => do not switch - return false - } - - for { - newEndpoint, index := c.endpoints.next() - if index == currEndpointIndex { - // all the endpoint have been tried - // for connection unsuccessfully - return false + // Iterate endpoints in the order of decreasing priority. + // Skip the current endpoint. + last := c.endpoints.curr + for c.endpoints.curr = range c.endpoints.list { + if c.endpoints.curr == last { + continue } + newEndpoint := c.endpoints.list[c.endpoints.curr].Address cli, err := newWSClient(c.cfg, newEndpoint) if err != nil { c.logger.Warn("could not establish connection to the switched RPC node", @@ -103,6 +80,8 @@ func (c *Client) switchRPC() bool { return true } + + return false } func (c *Client) notificationLoop() {