[#746] morph: Implement and use multi-client

There is a need to work with a set of Neo RPC nodes in order not to depend
on the failure of some nodes while others are active.

Support "multi-client" mode of morph `Client` entity. If instance is not
"multi-client", it works as before. Constructor `New` creates multi-client,
and each method performs iterating over the fixed set of endpoints until
success. Opened client connections are cached (without eviction for now).

Storage (as earlier) and IR (from now) nodes can be configured with multiple
Neo endpoints. As above, `New` creates multi-client instance, so we don't
need initialization changes on app-side.

`Wait` and `GetDesignateHash` methods of `Client` return an error from now
to detect connection errors. `NotaryEnabled` method is removed as unused.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-08-26 10:59:02 +03:00 committed by Alex Vanin
parent f9218bf84f
commit ad7ad12a0c
7 changed files with 345 additions and 89 deletions

View file

@ -30,22 +30,22 @@ func initMorphComponents(c *cfg) {
addresses[i], addresses[j] = addresses[j], addresses[i]
})
for i := range addresses {
cli, err := client.New(c.key, addresses[i], client.WithDialTimeout(dialTimeout))
if err == nil {
c.log.Info("neo RPC connection established",
zap.String("endpoint", addresses[i]))
cli, err := client.New(c.key, addresses[0],
client.WithDialTimeout(dialTimeout),
client.WithLogger(c.log),
client.WithExtraEndpoints(addresses[1:]),
)
if err == nil {
handler(cli)
handler(cli)
break
}
c.log.Info("failed to establish neo RPC connection, trying another",
zap.String("endpoint", addresses[i]),
zap.String("error", err.Error()))
return
}
c.log.Info("failed to create neo RPC client",
zap.Any("endpoints", addresses),
zap.String("error", err.Error()),
)
fatalOnErr(err)
}

View file

@ -866,13 +866,20 @@ func createListener(ctx context.Context, p *chainParams) (event.Listener, error)
}
func createClient(ctx context.Context, p *chainParams) (*client.Client, error) {
// config name left unchanged for compatibility, may be its better to rename it to "endpoints" or "clients"
endpoints := p.cfg.GetStringSlice(p.name + ".endpoint.client")
if len(endpoints) == 0 {
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
}
return client.New(
p.key,
p.cfg.GetString(p.name+".endpoint.client"),
endpoints[0],
client.WithContext(ctx),
client.WithLogger(p.log),
client.WithDialTimeout(p.cfg.GetDuration(p.name+".dial_timeout")),
client.WithSigner(p.sgn),
client.WithExtraEndpoints(endpoints[1:]),
)
}

View file

@ -6,6 +6,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/native"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
neofscontract "github.com/nspcc-dev/neofs-node/pkg/morph/client/neofs/wrapper"
nmWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
@ -59,6 +60,8 @@ type (
morphClient *client.Client
notaryDisabled bool
designate util.Uint160
}
// Params of the processor constructor.
@ -103,6 +106,12 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err)
}
// result is cached by neo-go, so we can pre-calc it
designate, err := p.MainnetClient.GetDesignateHash()
if err != nil {
return nil, fmt.Errorf("could not get designate hash: %w", err)
}
return &Processor{
log: p.Log,
pool: pool,
@ -115,13 +124,14 @@ func New(p *Params) (*Processor, error) {
mainnetClient: p.MainnetClient,
morphClient: p.MorphClient,
notaryDisabled: p.NotaryDisabled,
designate: designate,
}, nil
}
// ListenerParsers for the 'event.Listener' event producer.
func (gp *Processor) ListenerParsers() []event.ParserInfo {
var pi event.ParserInfo
pi.SetScriptHash(gp.mainnetClient.GetDesignateHash())
pi.SetScriptHash(gp.designate)
pi.SetType(event.TypeFromString(native.DesignationEventName))
pi.SetParser(rolemanagement.ParseDesignate)
return []event.ParserInfo{pi}
@ -130,7 +140,7 @@ func (gp *Processor) ListenerParsers() []event.ParserInfo {
// ListenerHandlers for the 'event.Listener' event producer.
func (gp *Processor) ListenerHandlers() []event.HandlerInfo {
var hi event.HandlerInfo
hi.SetScriptHash(gp.mainnetClient.GetDesignateHash())
hi.SetScriptHash(gp.designate)
hi.SetType(event.TypeFromString(native.DesignationEventName))
hi.SetHandler(gp.HandleAlphabetSync)
return []event.HandlerInfo{hi}

View file

@ -6,6 +6,7 @@ import (
"fmt"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -21,14 +22,25 @@ import (
"go.uber.org/zap"
)
// Client is a neo-go wrapper that provides
// smart-contract invocation interface.
// Client is a wrapper over multiple neo-go clients
// that provides smart-contract invocation interface.
//
// Each operation accesses all nodes in turn until the first success,
// and returns the error of the very first client on failure.
//
// Working client must be created via constructor New.
// Using the Client that has been created with new(Client)
// expression (or just declaring a Client variable) is unsafe
// and can lead to panic.
type Client struct {
// two mutual exclusive modes, exactly one must be non-nil
*singleClient // works with single neo-go client
*multiClient // creates and caches single clients
}
type singleClient struct {
logger *logger.Logger // logging component
client *client.Client // neo-go client
@ -37,8 +49,6 @@ type Client struct {
gas util.Uint160 // native gas script-hash
designate util.Uint160 // native designate script-hash
waitInterval time.Duration
signer *transaction.Signer
@ -72,6 +82,12 @@ var errScriptDecode = errors.New("could not decode invocation script from neo no
// Invoke invokes contract method by sending transaction into blockchain.
// Supported args types: int64, string, util.Uint160, []byte and bool.
func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, args ...interface{}) error {
if c.multiClient != nil {
return c.multiClient.iterateClients(func(c *Client) error {
return c.Invoke(contract, fee, method, args...)
})
}
params := make([]sc.Parameter, 0, len(args))
for i := range args {
@ -130,7 +146,14 @@ func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string,
// TestInvoke invokes contract method locally in neo-go node. This method should
// be used to read data from smart-contract.
func (c *Client) TestInvoke(contract util.Uint160, method string, args ...interface{}) ([]stackitem.Item, error) {
func (c *Client) TestInvoke(contract util.Uint160, method string, args ...interface{}) (res []stackitem.Item, err error) {
if c.multiClient != nil {
return res, c.multiClient.iterateClients(func(c *Client) error {
res, err = c.TestInvoke(contract, method, args...)
return err
})
}
var params = make([]sc.Parameter, 0, len(args))
for i := range args {
@ -163,6 +186,12 @@ func (c *Client) TestInvoke(contract util.Uint160, method string, args ...interf
// TransferGas to the receiver from local wallet
func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error {
if c.multiClient != nil {
return c.multiClient.iterateClients(func(c *Client) error {
return c.TransferGas(receiver, amount)
})
}
txHash, err := c.client.TransferNEP17(c.acc, receiver, c.gas, int64(amount), 0, nil, nil)
if err != nil {
return err
@ -177,7 +206,15 @@ func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error
// Wait function blocks routing execution until there
// are `n` new blocks in the chain.
func (c *Client) Wait(ctx context.Context, n uint32) {
//
// Returns only connection errors.
func (c *Client) Wait(ctx context.Context, n uint32) error {
if c.multiClient != nil {
return c.multiClient.iterateClients(func(c *Client) error {
return c.Wait(ctx, n)
})
}
var (
err error
height, newHeight uint32
@ -187,13 +224,13 @@ func (c *Client) Wait(ctx context.Context, n uint32) {
if err != nil {
c.logger.Error("can't get blockchain height",
zap.String("error", err.Error()))
return
return nil
}
for {
select {
case <-ctx.Done():
return
return nil
default:
}
@ -201,11 +238,11 @@ func (c *Client) Wait(ctx context.Context, n uint32) {
if err != nil {
c.logger.Error("can't get blockchain height",
zap.String("error", err.Error()))
return
return nil
}
if newHeight >= height+n {
return
return nil
}
time.Sleep(c.waitInterval)
@ -213,17 +250,38 @@ func (c *Client) Wait(ctx context.Context, n uint32) {
}
// GasBalance returns GAS amount in the client's wallet.
func (c *Client) GasBalance() (int64, error) {
func (c *Client) GasBalance() (res int64, err error) {
if c.multiClient != nil {
return res, c.multiClient.iterateClients(func(c *Client) error {
res, err = c.GasBalance()
return err
})
}
return c.client.NEP17BalanceOf(c.gas, c.acc.PrivateKey().GetScriptHash())
}
// Committee returns keys of chain committee from neo native contract.
func (c *Client) Committee() (keys.PublicKeys, error) {
func (c *Client) Committee() (res keys.PublicKeys, err error) {
if c.multiClient != nil {
return res, c.multiClient.iterateClients(func(c *Client) error {
res, err = c.Committee()
return err
})
}
return c.client.GetCommittee()
}
// TxHalt returns true if transaction has been successfully executed and persisted.
func (c *Client) TxHalt(h util.Uint256) (bool, error) {
func (c *Client) TxHalt(h util.Uint256) (res bool, err error) {
if c.multiClient != nil {
return res, c.multiClient.iterateClients(func(c *Client) error {
res, err = c.TxHalt(h)
return err
})
}
trig := trigger.Application
aer, err := c.client.GetApplicationLog(h, &trig)
if err != nil {
@ -235,7 +293,14 @@ func (c *Client) TxHalt(h util.Uint256) (bool, error) {
// NeoFSAlphabetList returns keys that stored in NeoFS Alphabet role. Main chain
// stores alphabet node keys of inner ring there, however side chain stores both
// alphabet and non alphabet node keys of inner ring.
func (c *Client) NeoFSAlphabetList() (keys.PublicKeys, error) {
func (c *Client) NeoFSAlphabetList() (res keys.PublicKeys, err error) {
if c.multiClient != nil {
return res, c.multiClient.iterateClients(func(c *Client) error {
res, err = c.NeoFSAlphabetList()
return err
})
}
list, err := c.roleList(noderoles.NeoFSAlphabet)
if err != nil {
return nil, fmt.Errorf("can't get alphabet nodes role list: %w", err)
@ -245,8 +310,15 @@ func (c *Client) NeoFSAlphabetList() (keys.PublicKeys, error) {
}
// GetDesignateHash returns hash of the native `RoleManagement` contract.
func (c *Client) GetDesignateHash() util.Uint160 {
return c.designate
func (c *Client) GetDesignateHash() (res util.Uint160, err error) {
if c.multiClient != nil {
return res, c.multiClient.iterateClients(func(c *Client) error {
res, err = c.GetDesignateHash()
return err
})
}
return c.client.GetNativeContractHash(nativenames.Designation)
}
func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
@ -316,12 +388,35 @@ func toStackParameter(value interface{}) (sc.Parameter, error) {
// MagicNumber returns the magic number of the network
// to which the underlying RPC node client is connected.
func (c *Client) MagicNumber() uint64 {
//
// Returns 0 in case of connection problems.
func (c *Client) MagicNumber() (res uint64) {
if c.multiClient != nil {
err := c.multiClient.iterateClients(func(c *Client) error {
res = c.MagicNumber()
return nil
})
if err != nil {
c.logger.Debug("iterate over client failure",
zap.String("error", err.Error()),
)
}
return
}
return uint64(c.client.GetNetwork())
}
// BlockCount returns block count of the network
// to which the underlying RPC node client is connected.
func (c *Client) BlockCount() (uint32, error) {
func (c *Client) BlockCount() (res uint32, err error) {
if c.multiClient != nil {
return res, c.multiClient.iterateClients(func(c *Client) error {
res, err = c.BlockCount()
return err
})
}
return c.client.GetBlockCount()
}

View file

@ -4,10 +4,8 @@ import (
"context"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
@ -30,6 +28,8 @@ type cfg struct {
waitInterval time.Duration
signer *transaction.Signer
extraEndpoints []string
}
const (
@ -69,8 +69,6 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error)
panic("empty private key")
}
account := wallet.NewAccountFromPrivateKey(key)
// build default configuration
cfg := defaultConfig()
@ -79,39 +77,16 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error)
opt(cfg)
}
cli, err := client.New(cfg.ctx, endpoint, client.Options{
DialTimeout: cfg.dialTimeout,
})
if err != nil {
return nil, err
}
endpoints := append(cfg.extraEndpoints, endpoint)
err = cli.Init() // magic number is set there based on RPC node answer
if err != nil {
return nil, err
}
gas, err := cli.GetNativeContractHash(nativenames.Gas)
if err != nil {
return nil, err
}
designate, err := cli.GetNativeContractHash(nativenames.Designation)
if err != nil {
return nil, err
}
c := &Client{
logger: cfg.logger,
client: cli,
acc: account,
gas: gas,
designate: designate,
waitInterval: cfg.waitInterval,
signer: cfg.signer,
}
return c, nil
return &Client{
multiClient: &multiClient{
cfg: *cfg,
account: wallet.NewAccountFromPrivateKey(key),
endpoints: endpoints,
clients: make(map[string]*Client, len(endpoints)),
},
}, nil
}
// WithContext returns a client constructor option that
@ -169,3 +144,11 @@ func WithSigner(signer *transaction.Signer) Option {
}
}
}
// WithExtraEndpoints returns a client constructor option
// that specifies additional Neo rpc endpoints.
func WithExtraEndpoints(endpoints []string) Option {
return func(c *cfg) {
c.extraEndpoints = append(c.extraEndpoints, endpoints...)
}
}

105
pkg/morph/client/multi.go Normal file
View file

@ -0,0 +1,105 @@
package client
import (
"sync"
"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/zap"
)
type multiClient struct {
cfg cfg
account *wallet.Account
sharedNotary *notary // notary config needed for single client construction
endpoints []string
clientsMtx sync.Mutex
clients map[string]*Client
}
// createForAddress creates single Client instance using provided endpoint.
//
// note: must be wrapped into mutex lock.
func (x *multiClient) createForAddress(addr string) (*Client, error) {
cli, err := client.New(x.cfg.ctx, addr, client.Options{
DialTimeout: x.cfg.dialTimeout,
})
if err != nil {
return nil, err
}
err = cli.Init() // magic number is set there based on RPC node answer
if err != nil {
return nil, err
}
gas, err := cli.GetNativeContractHash(nativenames.Gas)
if err != nil {
return nil, err
}
c := &Client{
singleClient: &singleClient{
logger: x.cfg.logger,
client: cli,
acc: x.account,
gas: gas,
waitInterval: x.cfg.waitInterval,
signer: x.cfg.signer,
notary: x.sharedNotary,
},
}
x.clients[addr] = c
return c, nil
}
func (x *multiClient) iterateClients(f func(*Client) error) error {
var (
firstErr error
err error
)
for i := range x.endpoints {
select {
case <-x.cfg.ctx.Done():
return x.cfg.ctx.Err()
default:
}
x.clientsMtx.Lock()
c, cached := x.clients[x.endpoints[i]]
if !cached {
c, err = x.createForAddress(x.endpoints[i])
}
x.clientsMtx.Unlock()
if !cached && err != nil {
x.cfg.logger.Error("could not open morph client connection",
zap.String("endpoint", x.endpoints[i]),
zap.String("err", err.Error()),
)
} else {
err = f(c)
}
success := err == nil
if success || firstErr == nil {
firstErr = err
if success {
break
}
}
}
return firstErr
}

View file

@ -75,16 +75,25 @@ func (c *Client) EnableNotarySupport(opts ...NotaryOption) error {
opt(cfg)
}
notaryContract, err := c.client.GetNativeContractHash(nativenames.Notary)
if err != nil {
return fmt.Errorf("can't get notary contract script hash: %w", err)
}
if cfg.proxy.Equals(util.Uint160{}) {
return errors.New("proxy contract hash is missing")
}
c.notary = &notary{
var (
notaryContract util.Uint160
err error
)
if err = c.iterateClients(func(c *Client) error {
notaryContract, err = c.client.GetNativeContractHash(nativenames.Notary)
return err
}); err != nil {
return fmt.Errorf("can't get notary contract script hash: %w", err)
}
c.clientsMtx.Lock()
c.sharedNotary = &notary{
notary: notaryContract,
proxy: cfg.proxy,
txValidTime: cfg.txValidTime,
@ -93,17 +102,27 @@ func (c *Client) EnableNotarySupport(opts ...NotaryOption) error {
alphabetSource: cfg.alphabetSource,
}
// update client cache
for _, cached := range c.clients {
cached.notary = c.sharedNotary
}
c.clientsMtx.Unlock()
return nil
}
// NotaryEnabled returns true if notary support was enabled in this instance
// of client by providing notary options on client creation. Otherwise returns false.
func (c *Client) NotaryEnabled() bool {
return c.notary != nil
}
// ProbeNotary checks if native `Notary` contract is presented on chain.
func (c *Client) ProbeNotary() bool {
func (c *Client) ProbeNotary() (res bool) {
if c.multiClient != nil {
_ = c.multiClient.iterateClients(func(c *Client) error {
res = c.ProbeNotary()
return nil
})
return
}
_, err := c.client.GetNativeContractHash(nativenames.Notary)
return err == nil
}
@ -115,7 +134,14 @@ func (c *Client) ProbeNotary() bool {
// use this function.
//
// This function must be invoked with notary enabled otherwise it throws panic.
func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (util.Uint256, error) {
func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uint256, err error) {
if c.multiClient != nil {
return res, c.multiClient.iterateClients(func(c *Client) error {
res, err = c.DepositNotary(amount, delta)
return err
})
}
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
@ -150,7 +176,14 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (util.Uint256
// Notary support should be enabled in client to use this function.
//
// This function must be invoked with notary enabled otherwise it throws panic.
func (c *Client) GetNotaryDeposit() (int64, error) {
func (c *Client) GetNotaryDeposit() (res int64, err error) {
if c.multiClient != nil {
return res, c.multiClient.iterateClients(func(c *Client) error {
res, err = c.GetNotaryDeposit()
return err
})
}
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
@ -179,11 +212,17 @@ func (c *Client) GetNotaryDeposit() (int64, error) {
//
// This function must be invoked with notary enabled otherwise it throws panic.
func (c *Client) UpdateNotaryList(list keys.PublicKeys) error {
if c.multiClient != nil {
return c.multiClient.iterateClients(func(c *Client) error {
return c.UpdateNotaryList(list)
})
}
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
return c.notaryInvokeAsCommittee(c.designate,
return c.notaryInvokeAsCommittee(
setDesignateMethod,
noderoles.P2PNotary,
list,
@ -196,11 +235,17 @@ func (c *Client) UpdateNotaryList(list keys.PublicKeys) error {
//
// This function must be invoked with notary enabled otherwise it throws panic.
func (c *Client) UpdateNeoFSAlphabetList(list keys.PublicKeys) error {
if c.multiClient != nil {
return c.multiClient.iterateClients(func(c *Client) error {
return c.UpdateNeoFSAlphabetList(list)
})
}
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
return c.notaryInvokeAsCommittee(c.designate,
return c.notaryInvokeAsCommittee(
setDesignateMethod,
noderoles.NeoFSAlphabet,
list,
@ -213,6 +258,12 @@ func (c *Client) UpdateNeoFSAlphabetList(list keys.PublicKeys) error {
//
// This function must be invoked with notary enabled otherwise it throws panic.
func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, method string, args ...interface{}) error {
if c.multiClient != nil {
return c.multiClient.iterateClients(func(c *Client) error {
return c.NotaryInvoke(contract, fee, method, args...)
})
}
if c.notary == nil {
return c.Invoke(contract, fee, method, args...)
}
@ -220,8 +271,13 @@ func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, method s
return c.notaryInvoke(false, contract, method, args...)
}
func (c *Client) notaryInvokeAsCommittee(contract util.Uint160, method string, args ...interface{}) error {
return c.notaryInvoke(true, contract, method, args...)
func (c *Client) notaryInvokeAsCommittee(method string, args ...interface{}) error {
designate, err := c.GetDesignateHash()
if err != nil {
return err
}
return c.notaryInvoke(true, designate, method, args...)
}
func (c *Client) notaryInvoke(committee bool, contract util.Uint160, method string, args ...interface{}) error {