[#1170] pkg/morph: Change HTTP for WS client
Updated client now supports subscription to chain notifications and RPC switch between provided RPC endpoints. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
71c75dc7e8
commit
402f488bec
7 changed files with 661 additions and 327 deletions
|
@ -1,117 +1,157 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
|
||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type multiClient struct {
|
||||
cfg cfg
|
||||
|
||||
account *wallet.Account
|
||||
|
||||
sharedNotary *notary // notary config needed for single client construction
|
||||
|
||||
endpoints []string
|
||||
clientsMtx sync.RWMutex
|
||||
// lastSuccess is an index in endpoints array relating to a last
|
||||
// used endpoint.
|
||||
lastSuccess int
|
||||
clients map[string]*Client
|
||||
type endpoints struct {
|
||||
curr int
|
||||
list []string
|
||||
}
|
||||
|
||||
// createForAddress creates single Client instance using provided endpoint.
|
||||
func (x *multiClient) createForAddress(addr string) (*Client, error) {
|
||||
cli, err := client.New(x.cfg.ctx, addr, client.Options{
|
||||
DialTimeout: x.cfg.dialTimeout,
|
||||
MaxConnsPerHost: x.cfg.maxConnPerHost,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func newEndpoints(ee []string) *endpoints {
|
||||
return &endpoints{
|
||||
curr: 0,
|
||||
list: ee,
|
||||
}
|
||||
|
||||
err = cli.Init() // magic number is set there based on RPC node answer
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var c *Client
|
||||
|
||||
x.clientsMtx.Lock()
|
||||
// While creating 2 clients in parallel is ok, we don't want to
|
||||
// use a client missing from `x.clients` map as it can lead
|
||||
// to unexpected bugs.
|
||||
if x.clients[addr] == nil {
|
||||
sCli := blankSingleClient(cli, x.account, &x.cfg)
|
||||
sCli.notary = x.sharedNotary
|
||||
|
||||
c = &Client{
|
||||
cache: newClientCache(),
|
||||
singleClient: sCli,
|
||||
}
|
||||
x.clients[addr] = c
|
||||
} else {
|
||||
c = x.clients[addr]
|
||||
}
|
||||
x.clientsMtx.Unlock()
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// iterateClients executes f on each client until nil error is returned.
|
||||
// When nil error is returned, lastSuccess field is updated.
|
||||
// The iteration order is non-deterministic and shouldn't be relied upon.
|
||||
func (x *multiClient) iterateClients(f func(*Client) error) error {
|
||||
var (
|
||||
firstErr error
|
||||
err error
|
||||
)
|
||||
// next returns the next endpoint and its index
|
||||
// to try to connect to.
|
||||
// Returns -1 index if there is no known RPC endpoints.
|
||||
func (e *endpoints) next() (string, int) {
|
||||
if len(e.list) == 0 {
|
||||
return "", -1
|
||||
}
|
||||
|
||||
x.clientsMtx.RLock()
|
||||
start := x.lastSuccess
|
||||
x.clientsMtx.RUnlock()
|
||||
next := e.curr + 1
|
||||
if next == len(e.list) {
|
||||
next = 0
|
||||
}
|
||||
|
||||
for i := 0; i < len(x.endpoints); i++ {
|
||||
index := (start + i) % len(x.endpoints)
|
||||
e.curr = next
|
||||
|
||||
x.clientsMtx.RLock()
|
||||
c, cached := x.clients[x.endpoints[index]]
|
||||
x.clientsMtx.RUnlock()
|
||||
if !cached {
|
||||
c, err = x.createForAddress(x.endpoints[index])
|
||||
return e.list[next], next
|
||||
}
|
||||
|
||||
// current returns an endpoint and its index the Client
|
||||
// is connected to.
|
||||
// Returns -1 index if there is no known RPC endpoints
|
||||
func (e *endpoints) current() (string, int) {
|
||||
if len(e.list) == 0 {
|
||||
return "", -1
|
||||
}
|
||||
|
||||
return e.list[e.curr], e.curr
|
||||
}
|
||||
|
||||
func (c *Client) switchRPC() bool {
|
||||
c.switchLock.Lock()
|
||||
defer c.switchLock.Unlock()
|
||||
|
||||
c.client.Close()
|
||||
|
||||
_, currEndpointIndex := c.endpoints.current()
|
||||
if currEndpointIndex == -1 {
|
||||
// there are no known RPC endpoints to try
|
||||
// to connect to => do not switch
|
||||
return false
|
||||
}
|
||||
|
||||
for {
|
||||
newEndpoint, index := c.endpoints.next()
|
||||
if index == currEndpointIndex {
|
||||
// all the endpoint have been tried
|
||||
// for connection unsuccessfully
|
||||
return false
|
||||
}
|
||||
|
||||
if !cached && err != nil {
|
||||
x.cfg.logger.Error("could not open morph client connection",
|
||||
zap.String("endpoint", x.endpoints[index]),
|
||||
zap.String("err", err.Error()),
|
||||
cli, err := newWSClient(c.cfg, newEndpoint)
|
||||
if err != nil {
|
||||
c.logger.Warn("could not establish connection to the switched RPC node",
|
||||
zap.String("endpoint", newEndpoint),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
err = f(c)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
if i != 0 {
|
||||
x.clientsMtx.Lock()
|
||||
x.lastSuccess = index
|
||||
x.clientsMtx.Unlock()
|
||||
err = cli.Init()
|
||||
if err != nil {
|
||||
cli.Close()
|
||||
c.logger.Warn("could not init the switched RPC node",
|
||||
zap.String("endpoint", newEndpoint),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
c.client = cli
|
||||
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) notificationLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-c.cfg.ctx.Done():
|
||||
_ = c.UnsubscribeAll()
|
||||
c.close()
|
||||
|
||||
return
|
||||
case <-c.closeChan:
|
||||
_ = c.UnsubscribeAll()
|
||||
c.close()
|
||||
|
||||
return
|
||||
case n, ok := <-c.client.Notifications:
|
||||
// notification channel is used as a connection
|
||||
// state: if it is closed, the connection is
|
||||
// considered to be lost
|
||||
if !ok {
|
||||
c.logger.Warn("switching to the next RPC node")
|
||||
|
||||
if !c.switchRPC() {
|
||||
c.logger.Error("could not establish connection to any RPC node")
|
||||
|
||||
// could not connect to all endpoints =>
|
||||
// switch client to inactive mode
|
||||
c.inactiveMode()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
newEndpoint, _ := c.endpoints.current()
|
||||
|
||||
c.logger.Warn("connection to the new RPC node has been established",
|
||||
zap.String("endpoint", newEndpoint),
|
||||
)
|
||||
|
||||
if !c.restoreSubscriptions() {
|
||||
// new WS client does not allow
|
||||
// restoring subscription, client
|
||||
// could not work correctly =>
|
||||
// closing connection to RPC node
|
||||
// to switch to another one
|
||||
c.client.Close()
|
||||
}
|
||||
|
||||
// TODO(@carpawell): call here some callback retrieved in constructor
|
||||
// of the client to allow checking chain state since during switch
|
||||
// process some notification could be lost
|
||||
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// we dont need to continue the process after the logical error was encountered
|
||||
if errNeoFS := unwrapNeoFSError(err); errNeoFS != nil {
|
||||
return errNeoFS
|
||||
}
|
||||
|
||||
// set first error once
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
c.notifications <- n
|
||||
}
|
||||
}
|
||||
|
||||
return firstErr
|
||||
}
|
||||
|
||||
// close closes notification channel and wrapped WS client
|
||||
func (c *Client) close() {
|
||||
close(c.notifications)
|
||||
c.client.Close()
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue