From ad7ad12a0c0433a37ca7ae6131df1db8d6ce0b44 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 26 Aug 2021 10:59:02 +0300 Subject: [PATCH] [#746] morph: Implement and use multi-client There is a need to work with a set of Neo RPC nodes in order not to depend on the failure of some nodes while others are active. Support "multi-client" mode of morph `Client` entity. If instance is not "multi-client", it works as before. Constructor `New` creates multi-client, and each method performs iterating over the fixed set of endpoints until success. Opened client connections are cached (without eviction for now). Storage (as earlier) and IR (from now) nodes can be configured with multiple Neo endpoints. As above, `New` creates multi-client instance, so we don't need initialization changes on app-side. `Wait` and `GetDesignateHash` methods of `Client` return an error from now to detect connection errors. `NotaryEnabled` method is removed as unused. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/morph.go | 26 ++-- pkg/innerring/innerring.go | 9 +- .../processors/governance/processor.go | 14 +- pkg/morph/client/client.go | 131 +++++++++++++++--- pkg/morph/client/constructor.go | 55 +++----- pkg/morph/client/multi.go | 105 ++++++++++++++ pkg/morph/client/notary.go | 94 ++++++++++--- 7 files changed, 345 insertions(+), 89 deletions(-) create mode 100644 pkg/morph/client/multi.go diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index 1e253f89..dd68d2cb 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -30,22 +30,22 @@ func initMorphComponents(c *cfg) { addresses[i], addresses[j] = addresses[j], addresses[i] }) - for i := range addresses { - cli, err := client.New(c.key, addresses[i], client.WithDialTimeout(dialTimeout)) - if err == nil { - c.log.Info("neo RPC connection established", - zap.String("endpoint", addresses[i])) + cli, err := client.New(c.key, addresses[0], + client.WithDialTimeout(dialTimeout), + client.WithLogger(c.log), + client.WithExtraEndpoints(addresses[1:]), + ) + if err == nil { + handler(cli) - handler(cli) - - break - } - - c.log.Info("failed to establish neo RPC connection, trying another", - zap.String("endpoint", addresses[i]), - zap.String("error", err.Error())) + return } + c.log.Info("failed to create neo RPC client", + zap.Any("endpoints", addresses), + zap.String("error", err.Error()), + ) + fatalOnErr(err) } diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index b109358c..03fb0deb 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -866,13 +866,20 @@ func createListener(ctx context.Context, p *chainParams) (event.Listener, error) } func createClient(ctx context.Context, p *chainParams) (*client.Client, error) { + // config name left unchanged for compatibility, may be its better to rename it to "endpoints" or "clients" + endpoints := p.cfg.GetStringSlice(p.name + ".endpoint.client") + if len(endpoints) == 0 { + return nil, fmt.Errorf("%s chain client endpoints not provided", p.name) + } + return client.New( p.key, - p.cfg.GetString(p.name+".endpoint.client"), + endpoints[0], client.WithContext(ctx), client.WithLogger(p.log), client.WithDialTimeout(p.cfg.GetDuration(p.name+".dial_timeout")), client.WithSigner(p.sgn), + client.WithExtraEndpoints(endpoints[1:]), ) } diff --git a/pkg/innerring/processors/governance/processor.go b/pkg/innerring/processors/governance/processor.go index a7d2ede6..2402f7fe 100644 --- a/pkg/innerring/processors/governance/processor.go +++ b/pkg/innerring/processors/governance/processor.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/morph/client" neofscontract "github.com/nspcc-dev/neofs-node/pkg/morph/client/neofs/wrapper" nmWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" @@ -59,6 +60,8 @@ type ( morphClient *client.Client notaryDisabled bool + + designate util.Uint160 } // Params of the processor constructor. @@ -103,6 +106,12 @@ func New(p *Params) (*Processor, error) { return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err) } + // result is cached by neo-go, so we can pre-calc it + designate, err := p.MainnetClient.GetDesignateHash() + if err != nil { + return nil, fmt.Errorf("could not get designate hash: %w", err) + } + return &Processor{ log: p.Log, pool: pool, @@ -115,13 +124,14 @@ func New(p *Params) (*Processor, error) { mainnetClient: p.MainnetClient, morphClient: p.MorphClient, notaryDisabled: p.NotaryDisabled, + designate: designate, }, nil } // ListenerParsers for the 'event.Listener' event producer. func (gp *Processor) ListenerParsers() []event.ParserInfo { var pi event.ParserInfo - pi.SetScriptHash(gp.mainnetClient.GetDesignateHash()) + pi.SetScriptHash(gp.designate) pi.SetType(event.TypeFromString(native.DesignationEventName)) pi.SetParser(rolemanagement.ParseDesignate) return []event.ParserInfo{pi} @@ -130,7 +140,7 @@ func (gp *Processor) ListenerParsers() []event.ParserInfo { // ListenerHandlers for the 'event.Listener' event producer. func (gp *Processor) ListenerHandlers() []event.HandlerInfo { var hi event.HandlerInfo - hi.SetScriptHash(gp.mainnetClient.GetDesignateHash()) + hi.SetScriptHash(gp.designate) hi.SetType(event.TypeFromString(native.DesignationEventName)) hi.SetHandler(gp.HandleAlphabetSync) return []event.HandlerInfo{hi} diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 73915c1f..f1f04f00 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/nspcc-dev/neo-go/pkg/core/native/nativenames" "github.com/nspcc-dev/neo-go/pkg/core/native/noderoles" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -21,14 +22,25 @@ import ( "go.uber.org/zap" ) -// Client is a neo-go wrapper that provides -// smart-contract invocation interface. +// Client is a wrapper over multiple neo-go clients +// that provides smart-contract invocation interface. +// +// Each operation accesses all nodes in turn until the first success, +// and returns the error of the very first client on failure. // // Working client must be created via constructor New. // Using the Client that has been created with new(Client) // expression (or just declaring a Client variable) is unsafe // and can lead to panic. type Client struct { + // two mutual exclusive modes, exactly one must be non-nil + + *singleClient // works with single neo-go client + + *multiClient // creates and caches single clients +} + +type singleClient struct { logger *logger.Logger // logging component client *client.Client // neo-go client @@ -37,8 +49,6 @@ type Client struct { gas util.Uint160 // native gas script-hash - designate util.Uint160 // native designate script-hash - waitInterval time.Duration signer *transaction.Signer @@ -72,6 +82,12 @@ var errScriptDecode = errors.New("could not decode invocation script from neo no // Invoke invokes contract method by sending transaction into blockchain. // Supported args types: int64, string, util.Uint160, []byte and bool. func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, args ...interface{}) error { + if c.multiClient != nil { + return c.multiClient.iterateClients(func(c *Client) error { + return c.Invoke(contract, fee, method, args...) + }) + } + params := make([]sc.Parameter, 0, len(args)) for i := range args { @@ -130,7 +146,14 @@ func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, // TestInvoke invokes contract method locally in neo-go node. This method should // be used to read data from smart-contract. -func (c *Client) TestInvoke(contract util.Uint160, method string, args ...interface{}) ([]stackitem.Item, error) { +func (c *Client) TestInvoke(contract util.Uint160, method string, args ...interface{}) (res []stackitem.Item, err error) { + if c.multiClient != nil { + return res, c.multiClient.iterateClients(func(c *Client) error { + res, err = c.TestInvoke(contract, method, args...) + return err + }) + } + var params = make([]sc.Parameter, 0, len(args)) for i := range args { @@ -163,6 +186,12 @@ func (c *Client) TestInvoke(contract util.Uint160, method string, args ...interf // TransferGas to the receiver from local wallet func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error { + if c.multiClient != nil { + return c.multiClient.iterateClients(func(c *Client) error { + return c.TransferGas(receiver, amount) + }) + } + txHash, err := c.client.TransferNEP17(c.acc, receiver, c.gas, int64(amount), 0, nil, nil) if err != nil { return err @@ -177,7 +206,15 @@ func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error // Wait function blocks routing execution until there // are `n` new blocks in the chain. -func (c *Client) Wait(ctx context.Context, n uint32) { +// +// Returns only connection errors. +func (c *Client) Wait(ctx context.Context, n uint32) error { + if c.multiClient != nil { + return c.multiClient.iterateClients(func(c *Client) error { + return c.Wait(ctx, n) + }) + } + var ( err error height, newHeight uint32 @@ -187,13 +224,13 @@ func (c *Client) Wait(ctx context.Context, n uint32) { if err != nil { c.logger.Error("can't get blockchain height", zap.String("error", err.Error())) - return + return nil } for { select { case <-ctx.Done(): - return + return nil default: } @@ -201,11 +238,11 @@ func (c *Client) Wait(ctx context.Context, n uint32) { if err != nil { c.logger.Error("can't get blockchain height", zap.String("error", err.Error())) - return + return nil } if newHeight >= height+n { - return + return nil } time.Sleep(c.waitInterval) @@ -213,17 +250,38 @@ func (c *Client) Wait(ctx context.Context, n uint32) { } // GasBalance returns GAS amount in the client's wallet. -func (c *Client) GasBalance() (int64, error) { +func (c *Client) GasBalance() (res int64, err error) { + if c.multiClient != nil { + return res, c.multiClient.iterateClients(func(c *Client) error { + res, err = c.GasBalance() + return err + }) + } + return c.client.NEP17BalanceOf(c.gas, c.acc.PrivateKey().GetScriptHash()) } // Committee returns keys of chain committee from neo native contract. -func (c *Client) Committee() (keys.PublicKeys, error) { +func (c *Client) Committee() (res keys.PublicKeys, err error) { + if c.multiClient != nil { + return res, c.multiClient.iterateClients(func(c *Client) error { + res, err = c.Committee() + return err + }) + } + return c.client.GetCommittee() } // TxHalt returns true if transaction has been successfully executed and persisted. -func (c *Client) TxHalt(h util.Uint256) (bool, error) { +func (c *Client) TxHalt(h util.Uint256) (res bool, err error) { + if c.multiClient != nil { + return res, c.multiClient.iterateClients(func(c *Client) error { + res, err = c.TxHalt(h) + return err + }) + } + trig := trigger.Application aer, err := c.client.GetApplicationLog(h, &trig) if err != nil { @@ -235,7 +293,14 @@ func (c *Client) TxHalt(h util.Uint256) (bool, error) { // NeoFSAlphabetList returns keys that stored in NeoFS Alphabet role. Main chain // stores alphabet node keys of inner ring there, however side chain stores both // alphabet and non alphabet node keys of inner ring. -func (c *Client) NeoFSAlphabetList() (keys.PublicKeys, error) { +func (c *Client) NeoFSAlphabetList() (res keys.PublicKeys, err error) { + if c.multiClient != nil { + return res, c.multiClient.iterateClients(func(c *Client) error { + res, err = c.NeoFSAlphabetList() + return err + }) + } + list, err := c.roleList(noderoles.NeoFSAlphabet) if err != nil { return nil, fmt.Errorf("can't get alphabet nodes role list: %w", err) @@ -245,8 +310,15 @@ func (c *Client) NeoFSAlphabetList() (keys.PublicKeys, error) { } // GetDesignateHash returns hash of the native `RoleManagement` contract. -func (c *Client) GetDesignateHash() util.Uint160 { - return c.designate +func (c *Client) GetDesignateHash() (res util.Uint160, err error) { + if c.multiClient != nil { + return res, c.multiClient.iterateClients(func(c *Client) error { + res, err = c.GetDesignateHash() + return err + }) + } + + return c.client.GetNativeContractHash(nativenames.Designation) } func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) { @@ -316,12 +388,35 @@ func toStackParameter(value interface{}) (sc.Parameter, error) { // MagicNumber returns the magic number of the network // to which the underlying RPC node client is connected. -func (c *Client) MagicNumber() uint64 { +// +// Returns 0 in case of connection problems. +func (c *Client) MagicNumber() (res uint64) { + if c.multiClient != nil { + err := c.multiClient.iterateClients(func(c *Client) error { + res = c.MagicNumber() + return nil + }) + if err != nil { + c.logger.Debug("iterate over client failure", + zap.String("error", err.Error()), + ) + } + + return + } + return uint64(c.client.GetNetwork()) } // BlockCount returns block count of the network // to which the underlying RPC node client is connected. -func (c *Client) BlockCount() (uint32, error) { +func (c *Client) BlockCount() (res uint32, err error) { + if c.multiClient != nil { + return res, c.multiClient.iterateClients(func(c *Client) error { + res, err = c.BlockCount() + return err + }) + } + return c.client.GetBlockCount() } diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index 4a8583d4..e0b90966 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -4,10 +4,8 @@ import ( "context" "time" - "github.com/nspcc-dev/neo-go/pkg/core/native/nativenames" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "github.com/nspcc-dev/neo-go/pkg/rpc/client" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-node/pkg/util/logger" @@ -30,6 +28,8 @@ type cfg struct { waitInterval time.Duration signer *transaction.Signer + + extraEndpoints []string } const ( @@ -69,8 +69,6 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error) panic("empty private key") } - account := wallet.NewAccountFromPrivateKey(key) - // build default configuration cfg := defaultConfig() @@ -79,39 +77,16 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error) opt(cfg) } - cli, err := client.New(cfg.ctx, endpoint, client.Options{ - DialTimeout: cfg.dialTimeout, - }) - if err != nil { - return nil, err - } + endpoints := append(cfg.extraEndpoints, endpoint) - err = cli.Init() // magic number is set there based on RPC node answer - if err != nil { - return nil, err - } - - gas, err := cli.GetNativeContractHash(nativenames.Gas) - if err != nil { - return nil, err - } - - designate, err := cli.GetNativeContractHash(nativenames.Designation) - if err != nil { - return nil, err - } - - c := &Client{ - logger: cfg.logger, - client: cli, - acc: account, - gas: gas, - designate: designate, - waitInterval: cfg.waitInterval, - signer: cfg.signer, - } - - return c, nil + return &Client{ + multiClient: &multiClient{ + cfg: *cfg, + account: wallet.NewAccountFromPrivateKey(key), + endpoints: endpoints, + clients: make(map[string]*Client, len(endpoints)), + }, + }, nil } // WithContext returns a client constructor option that @@ -169,3 +144,11 @@ func WithSigner(signer *transaction.Signer) Option { } } } + +// WithExtraEndpoints returns a client constructor option +// that specifies additional Neo rpc endpoints. +func WithExtraEndpoints(endpoints []string) Option { + return func(c *cfg) { + c.extraEndpoints = append(c.extraEndpoints, endpoints...) + } +} diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go new file mode 100644 index 00000000..d38095c6 --- /dev/null +++ b/pkg/morph/client/multi.go @@ -0,0 +1,105 @@ +package client + +import ( + "sync" + + "github.com/nspcc-dev/neo-go/pkg/core/native/nativenames" + "github.com/nspcc-dev/neo-go/pkg/rpc/client" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "go.uber.org/zap" +) + +type multiClient struct { + cfg cfg + + account *wallet.Account + + sharedNotary *notary // notary config needed for single client construction + + endpoints []string + clientsMtx sync.Mutex + clients map[string]*Client +} + +// createForAddress creates single Client instance using provided endpoint. +// +// note: must be wrapped into mutex lock. +func (x *multiClient) createForAddress(addr string) (*Client, error) { + cli, err := client.New(x.cfg.ctx, addr, client.Options{ + DialTimeout: x.cfg.dialTimeout, + }) + if err != nil { + return nil, err + } + + err = cli.Init() // magic number is set there based on RPC node answer + if err != nil { + return nil, err + } + + gas, err := cli.GetNativeContractHash(nativenames.Gas) + if err != nil { + return nil, err + } + + c := &Client{ + singleClient: &singleClient{ + logger: x.cfg.logger, + client: cli, + acc: x.account, + gas: gas, + waitInterval: x.cfg.waitInterval, + signer: x.cfg.signer, + notary: x.sharedNotary, + }, + } + + x.clients[addr] = c + + return c, nil +} + +func (x *multiClient) iterateClients(f func(*Client) error) error { + var ( + firstErr error + err error + ) + + for i := range x.endpoints { + select { + case <-x.cfg.ctx.Done(): + return x.cfg.ctx.Err() + default: + } + + x.clientsMtx.Lock() + + c, cached := x.clients[x.endpoints[i]] + if !cached { + c, err = x.createForAddress(x.endpoints[i]) + } + + x.clientsMtx.Unlock() + + if !cached && err != nil { + x.cfg.logger.Error("could not open morph client connection", + zap.String("endpoint", x.endpoints[i]), + zap.String("err", err.Error()), + ) + } else { + err = f(c) + } + + success := err == nil + + if success || firstErr == nil { + firstErr = err + + if success { + break + } + } + } + + return firstErr +} diff --git a/pkg/morph/client/notary.go b/pkg/morph/client/notary.go index db89d6c6..8d01fa41 100644 --- a/pkg/morph/client/notary.go +++ b/pkg/morph/client/notary.go @@ -75,16 +75,25 @@ func (c *Client) EnableNotarySupport(opts ...NotaryOption) error { opt(cfg) } - notaryContract, err := c.client.GetNativeContractHash(nativenames.Notary) - if err != nil { - return fmt.Errorf("can't get notary contract script hash: %w", err) - } - if cfg.proxy.Equals(util.Uint160{}) { return errors.New("proxy contract hash is missing") } - c.notary = ¬ary{ + var ( + notaryContract util.Uint160 + err error + ) + + if err = c.iterateClients(func(c *Client) error { + notaryContract, err = c.client.GetNativeContractHash(nativenames.Notary) + return err + }); err != nil { + return fmt.Errorf("can't get notary contract script hash: %w", err) + } + + c.clientsMtx.Lock() + + c.sharedNotary = ¬ary{ notary: notaryContract, proxy: cfg.proxy, txValidTime: cfg.txValidTime, @@ -93,17 +102,27 @@ func (c *Client) EnableNotarySupport(opts ...NotaryOption) error { alphabetSource: cfg.alphabetSource, } + // update client cache + for _, cached := range c.clients { + cached.notary = c.sharedNotary + } + + c.clientsMtx.Unlock() + return nil } -// NotaryEnabled returns true if notary support was enabled in this instance -// of client by providing notary options on client creation. Otherwise returns false. -func (c *Client) NotaryEnabled() bool { - return c.notary != nil -} - // ProbeNotary checks if native `Notary` contract is presented on chain. -func (c *Client) ProbeNotary() bool { +func (c *Client) ProbeNotary() (res bool) { + if c.multiClient != nil { + _ = c.multiClient.iterateClients(func(c *Client) error { + res = c.ProbeNotary() + return nil + }) + + return + } + _, err := c.client.GetNativeContractHash(nativenames.Notary) return err == nil } @@ -115,7 +134,14 @@ func (c *Client) ProbeNotary() bool { // use this function. // // This function must be invoked with notary enabled otherwise it throws panic. -func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (util.Uint256, error) { +func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uint256, err error) { + if c.multiClient != nil { + return res, c.multiClient.iterateClients(func(c *Client) error { + res, err = c.DepositNotary(amount, delta) + return err + }) + } + if c.notary == nil { panic(notaryNotEnabledPanicMsg) } @@ -150,7 +176,14 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (util.Uint256 // Notary support should be enabled in client to use this function. // // This function must be invoked with notary enabled otherwise it throws panic. -func (c *Client) GetNotaryDeposit() (int64, error) { +func (c *Client) GetNotaryDeposit() (res int64, err error) { + if c.multiClient != nil { + return res, c.multiClient.iterateClients(func(c *Client) error { + res, err = c.GetNotaryDeposit() + return err + }) + } + if c.notary == nil { panic(notaryNotEnabledPanicMsg) } @@ -179,11 +212,17 @@ func (c *Client) GetNotaryDeposit() (int64, error) { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) UpdateNotaryList(list keys.PublicKeys) error { + if c.multiClient != nil { + return c.multiClient.iterateClients(func(c *Client) error { + return c.UpdateNotaryList(list) + }) + } + if c.notary == nil { panic(notaryNotEnabledPanicMsg) } - return c.notaryInvokeAsCommittee(c.designate, + return c.notaryInvokeAsCommittee( setDesignateMethod, noderoles.P2PNotary, list, @@ -196,11 +235,17 @@ func (c *Client) UpdateNotaryList(list keys.PublicKeys) error { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) UpdateNeoFSAlphabetList(list keys.PublicKeys) error { + if c.multiClient != nil { + return c.multiClient.iterateClients(func(c *Client) error { + return c.UpdateNeoFSAlphabetList(list) + }) + } + if c.notary == nil { panic(notaryNotEnabledPanicMsg) } - return c.notaryInvokeAsCommittee(c.designate, + return c.notaryInvokeAsCommittee( setDesignateMethod, noderoles.NeoFSAlphabet, list, @@ -213,6 +258,12 @@ func (c *Client) UpdateNeoFSAlphabetList(list keys.PublicKeys) error { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, method string, args ...interface{}) error { + if c.multiClient != nil { + return c.multiClient.iterateClients(func(c *Client) error { + return c.NotaryInvoke(contract, fee, method, args...) + }) + } + if c.notary == nil { return c.Invoke(contract, fee, method, args...) } @@ -220,8 +271,13 @@ func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, method s return c.notaryInvoke(false, contract, method, args...) } -func (c *Client) notaryInvokeAsCommittee(contract util.Uint160, method string, args ...interface{}) error { - return c.notaryInvoke(true, contract, method, args...) +func (c *Client) notaryInvokeAsCommittee(method string, args ...interface{}) error { + designate, err := c.GetDesignateHash() + if err != nil { + return err + } + + return c.notaryInvoke(true, designate, method, args...) } func (c *Client) notaryInvoke(committee bool, contract util.Uint160, method string, args ...interface{}) error {