From 402f488bec78b81199dcdafb7fbf99ecd4a56535 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 9 Feb 2021 20:52:10 +0300 Subject: [PATCH] [#1170] pkg/morph: Change HTTP for WS client Updated client now supports subscription to chain notifications and RPC switch between provided RPC endpoints. Signed-off-by: Pavel Karpy --- cmd/neofs-node/morph.go | 2 +- pkg/morph/client/client.go | 236 +++++++++++++++------------ pkg/morph/client/constructor.go | 93 +++++++---- pkg/morph/client/multi.go | 226 ++++++++++++++----------- pkg/morph/client/nns.go | 37 ++--- pkg/morph/client/notary.go | 131 +++++++-------- pkg/morph/client/notifications.go | 263 ++++++++++++++++++++++++++++++ 7 files changed, 661 insertions(+), 327 deletions(-) create mode 100644 pkg/morph/client/notifications.go diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index a670ab05d..25d6e7063 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -61,7 +61,7 @@ func initMorphComponents(c *cfg) { if err := cli.SetGroupSignerScope(); err != nil { c.log.Info("failed to set group signer scope, continue with Global", zap.Error(err)) } - + c.cfgMorph.client = cli c.cfgMorph.notaryEnabled = cli.ProbeNotary() diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index ebc756f25..54fa7f6f3 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -24,11 +24,15 @@ import ( "go.uber.org/zap" ) -// Client is a wrapper over multiple neo-go clients -// that provides smart-contract invocation interface. +// Client is a wrapper over web socket neo-go client +// that provides smart-contract invocation interface +// and notification subscription functionality. // -// Each operation accesses all nodes in turn until the first success, -// and returns the error of the very first client on failure. +// On connection lost tries establishing new connection +// to the next RPC (if any). If no RPC node available, +// switches to inactive mode: any RPC call leads to immediate +// return with ErrConnectionLost error, notification channel +// returned from Client.NotificationChannel is closed. // // Working client must be created via constructor New. // Using the Client that has been created with new(Client) @@ -37,48 +41,55 @@ import ( type Client struct { cache - // two mutual exclusive modes, exactly one must be non-nil - *singleClient // works with single neo-go client - - *multiClient // creates and caches single clients -} - -type cache struct { - // mtx protects primitive values. - mtx sync.RWMutex - nnsHash util.Uint160 - groupKey *keys.PublicKey - // txHeights is a thread-safe LRU cache for transaction heights. - txHeights *lru.Cache -} - -type singleClient struct { logger *logger.Logger // logging component - client *client.Client // neo-go client + client *client.WSClient // neo-go websocket client acc *wallet.Account // neo account - waitInterval time.Duration - signer *transaction.Signer notary *notary + + cfg cfg + + endpoints *endpoints + + // switching between rpc endpoint lock + switchLock *sync.RWMutex + + // channel for ws notifications + notifications chan client.Notification + + // channel for internal stop + closeChan chan struct{} + + // cached subscription information + subscribedEvents map[util.Uint160]string + subscribedNotaryEvents map[util.Uint160]string + subscribedToNewBlocks bool + + // indicates that Client is not able to + // establish connection to any of the + // provided RPC endpoints + inactive bool } -func blankSingleClient(cli *client.Client, w *wallet.Account, cfg *cfg) *singleClient { - return &singleClient{ - logger: cfg.logger, - client: cli, - acc: w, - waitInterval: cfg.waitInterval, - signer: cfg.signer, - } +type cache struct { + nnsHash util.Uint160 + groupKey *keys.PublicKey + txHeights *lru.Cache } -// ErrNilClient is returned by functions that expect -// a non-nil Client pointer, but received nil. -var ErrNilClient = errors.New("client is nil") +var ( + // ErrNilClient is returned by functions that expect + // a non-nil Client pointer, but received nil. + ErrNilClient = errors.New("client is nil") + + // ErrConnectionLost is returned when client lost web socket connection + // to the RPC node and has not been able to establish a new one since. + ErrConnectionLost = errors.New("connection to the RPC node has been lost") +) // HaltState returned if TestInvoke function processed without panic. const HaltState = "HALT" @@ -123,10 +134,11 @@ func unwrapNeoFSError(err error) error { // Invoke invokes contract method by sending transaction into blockchain. // Supported args types: int64, string, util.Uint160, []byte and bool. func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, args ...interface{}) error { - if c.multiClient != nil { - return c.multiClient.iterateClients(func(c *Client) error { - return c.Invoke(contract, fee, method, args...) - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return ErrConnectionLost } params := make([]sc.Parameter, 0, len(args)) @@ -188,11 +200,11 @@ func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, // TestInvoke invokes contract method locally in neo-go node. This method should // be used to read data from smart-contract. func (c *Client) TestInvoke(contract util.Uint160, method string, args ...interface{}) (res []stackitem.Item, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.TestInvoke(contract, method, args...) - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return nil, ErrConnectionLost } var params = make([]sc.Parameter, 0, len(args)) @@ -227,10 +239,11 @@ func (c *Client) TestInvoke(contract util.Uint160, method string, args ...interf // TransferGas to the receiver from local wallet func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error { - if c.multiClient != nil { - return c.multiClient.iterateClients(func(c *Client) error { - return c.TransferGas(receiver, amount) - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return ErrConnectionLost } gas, err := c.client.GetNativeContractHash(nativenames.Gas) @@ -255,10 +268,11 @@ func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error // // Returns only connection errors. func (c *Client) Wait(ctx context.Context, n uint32) error { - if c.multiClient != nil { - return c.multiClient.iterateClients(func(c *Client) error { - return c.Wait(ctx, n) - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return ErrConnectionLost } var ( @@ -291,17 +305,17 @@ func (c *Client) Wait(ctx context.Context, n uint32) error { return nil } - time.Sleep(c.waitInterval) + time.Sleep(c.cfg.waitInterval) } } // GasBalance returns GAS amount in the client's wallet. func (c *Client) GasBalance() (res int64, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.GasBalance() - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return 0, ErrConnectionLost } gas, err := c.client.GetNativeContractHash(nativenames.Gas) @@ -314,11 +328,11 @@ func (c *Client) GasBalance() (res int64, err error) { // Committee returns keys of chain committee from neo native contract. func (c *Client) Committee() (res keys.PublicKeys, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.Committee() - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return nil, ErrConnectionLost } return c.client.GetCommittee() @@ -326,11 +340,11 @@ func (c *Client) Committee() (res keys.PublicKeys, err error) { // TxHalt returns true if transaction has been successfully executed and persisted. func (c *Client) TxHalt(h util.Uint256) (res bool, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.TxHalt(h) - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return false, ErrConnectionLost } trig := trigger.Application @@ -343,11 +357,11 @@ func (c *Client) TxHalt(h util.Uint256) (res bool, err error) { // TxHeight returns true if transaction has been successfully executed and persisted. func (c *Client) TxHeight(h util.Uint256) (res uint32, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.TxHeight(h) - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return 0, ErrConnectionLost } return c.client.GetTransactionHeight(h) @@ -357,11 +371,11 @@ func (c *Client) TxHeight(h util.Uint256) (res uint32, err error) { // stores alphabet node keys of inner ring there, however side chain stores both // alphabet and non alphabet node keys of inner ring. func (c *Client) NeoFSAlphabetList() (res keys.PublicKeys, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.NeoFSAlphabetList() - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return nil, ErrConnectionLost } list, err := c.roleList(noderoles.NeoFSAlphabet) @@ -374,11 +388,11 @@ func (c *Client) NeoFSAlphabetList() (res keys.PublicKeys, err error) { // GetDesignateHash returns hash of the native `RoleManagement` contract. func (c *Client) GetDesignateHash() (res util.Uint160, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.GetDesignateHash() - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return util.Uint160{}, ErrConnectionLost } return c.client.GetNativeContractHash(nativenames.Designation) @@ -457,11 +471,11 @@ func toStackParameter(value interface{}) (sc.Parameter, error) { // // Returns 0 in case of connection problems. func (c *Client) MagicNumber() (res uint64, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.MagicNumber() - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return 0, ErrConnectionLost } return uint64(c.client.GetNetwork()), nil @@ -470,11 +484,11 @@ func (c *Client) MagicNumber() (res uint64, err error) { // BlockCount returns block count of the network // to which the underlying RPC node client is connected. func (c *Client) BlockCount() (res uint32, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.BlockCount() - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return 0, ErrConnectionLost } return c.client.GetBlockCount() @@ -482,11 +496,11 @@ func (c *Client) BlockCount() (res uint32, err error) { // MsPerBlock returns MillisecondsPerBlock network parameter. func (c *Client) MsPerBlock() (res int64, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.MsPerBlock() - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return 0, ErrConnectionLost } v, err := c.client.GetVersion() @@ -499,11 +513,11 @@ func (c *Client) MsPerBlock() (res int64, err error) { // IsValidScript returns true if invocation script executes with HALT state. func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res bool, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.IsValidScript(script, signers) - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return false, ErrConnectionLost } result, err := c.client.InvokeScript(script, signers) @@ -513,3 +527,21 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res return result.State == vm.HaltState.String(), nil } + +// NotificationChannel returns channel than receives subscribed +// notification from the connected RPC node. +// Channel is closed when connection to the RPC node has been +// lost without the possibility of recovery. +func (c *Client) NotificationChannel() <-chan client.Notification { + return c.notifications +} + +// inactiveMode switches Client to an inactive mode: +// - notification channel is closed; +// - all the new RPC request would return ErrConnectionLost. +// +// Note: must be called only with held write switchLock. +func (c *Client) inactiveMode() { + close(c.notifications) + c.inactive = true +} diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index 8d54d5ef4..cddfebe2c 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -2,12 +2,15 @@ package client import ( "context" + "fmt" + "sync" "time" lru "github.com/hashicorp/golang-lru" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/rpc/client" + "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" @@ -30,15 +33,12 @@ type cfg struct { extraEndpoints []string - maxConnPerHost int - - singleCli *client.Client // neo-go client for single client mode + singleCli *client.WSClient // neo-go client for single client mode } const ( - defaultDialTimeout = 5 * time.Second - defaultWaitInterval = 500 * time.Millisecond - defaultMaxConnPerHost = 10 + defaultDialTimeout = 5 * time.Second + defaultWaitInterval = 500 * time.Millisecond ) func defaultConfig() *cfg { @@ -50,7 +50,6 @@ func defaultConfig() *cfg { signer: &transaction.Signer{ Scopes: transaction.Global, }, - maxConnPerHost: defaultMaxConnPerHost, } } @@ -64,6 +63,8 @@ func defaultConfig() *cfg { // * client context: Background; // * dial timeout: 5s; // * blockchain network type: netmode.PrivNet; +// * signer with the global scope; +// * wait interval: 500ms; // * logger: zap.L(). // // If desired option satisfies the default value, it can be omitted. @@ -82,27 +83,58 @@ func New(key *keys.PrivateKey, endpoint string, opts ...Option) (*Client, error) opt(cfg) } - if cfg.singleCli != nil { - return &Client{ - cache: newClientCache(), - singleClient: blankSingleClient(cfg.singleCli, wallet.NewAccountFromPrivateKey(key), cfg), - }, nil + cli := &Client{ + cache: initClientCache(), + logger: cfg.logger, + acc: wallet.NewAccountFromPrivateKey(key), + signer: cfg.signer, + cfg: *cfg, + switchLock: &sync.RWMutex{}, + notifications: make(chan client.Notification), + subscribedEvents: make(map[util.Uint160]string), + subscribedNotaryEvents: make(map[util.Uint160]string), + closeChan: make(chan struct{}), } - endpoints := append(cfg.extraEndpoints, endpoint) + if cfg.singleCli != nil { + // return client in single RPC node mode that uses + // predefined WS client + // + // in case of the closing web socket connection: + // if extra endpoints were provided via options, + // they will be used in switch process, otherwise + // inactive mode will be enabled + cli.client = cfg.singleCli + cli.endpoints = newEndpoints(cfg.extraEndpoints) + } else { + ws, err := newWSClient(*cfg, endpoint) + if err != nil { + return nil, fmt.Errorf("could not create morph client: %w", err) + } - return &Client{ - cache: newClientCache(), - multiClient: &multiClient{ - cfg: *cfg, - account: wallet.NewAccountFromPrivateKey(key), - endpoints: endpoints, - clients: make(map[string]*Client, len(endpoints)), - }, - }, nil + err = ws.Init() + if err != nil { + return nil, fmt.Errorf("could not init morph client: %w", err) + } + + cli.client = ws + cli.endpoints = newEndpoints(append([]string{endpoint}, cfg.extraEndpoints...)) + } + + go cli.notificationLoop() + + return cli, nil } -func newClientCache() cache { +func newWSClient(cfg cfg, endpoint string) (*client.WSClient, error) { + return client.NewWS( + cfg.ctx, + endpoint, + client.Options{DialTimeout: cfg.dialTimeout}, + ) +} + +func initClientCache() cache { c, _ := lru.New(100) // returns error only if size is negative return cache{ txHeights: c, @@ -169,29 +201,18 @@ func WithSigner(signer *transaction.Signer) Option { // WithExtraEndpoints returns a client constructor option // that specifies additional Neo rpc endpoints. -// -// Has no effect if WithSingleClient is provided. func WithExtraEndpoints(endpoints []string) Option { return func(c *cfg) { c.extraEndpoints = append(c.extraEndpoints, endpoints...) } } -// WithMaxConnectionPerHost returns a client constructor -// option that specifies Neo client's maximum opened -// connection per one host. -func WithMaxConnectionPerHost(m int) Option { - return func(c *cfg) { - c.maxConnPerHost = m - } -} - // WithSingleClient returns a client constructor option // that specifies single neo-go client and forces Client -// to use it and only it for requests. +// to use it for requests. // // Passed client must already be initialized. -func WithSingleClient(cli *client.Client) Option { +func WithSingleClient(cli *client.WSClient) Option { return func(c *cfg) { c.singleCli = cli } diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index 9ec7b4593..00620d2d3 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -1,117 +1,157 @@ package client import ( - "sync" - - "github.com/nspcc-dev/neo-go/pkg/rpc/client" - "github.com/nspcc-dev/neo-go/pkg/wallet" "go.uber.org/zap" ) -type multiClient struct { - cfg cfg - - account *wallet.Account - - sharedNotary *notary // notary config needed for single client construction - - endpoints []string - clientsMtx sync.RWMutex - // lastSuccess is an index in endpoints array relating to a last - // used endpoint. - lastSuccess int - clients map[string]*Client +type endpoints struct { + curr int + list []string } -// createForAddress creates single Client instance using provided endpoint. -func (x *multiClient) createForAddress(addr string) (*Client, error) { - cli, err := client.New(x.cfg.ctx, addr, client.Options{ - DialTimeout: x.cfg.dialTimeout, - MaxConnsPerHost: x.cfg.maxConnPerHost, - }) - if err != nil { - return nil, err +func newEndpoints(ee []string) *endpoints { + return &endpoints{ + curr: 0, + list: ee, } - - err = cli.Init() // magic number is set there based on RPC node answer - if err != nil { - return nil, err - } - - var c *Client - - x.clientsMtx.Lock() - // While creating 2 clients in parallel is ok, we don't want to - // use a client missing from `x.clients` map as it can lead - // to unexpected bugs. - if x.clients[addr] == nil { - sCli := blankSingleClient(cli, x.account, &x.cfg) - sCli.notary = x.sharedNotary - - c = &Client{ - cache: newClientCache(), - singleClient: sCli, - } - x.clients[addr] = c - } else { - c = x.clients[addr] - } - x.clientsMtx.Unlock() - - return c, nil } -// iterateClients executes f on each client until nil error is returned. -// When nil error is returned, lastSuccess field is updated. -// The iteration order is non-deterministic and shouldn't be relied upon. -func (x *multiClient) iterateClients(f func(*Client) error) error { - var ( - firstErr error - err error - ) +// next returns the next endpoint and its index +// to try to connect to. +// Returns -1 index if there is no known RPC endpoints. +func (e *endpoints) next() (string, int) { + if len(e.list) == 0 { + return "", -1 + } - x.clientsMtx.RLock() - start := x.lastSuccess - x.clientsMtx.RUnlock() + next := e.curr + 1 + if next == len(e.list) { + next = 0 + } - for i := 0; i < len(x.endpoints); i++ { - index := (start + i) % len(x.endpoints) + e.curr = next - x.clientsMtx.RLock() - c, cached := x.clients[x.endpoints[index]] - x.clientsMtx.RUnlock() - if !cached { - c, err = x.createForAddress(x.endpoints[index]) + return e.list[next], next +} + +// current returns an endpoint and its index the Client +// is connected to. +// Returns -1 index if there is no known RPC endpoints +func (e *endpoints) current() (string, int) { + if len(e.list) == 0 { + return "", -1 + } + + return e.list[e.curr], e.curr +} + +func (c *Client) switchRPC() bool { + c.switchLock.Lock() + defer c.switchLock.Unlock() + + c.client.Close() + + _, currEndpointIndex := c.endpoints.current() + if currEndpointIndex == -1 { + // there are no known RPC endpoints to try + // to connect to => do not switch + return false + } + + for { + newEndpoint, index := c.endpoints.next() + if index == currEndpointIndex { + // all the endpoint have been tried + // for connection unsuccessfully + return false } - if !cached && err != nil { - x.cfg.logger.Error("could not open morph client connection", - zap.String("endpoint", x.endpoints[index]), - zap.String("err", err.Error()), + cli, err := newWSClient(c.cfg, newEndpoint) + if err != nil { + c.logger.Warn("could not establish connection to the switched RPC node", + zap.String("endpoint", newEndpoint), + zap.Error(err), ) - } else { - err = f(c) + + continue } - if err == nil { - if i != 0 { - x.clientsMtx.Lock() - x.lastSuccess = index - x.clientsMtx.Unlock() + err = cli.Init() + if err != nil { + cli.Close() + c.logger.Warn("could not init the switched RPC node", + zap.String("endpoint", newEndpoint), + zap.Error(err), + ) + + continue + } + + c.client = cli + + return true + } +} + +func (c *Client) notificationLoop() { + for { + select { + case <-c.cfg.ctx.Done(): + _ = c.UnsubscribeAll() + c.close() + + return + case <-c.closeChan: + _ = c.UnsubscribeAll() + c.close() + + return + case n, ok := <-c.client.Notifications: + // notification channel is used as a connection + // state: if it is closed, the connection is + // considered to be lost + if !ok { + c.logger.Warn("switching to the next RPC node") + + if !c.switchRPC() { + c.logger.Error("could not establish connection to any RPC node") + + // could not connect to all endpoints => + // switch client to inactive mode + c.inactiveMode() + + return + } + + newEndpoint, _ := c.endpoints.current() + + c.logger.Warn("connection to the new RPC node has been established", + zap.String("endpoint", newEndpoint), + ) + + if !c.restoreSubscriptions() { + // new WS client does not allow + // restoring subscription, client + // could not work correctly => + // closing connection to RPC node + // to switch to another one + c.client.Close() + } + + // TODO(@carpawell): call here some callback retrieved in constructor + // of the client to allow checking chain state since during switch + // process some notification could be lost + + continue } - return nil - } - // we dont need to continue the process after the logical error was encountered - if errNeoFS := unwrapNeoFSError(err); errNeoFS != nil { - return errNeoFS - } - - // set first error once - if firstErr == nil { - firstErr = err + c.notifications <- n } } - - return firstErr +} + +// close closes notification channel and wrapped WS client +func (c *Client) close() { + close(c.notifications) + c.client.Close() } diff --git a/pkg/morph/client/nns.go b/pkg/morph/client/nns.go index 53a765601..f85f99a10 100644 --- a/pkg/morph/client/nns.go +++ b/pkg/morph/client/nns.go @@ -55,11 +55,11 @@ func NNSAlphabetContractName(index int) string { // in NNS contract. // If script hash has not been found, returns ErrNNSRecordNotFound. func (c *Client) NNSContractAddress(name string) (sh util.Uint160, err error) { - if c.multiClient != nil { - return sh, c.multiClient.iterateClients(func(c *Client) error { - sh, err = c.NNSContractAddress(name) - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return util.Uint160{}, ErrConnectionLost } nnsHash, err := c.NNSHash() @@ -76,13 +76,11 @@ func (c *Client) NNSContractAddress(name string) (sh util.Uint160, err error) { // NNSHash returns NNS contract hash. func (c *Client) NNSHash() (util.Uint160, error) { - if c.multiClient != nil { - var sh util.Uint160 - return sh, c.multiClient.iterateClients(func(c *Client) error { - var err error - sh, err = c.NNSHash() - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return util.Uint160{}, ErrConnectionLost } c.mtx.RLock() @@ -101,7 +99,7 @@ func (c *Client) NNSHash() (util.Uint160, error) { return nnsHash, nil } -func nnsResolveItem(c *client.Client, nnsHash util.Uint160, domain string) (stackitem.Item, error) { +func nnsResolveItem(c *client.WSClient, nnsHash util.Uint160, domain string) (stackitem.Item, error) { found, err := exists(c, nnsHash, domain) if err != nil { return nil, fmt.Errorf("could not check presence in NNS contract for %s: %w", domain, err) @@ -133,7 +131,7 @@ func nnsResolveItem(c *client.Client, nnsHash util.Uint160, domain string) (stac return result.Stack[0], nil } -func nnsResolve(c *client.Client, nnsHash util.Uint160, domain string) (util.Uint160, error) { +func nnsResolve(c *client.WSClient, nnsHash util.Uint160, domain string) (util.Uint160, error) { res, err := nnsResolveItem(c, nnsHash, domain) if err != nil { return util.Uint160{}, err @@ -155,7 +153,7 @@ func nnsResolve(c *client.Client, nnsHash util.Uint160, domain string) (util.Uin return util.Uint160DecodeStringLE(string(bs)) } -func exists(c *client.Client, nnsHash util.Uint160, domain string) (bool, error) { +func exists(c *client.WSClient, nnsHash util.Uint160, domain string) (bool, error) { result, err := c.InvokeFunction(nnsHash, "isAvailable", []smartcontract.Parameter{ { Type: smartcontract.StringType, @@ -185,10 +183,11 @@ func exists(c *client.Client, nnsHash util.Uint160, domain string) (bool, error) // SetGroupSignerScope makes the default signer scope include all NeoFS contracts. // Should be called for side-chain client only. func (c *Client) SetGroupSignerScope() error { - if c.multiClient != nil { - return c.multiClient.iterateClients(func(c *Client) error { - return wrapNeoFSError(c.SetGroupSignerScope()) - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return ErrConnectionLost } pub, err := c.contractGroupKey() diff --git a/pkg/morph/client/notary.go b/pkg/morph/client/notary.go index 5be1e435d..14b881fb2 100644 --- a/pkg/morph/client/notary.go +++ b/pkg/morph/client/notary.go @@ -73,6 +73,13 @@ func defaultNotaryConfig(c *Client) *notaryCfg { // ability for client to get alphabet keys from committee or provided source // and use proxy contract script hash to create tx for notary contract. func (c *Client) EnableNotarySupport(opts ...NotaryOption) error { + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return ErrConnectionLost + } + cfg := defaultNotaryConfig(c) for _, opt := range opts { @@ -98,56 +105,23 @@ func (c *Client) EnableNotarySupport(opts ...NotaryOption) error { var err error - getNotaryHashFunc := func(c *Client) error { - notaryCfg.notary, err = c.client.GetNativeContractHash(nativenames.Notary) - if err != nil { - return fmt.Errorf("can't get notary contract script hash: %w", err) - } - - return nil + notaryCfg.notary, err = c.client.GetNativeContractHash(nativenames.Notary) + if err != nil { + return fmt.Errorf("can't get notary contract script hash: %w", err) } - if c.multiClient == nil { - // single client case - err = getNotaryHashFunc(c) - if err != nil { - return err - } - - c.notary = notaryCfg - - return nil - } - - // multi client case - - if err = c.iterateClients(getNotaryHashFunc); err != nil { - return err - } - - c.clientsMtx.Lock() - - c.sharedNotary = notaryCfg - - // update client cache - for _, cached := range c.clients { - cached.notary = c.sharedNotary - } - - c.clientsMtx.Unlock() + c.notary = notaryCfg return nil } // ProbeNotary checks if native `Notary` contract is presented on chain. func (c *Client) ProbeNotary() (res bool) { - if c.multiClient != nil { - _ = c.multiClient.iterateClients(func(c *Client) error { - res = c.ProbeNotary() - return nil - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() - return + if c.inactive { + return false } _, err := c.client.GetNativeContractHash(nativenames.Notary) @@ -162,11 +136,11 @@ func (c *Client) ProbeNotary() (res bool) { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uint256, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.DepositNotary(amount, delta) - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return util.Uint256{}, ErrConnectionLost } if c.notary == nil { @@ -219,11 +193,11 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uin // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) GetNotaryDeposit() (res int64, err error) { - if c.multiClient != nil { - return res, c.multiClient.iterateClients(func(c *Client) error { - res, err = c.GetNotaryDeposit() - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return 0, ErrConnectionLost } if c.notary == nil { @@ -271,10 +245,11 @@ func (u *UpdateNotaryListPrm) SetHash(hash util.Uint256) { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) UpdateNotaryList(prm UpdateNotaryListPrm) error { - if c.multiClient != nil { - return c.multiClient.iterateClients(func(c *Client) error { - return c.UpdateNotaryList(prm) - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return ErrConnectionLost } if c.notary == nil { @@ -318,10 +293,11 @@ func (u *UpdateAlphabetListPrm) SetHash(hash util.Uint256) { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) UpdateNeoFSAlphabetList(prm UpdateAlphabetListPrm) error { - if c.multiClient != nil { - return c.multiClient.iterateClients(func(c *Client) error { - return c.UpdateNeoFSAlphabetList(prm) - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return ErrConnectionLost } if c.notary == nil { @@ -348,10 +324,11 @@ func (c *Client) UpdateNeoFSAlphabetList(prm UpdateAlphabetListPrm) error { // // `nonce` and `vub` are used only if notary is enabled. func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, nonce uint32, vub *uint32, method string, args ...interface{}) error { - if c.multiClient != nil { - return c.multiClient.iterateClients(func(c *Client) error { - return c.NotaryInvoke(contract, fee, nonce, vub, method, args...) - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return ErrConnectionLost } if c.notary == nil { @@ -367,10 +344,11 @@ func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, nonce ui // // Considered to be used by non-IR nodes. func (c *Client) NotaryInvokeNotAlpha(contract util.Uint160, fee fixedn.Fixed8, method string, args ...interface{}) error { - if c.multiClient != nil { - return c.multiClient.iterateClients(func(c *Client) error { - return c.NotaryInvokeNotAlpha(contract, fee, method, args...) - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return ErrConnectionLost } if c.notary == nil { @@ -385,10 +363,11 @@ func (c *Client) NotaryInvokeNotAlpha(contract util.Uint160, fee fixedn.Fixed8, // NOTE: does not fallback to simple `Invoke()`. Expected to be used only for // TXs retrieved from the received notary requests. func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error { - if c.multiClient != nil { - return c.multiClient.iterateClients(func(c *Client) error { - return c.NotarySignAndInvokeTX(mainTx) - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return ErrConnectionLost } alphabetList, err := c.notary.alphabetSource() @@ -865,11 +844,11 @@ func CalculateNotaryDepositAmount(c *Client, gasMul, gasDiv int64) (fixedn.Fixed // CalculateNonceAndVUB calculates nonce and ValidUntilBlock values // based on transaction hash. Uses MurmurHash3. func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint32, err error) { - if c.multiClient != nil { - return nonce, vub, c.multiClient.iterateClients(func(c *Client) error { - nonce, vub, err = c.CalculateNonceAndVUB(hash) - return err - }) + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return 0, 0, ErrConnectionLost } if c.notary == nil { diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go new file mode 100644 index 000000000..6f5184db7 --- /dev/null +++ b/pkg/morph/client/notifications.go @@ -0,0 +1,263 @@ +package client + +import ( + "github.com/nspcc-dev/neo-go/pkg/util" + "go.uber.org/zap" +) + +// Close closes connection to the remote side making +// this client instance unusable. Closes notification +// channel returned from Client.NotificationChannel(), +// Removes all subscription. +func (c *Client) Close() { + // closing should be done via the channel + // to prevent switching to another RPC node + // in the notification loop + c.closeChan <- struct{}{} +} + +// SubscribeForExecutionNotifications adds subscription for notifications +// generated during contract transaction execution to this instance of client. +// +// Returns ErrConnectionLost if client has not been able to establish +// connection to any of passed RPC endpoints. +func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error { + c.switchLock.Lock() + defer c.switchLock.Unlock() + + if c.inactive { + return ErrConnectionLost + } + + _, subscribed := c.subscribedEvents[contract] + if subscribed { + // no need to subscribe one more time + return nil + } + + id, err := c.client.SubscribeForExecutionNotifications(&contract, nil) + if err != nil { + return err + } + + c.subscribedEvents[contract] = id + + return nil +} + +// SubscribeForNewBlocks adds subscription for new block events to this +// instance of client. +// +// Returns ErrConnectionLost if client has not been able to establish +// connection to any of passed RPC endpoints. +func (c *Client) SubscribeForNewBlocks() error { + c.switchLock.Lock() + defer c.switchLock.Unlock() + + if c.inactive { + return ErrConnectionLost + } + + if c.subscribedToNewBlocks { + // no need to subscribe one more time + return nil + } + + _, err := c.client.SubscribeForNewBlocks(nil) + if err != nil { + return err + } + + c.subscribedToNewBlocks = true + + return nil +} + +// SubscribeForNotaryRequests adds subscription for notary request payloads +// addition or removal events to this instance of client. Passed txSigner is +// used as filter: subscription is only for the notary requests that must be +// signed by txSigner. +// +// Returns ErrConnectionLost if client has not been able to establish +// connection to any of passed RPC endpoints. +func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error { + if c.notary == nil { + panic(notaryNotEnabledPanicMsg) + } + + c.switchLock.Lock() + defer c.switchLock.Unlock() + + if c.inactive { + return ErrConnectionLost + } + + _, subscribed := c.subscribedNotaryEvents[txSigner] + if subscribed { + // no need to subscribe one more time + return nil + } + + id, err := c.client.SubscribeForNotaryRequests(nil, &txSigner) + if err != nil { + return err + } + + c.subscribedNotaryEvents[txSigner] = id + + return nil +} + +// UnsubscribeContract removes subscription for given contract event stream. +// +// Returns ErrConnectionLost if client has not been able to establish +// connection to any of passed RPC endpoints. +func (c *Client) UnsubscribeContract(contract util.Uint160) error { + c.switchLock.Lock() + defer c.switchLock.Unlock() + + if c.inactive { + return ErrConnectionLost + } + + _, subscribed := c.subscribedEvents[contract] + if !subscribed { + // no need to unsubscribe contract + // without subscription + return nil + } + + err := c.client.Unsubscribe(c.subscribedEvents[contract]) + if err != nil { + return err + } + + delete(c.subscribedEvents, contract) + + return nil +} + +// UnsubscribeNotaryRequest removes subscription for given notary requests +// signer. +// +// Returns ErrConnectionLost if client has not been able to establish +// connection to any of passed RPC endpoints. +func (c *Client) UnsubscribeNotaryRequest(signer util.Uint160) error { + if c.notary == nil { + panic(notaryNotEnabledPanicMsg) + } + + c.switchLock.Lock() + defer c.switchLock.Unlock() + + if c.inactive { + return ErrConnectionLost + } + + _, subscribed := c.subscribedNotaryEvents[signer] + if !subscribed { + // no need to unsubscribe signer's + // requests without subscription + return nil + } + + err := c.client.Unsubscribe(c.subscribedNotaryEvents[signer]) + if err != nil { + return err + } + + delete(c.subscribedNotaryEvents, signer) + + return nil +} + +// UnsubscribeAll removes all active subscriptions of current client. +// +// Returns ErrConnectionLost if client has not been able to establish +// connection to any of passed RPC endpoints. +func (c *Client) UnsubscribeAll() error { + c.switchLock.Lock() + defer c.switchLock.Unlock() + + if c.inactive { + return ErrConnectionLost + } + + // no need to unsubscribe if there are + // no active subscriptions + if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 && + !c.subscribedToNewBlocks { + return nil + } + + err := c.client.UnsubscribeAll() + if err != nil { + return err + } + + c.subscribedEvents = make(map[util.Uint160]string) + c.subscribedNotaryEvents = make(map[util.Uint160]string) + c.subscribedToNewBlocks = false + + return nil +} + +// restoreSubscriptions restores subscriptions according to +// cached information about them. +func (c *Client) restoreSubscriptions() bool { + var ( + err error + id string + endpoint, _ = c.endpoints.current() + ) + + c.switchLock.Lock() + defer c.switchLock.Unlock() + + // new block events restoration + if c.subscribedToNewBlocks { + _, err = c.client.SubscribeForNewBlocks(nil) + if err != nil { + c.logger.Error("could not restore block subscription after RPC switch", + zap.String("endpoint", endpoint), + zap.Error(err), + ) + + return false + } + } + + // notification events restoration + for contract := range c.subscribedEvents { + id, err = c.client.SubscribeForExecutionNotifications(&contract, nil) + if err != nil { + c.logger.Error("could not restore notification subscription after RPC switch", + zap.String("endpoint", endpoint), + zap.Error(err), + ) + + return false + } + + c.subscribedEvents[contract] = id + } + + // notary notification events restoration + if c.notary != nil { + for signer := range c.subscribedNotaryEvents { + id, err = c.client.SubscribeForNotaryRequests(nil, &signer) + if err != nil { + c.logger.Error("could not restore notary notification subscription after RPC switch", + zap.String("endpoint", endpoint), + zap.Error(err), + ) + + return false + } + + c.subscribedNotaryEvents[signer] = id + } + } + + return true +}