From 63ebe41991fa05ab069c35bdacec9706fe96f20d Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 12 Jan 2021 16:25:30 +0300 Subject: [PATCH] [#304] cmd/neofs-node: Select random NEO endpoints from the list Application should support several NEO endpoints so it can switch between different RPC nodes when they fail. Application iterates over endpoints in random order so the default list of endpoints distribute workload kinda uniformly. Signed-off-by: Alex Vanin --- cmd/neofs-node/config.go | 4 +-- cmd/neofs-node/morph.go | 71 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index bfe5a9594..2ebfbc943 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -365,8 +365,8 @@ func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgNodeKey, "Kwk6k2eC3L3QuPvD8aiaNyoSXgQ2YL1bwS5CP1oKoA9waeAze97s") v.SetDefault(cfgBootstrapAddress, "") // address to bootstrap with - v.SetDefault(cfgMorphRPCAddress, "http://morph_chain.localtest.nspcc.ru:30333/") - v.SetDefault(cfgMorphNotifyRPCAddress, "ws://morph_chain:30333/ws") + v.SetDefault(cfgMorphRPCAddress, []string{}) + v.SetDefault(cfgMorphNotifyRPCAddress, []string{}) v.SetDefault(cfgMorphNotifyDialTimeout, 5*time.Second) v.SetDefault(cfgListenAddress, "127.0.0.1:50501") // listen address v.SetDefault(cfgMaxMsgSize, 4<<20) // transport msg limit 4 MiB diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index a34661975..a0197645d 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "fmt" "github.com/nspcc-dev/neo-go/pkg/util" @@ -11,14 +12,44 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" + "github.com/nspcc-dev/neofs-node/pkg/util/rand" + "go.uber.org/zap" ) const newEpochNotification = "NewEpoch" +var ( + errNoRPCEndpoints = errors.New("NEO RPC endpoints are not specified in config") + errNoWSEndpoints = errors.New("websocket NEO listener endpoints are not specified in config") +) + func initMorphComponents(c *cfg) { var err error - c.cfgMorph.client, err = client.New(c.key, c.viper.GetString(cfgMorphRPCAddress)) + addresses := c.viper.GetStringSlice(cfgMorphRPCAddress) + if len(addresses) == 0 { + fatalOnErr(errNoRPCEndpoints) + } + + crand := rand.New() // math/rand with cryptographic source + crand.Shuffle(len(addresses), func(i, j int) { + addresses[i], addresses[j] = addresses[j], addresses[i] + }) + + for i := range addresses { + c.cfgMorph.client, err = client.New(c.key, addresses[i]) + if err == nil { + c.log.Info("neo RPC connection established", + zap.String("endpoint", addresses[i])) + + break + } + + c.log.Info("failed to establish neo RPC connection, trying another", + zap.String("endpoint", addresses[i]), + zap.String("error", err.Error())) + } + fatalOnErr(err) staticClient, err := client.NewStatic( @@ -39,11 +70,41 @@ func initMorphComponents(c *cfg) { } func listenMorphNotifications(c *cfg) { - subs, err := subscriber.New(c.ctx, &subscriber.Params{ - Log: c.log, - Endpoint: c.viper.GetString(cfgMorphNotifyRPCAddress), - DialTimeout: c.viper.GetDuration(cfgMorphNotifyDialTimeout), + var ( + err error + subs subscriber.Subscriber + ) + + endpoints := c.viper.GetStringSlice(cfgMorphNotifyRPCAddress) + if len(endpoints) == 0 { + fatalOnErr(errNoWSEndpoints) + } + + timeout := c.viper.GetDuration(cfgMorphNotifyDialTimeout) + + crand := rand.New() // math/rand with cryptographic source + crand.Shuffle(len(endpoints), func(i, j int) { + endpoints[i], endpoints[j] = endpoints[j], endpoints[i] }) + + for i := range endpoints { + subs, err = subscriber.New(c.ctx, &subscriber.Params{ + Log: c.log, + Endpoint: endpoints[i], + DialTimeout: timeout, + }) + if err == nil { + c.log.Info("websocket neo event listener established", + zap.String("endpoint", endpoints[i])) + + break + } + + c.log.Info("failed to establish websocket neo event listener, trying another", + zap.String("endpoint", endpoints[i]), + zap.String("error", err.Error())) + } + fatalOnErr(err) lis, err := event.NewListener(event.ListenerParams{