[#1609] config: Allow to prioritize N3 endpoints

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-07-18 16:41:35 +03:00 committed by fyrchik
parent aed83d1660
commit 7410827db8
15 changed files with 123 additions and 85 deletions

View file

@ -12,6 +12,7 @@ Changelog for NeoFS Node
- Require SG members to be unique (#1490)
- `neofs-cli` now doesn't remove container with LOCK objects without `--force` flag (#1500)
- `morph` sections in IR and storage node configuration now accept an address and a priority of an endpoint (#1609)
### Fixed
@ -19,6 +20,11 @@ Changelog for NeoFS Node
### Updated
### Updating from v0.29.0
Change morph endpoints from simple string to a pair of `address` and `priority`. The second can be omitted.
For inner ring node this resides in `morph.endpoint.client` section,
for storage node -- in `morph.rpc_endpoint` section. See `config/example` for an example.
## [0.29.0] - 2022-07-07
Support WalletConnect signature scheme.

View file

@ -40,7 +40,7 @@ morph:
disable_cache: false # use TTL cache for side chain GET operations
rpc_endpoint: # side chain N3 RPC endpoints
{{- range .MorphRPC }}
- wss://{{.}}/ws{{end}}
- address: wss://{{.}}/ws{{end}}
{{if not .Relay }}
storage:
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations

View file

@ -2,9 +2,11 @@ package morphconfig
import (
"fmt"
"strconv"
"time"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
)
const (
@ -28,13 +30,27 @@ const (
// from "morph" section.
//
// Throws panic if list is empty.
func RPCEndpoint(c *config.Config) []string {
v := config.StringSliceSafe(c.Sub(subsection), "rpc_endpoint")
if len(v) == 0 {
panic(fmt.Errorf("no morph chain RPC endpoints, see `morph.rpc_endpoint` section"))
func RPCEndpoint(c *config.Config) []client.Endpoint {
var es []client.Endpoint
sub := c.Sub(subsection).Sub("rpc_endpoint")
for i := 0; ; i++ {
s := sub.Sub(strconv.FormatInt(int64(i), 10))
addr := config.StringSafe(s, "address")
if addr == "" {
break
}
return v
es = append(es, client.Endpoint{
Address: addr,
Priority: int(config.IntSafe(s, "priority")),
})
}
if len(es) == 0 {
panic(fmt.Errorf("no morph chain RPC endpoints, see `morph.rpc_endpoint` section"))
}
return es
}
// DialTimeout returns the value of "dial_timeout" config parameter

View file

@ -7,6 +7,7 @@ import (
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
morphconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/morph"
configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/stretchr/testify/require"
)
@ -22,9 +23,9 @@ func TestMorphSection(t *testing.T) {
const path = "../../../../config/example/node"
var (
rpcs = []string{
"wss://rpc1.morph.fs.neo.org:40341/ws",
"wss://rpc2.morph.fs.neo.org:40341/ws",
rpcs = []client.Endpoint{
{"wss://rpc1.morph.fs.neo.org:40341/ws", 2},
{"wss://rpc2.morph.fs.neo.org:40341/ws", 1},
}
)

View file

@ -35,18 +35,17 @@ func initMorphComponents(c *cfg) {
var err error
addresses := morphconfig.RPCEndpoint(c.appCfg)
if len(addresses) == 0 {
fatalOnErr(errors.New("missing Neo RPC endpoints"))
}
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
// order of endpoints with the same priority.
rand.Shuffle(len(addresses), func(i, j int) {
addresses[i], addresses[j] = addresses[j], addresses[i]
})
cli, err := client.New(c.key, addresses[0],
cli, err := client.New(c.key,
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
client.WithLogger(c.log),
client.WithExtraEndpoints(addresses[1:]),
client.WithEndpoints(addresses...),
client.WithConnLostCallback(func() {
c.internalErr <- errors.New("morph connection has been lost")
}),

View file

@ -7,7 +7,8 @@ NEOFS_IR_WALLET_PASSWORD=secret
NEOFS_IR_WITHOUT_MAINNET=false
NEOFS_IR_MORPH_DIAL_TIMEOUT=5s
NEOFS_IR_MORPH_ENDPOINT_CLIENT="wss://sidechain1.fs.neo.org:30333/ws wss://sidechain2.fs.neo.org:30333/ws"
NEOFS_IR_MORPH_ENDPOINT_CLIENT_0_ADDRESS="wss://sidechain1.fs.neo.org:30333/ws"
NEOFS_IR_MORPH_ENDPOINT_CLIENT_1_ADDRESS="wss://sidechain2.fs.neo.org:30333/ws"
NEOFS_IR_MORPH_VALIDATORS="0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170"
NEOFS_IR_MAINNET_DIAL_TIMEOUT=5s

View file

@ -14,8 +14,8 @@ morph:
dial_timeout: 5s # Timeout for RPC client connection to sidechain
endpoint:
client: # List of websocket RPC endpoints in sidechain
- wss://sidechain1.fs.neo.org:30333/ws
- wss://sidechain2.fs.neo.org:30333/ws
- address: wss://sidechain1.fs.neo.org:30333/ws
- address: wss://sidechain2.fs.neo.org:30333/ws
validators: # List of hex-encoded 33-byte public keys of sidechain validators to vote for at application startup
- 0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170

View file

@ -56,7 +56,10 @@ NEOFS_CONTRACTS_PROXY=ad7c6b55b737b696e5c82c85445040964a03e97f
# Morph chain section
NEOFS_MORPH_DIAL_TIMEOUT=30s
NEOFS_MORPH_DISABLE_CACHE=true
NEOFS_MORPH_RPC_ENDPOINT="wss://rpc1.morph.fs.neo.org:40341/ws wss://rpc2.morph.fs.neo.org:40341/ws"
NEOFS_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.fs.neo.org:40341/ws"
NEOFS_MORPH_RPC_ENDPOINT_0_PRIORITY=2
NEOFS_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.fs.neo.org:40341/ws"
NEOFS_MORPH_RPC_ENDPOINT_1_PRIORITY=1
# API Client section
NEOFS_APICLIENT_DIAL_TIMEOUT=15s

View file

@ -95,8 +95,14 @@
"dial_timeout": "30s",
"disable_cache": true,
"rpc_endpoint": [
"wss://rpc1.morph.fs.neo.org:40341/ws",
"wss://rpc2.morph.fs.neo.org:40341/ws"
{
"address": "wss://rpc1.morph.fs.neo.org:40341/ws",
"priority": 2
},
{
"address": "wss://rpc2.morph.fs.neo.org:40341/ws",
"priority": 1
}
]
},
"apiclient": {

View file

@ -82,8 +82,10 @@ morph:
dial_timeout: 30s # timeout for side chain NEO RPC client connection
disable_cache: true # do not use TTL cache for side chain GET operations
rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success
- wss://rpc1.morph.fs.neo.org:40341/ws
- wss://rpc2.morph.fs.neo.org:40341/ws
- address: wss://rpc1.morph.fs.neo.org:40341/ws
priority: 2
- address: wss://rpc2.morph.fs.neo.org:40341/ws
priority: 1
apiclient:
dial_timeout: 15s # timeout for NEOFS API client connection

View file

@ -133,15 +133,23 @@ morph:
dial_timeout: 30s
disable_cache: true
rpc_endpoint:
- wss://rpc1.morph.fs.neo.org:40341/ws
- wss://rpc2.morph.fs.neo.org:40341/ws
- address: wss://rpc1.morph.fs.neo.org:40341/ws
priority: 2
- address: wss://rpc2.morph.fs.neo.org:40341/ws
priority: 1
```
| Parameter | Type | Default value | Description |
|-----------------|------------|---------------|---------------------------------------------------------------------------------------------------------------------------------|
|-----------------|-----------------------------------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------|
| `dial_timeout` | `duration` | `5s` | Timeout for dialing connections to N3 RPCs. |
| `disable_cache` | `bool` | `false` | Flag to disable TTL cache for some side-chain operations.<br/>NOTE: Setting this to `true` can slow down the node considerably. |
| `rpc_endpoint` | `[]string` | | Array of _websocket_ N3 endpoints. |
| `rpc_endpoint` | list of [endpoint descriptions](#rpc_endpoint-subsection) | | Array of endpoint descriptions. |
## `rpc_endpoint` subsection
| Parameter | Type | Default value | Description |
|------------|----------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------|
| `address` | `string` | | _WebSocket_ N3 endpoint. |
| `priority` | `int` | `0` | Priority of an endpoint. Endpoint with a higher priority has more chance of being used. Endpoints with equal priority are iterated over randomly. |
# `storage` section

View file

@ -956,19 +956,32 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev
func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) {
// config name left unchanged for compatibility, may be its better to rename it to "endpoints" or "clients"
endpoints := p.cfg.GetStringSlice(p.name + ".endpoint.client")
var endpoints []client.Endpoint
section := p.name + ".endpoint.client"
for i := 0; ; i++ {
addr := p.cfg.GetString(section + ".address")
if addr == "" {
break
}
endpoints = append(endpoints, client.Endpoint{
Address: addr,
Priority: p.cfg.GetInt(section + ".priority"),
})
}
if len(endpoints) == 0 {
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
}
return client.New(
p.key,
endpoints[0],
client.WithContext(ctx),
client.WithLogger(p.log),
client.WithDialTimeout(p.cfg.GetDuration(p.name+".dial_timeout")),
client.WithSigner(p.sgn),
client.WithExtraEndpoints(endpoints[1:]),
client.WithEndpoints(endpoints...),
client.WithConnLostCallback(func() {
errChan <- fmt.Errorf("%s chain connection has been lost", p.name)
}),

View file

@ -26,7 +26,7 @@ func TestAuditResults(t *testing.T) {
auditHash, err := util.Uint160DecodeStringLE(sAuditHash)
require.NoError(t, err)
morphClient, err := client.New(key, endpoint)
morphClient, err := client.New(key, client.WithEndpoints(client.Endpoint{Address: endpoint}))
require.NoError(t, err)
auditClientWrapper, err := NewFromMorph(morphClient, auditHash, 0)

View file

@ -2,6 +2,7 @@ package client
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -35,7 +36,7 @@ type cfg struct {
signer *transaction.Signer
extraEndpoints []string
endpoints []Endpoint
singleCli *client.WSClient // neo-go client for single client mode
@ -76,7 +77,7 @@ func defaultConfig() *cfg {
// If desired option satisfies the default value, it can be omitted.
// If multiple options of the same config value are supplied,
// the option with the highest index in the arguments will be used.
func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error) {
func New(key *keys.PrivateKey, opts ...Option) (*Client, error) {
if key == nil {
panic("empty private key")
}
@ -89,6 +90,10 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error)
opt(cfg)
}
if len(cfg.endpoints) == 0 {
return nil, errors.New("no endpoints were provided")
}
cli := &Client{
cache: newClientCache(),
logger: cfg.logger,
@ -111,9 +116,8 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error)
// they will be used in switch process, otherwise
// inactive mode will be enabled
cli.client = cfg.singleCli
cli.endpoints.init(cfg.extraEndpoints)
} else {
ws, err := newWSClient(*cfg, endpoint)
ws, err := newWSClient(*cfg, cfg.endpoints[0].Address)
if err != nil {
return nil, fmt.Errorf("could not create morph client: %w", err)
}
@ -124,8 +128,8 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error)
}
cli.client = ws
cli.endpoints.init(append([]string{endpoint}, cfg.extraEndpoints...))
}
cli.endpoints.init(cfg.endpoints)
go cli.notificationLoop()
@ -206,11 +210,11 @@ func WithSigner(signer *transaction.Signer) Option {
}
}
// WithExtraEndpoints returns a client constructor option
// WithEndpoints returns a client constructor option
// that specifies additional Neo rpc endpoints.
func WithExtraEndpoints(endpoints []string) Option {
func WithEndpoints(endpoints ...Endpoint) Option {
return func(c *cfg) {
c.extraEndpoints = append(c.extraEndpoints, endpoints...)
c.endpoints = append(c.endpoints, endpoints...)
}
}

View file

@ -1,69 +1,46 @@
package client
import (
"sort"
"go.uber.org/zap"
)
// Endpoint represents morph endpoint together with its priority.
type Endpoint struct {
Address string
Priority int
}
type endpoints struct {
curr int
list []string
list []Endpoint
}
func (e *endpoints) init(ee []string) {
func (e *endpoints) init(ee []Endpoint) {
sort.SliceStable(ee, func(i, j int) bool {
return ee[i].Priority > ee[j].Priority
})
e.curr = 0
e.list = ee
}
// next returns the next endpoint and its index
// to try to connect to.
// Returns -1 index if there is no known RPC endpoints.
func (e *endpoints) next() (string, int) {
if len(e.list) == 0 {
return "", -1
}
next := e.curr + 1
if next == len(e.list) {
next = 0
}
e.curr = next
return e.list[next], next
}
// current returns an endpoint and its index the Client
// is connected to.
// Returns -1 index if there is no known RPC endpoints
func (e *endpoints) current() (string, int) {
if len(e.list) == 0 {
return "", -1
}
return e.list[e.curr], e.curr
}
func (c *Client) switchRPC() bool {
c.switchLock.Lock()
defer c.switchLock.Unlock()
c.client.Close()
_, currEndpointIndex := c.endpoints.current()
if currEndpointIndex == -1 {
// there are no known RPC endpoints to try
// to connect to => do not switch
return false
}
for {
newEndpoint, index := c.endpoints.next()
if index == currEndpointIndex {
// all the endpoint have been tried
// for connection unsuccessfully
return false
// Iterate endpoints in the order of decreasing priority.
// Skip the current endpoint.
last := c.endpoints.curr
for c.endpoints.curr = range c.endpoints.list {
if c.endpoints.curr == last {
continue
}
newEndpoint := c.endpoints.list[c.endpoints.curr].Address
cli, err := newWSClient(c.cfg, newEndpoint)
if err != nil {
c.logger.Warn("could not establish connection to the switched RPC node",
@ -103,6 +80,8 @@ func (c *Client) switchRPC() bool {
return true
}
return false
}
func (c *Client) notificationLoop() {