forked from TrueCloudLab/frostfs-node
[#59] morph: Adopt updated neo-go
client API for subs
It does not use deprecated methods anymore but also adds more code that removes. Future refactor that will affect more components will optimize usage of the updated API. Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
This commit is contained in:
parent
9ffa0d8fea
commit
533e9f8b75
4 changed files with 187 additions and 94 deletions
|
@ -4,6 +4,11 @@ import (
|
|||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -51,7 +56,8 @@ func (c *Client) switchRPC() bool {
|
|||
c.logger.Info("connection to the new RPC node has been established",
|
||||
zap.String("endpoint", newEndpoint))
|
||||
|
||||
if !c.restoreSubscriptions(cli, newEndpoint) {
|
||||
subs, ok := c.restoreSubscriptions(cli, newEndpoint, false)
|
||||
if !ok {
|
||||
// new WS client does not allow
|
||||
// restoring subscription, client
|
||||
// could not work correctly =>
|
||||
|
@ -63,6 +69,7 @@ func (c *Client) switchRPC() bool {
|
|||
|
||||
c.client = cli
|
||||
c.setActor(act)
|
||||
c.subsInfo = subs
|
||||
|
||||
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
|
||||
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
|
||||
|
@ -77,9 +84,14 @@ func (c *Client) switchRPC() bool {
|
|||
}
|
||||
|
||||
func (c *Client) notificationLoop() {
|
||||
var e any
|
||||
var ok bool
|
||||
|
||||
for {
|
||||
c.switchLock.RLock()
|
||||
nChan := c.client.Notifications
|
||||
bChan := c.blockRcv
|
||||
nChan := c.notificationRcv
|
||||
nrChan := c.notaryReqRcv
|
||||
c.switchLock.RUnlock()
|
||||
|
||||
select {
|
||||
|
@ -93,57 +105,74 @@ func (c *Client) notificationLoop() {
|
|||
c.close()
|
||||
|
||||
return
|
||||
case n, ok := <-nChan:
|
||||
// notification channel is used as a connection
|
||||
// state: if it is closed, the connection is
|
||||
// considered to be lost
|
||||
if !ok {
|
||||
if closeErr := c.client.GetError(); closeErr != nil {
|
||||
c.logger.Warn("switching to the next RPC node",
|
||||
zap.String("reason", closeErr.Error()),
|
||||
)
|
||||
} else {
|
||||
// neo-go client was closed by calling `Close`
|
||||
// method that happens only when the client has
|
||||
// switched to the more prioritized RPC
|
||||
continue
|
||||
}
|
||||
case e, ok = <-bChan:
|
||||
case e, ok = <-nChan:
|
||||
case e, ok = <-nrChan:
|
||||
}
|
||||
|
||||
if !c.switchRPC() {
|
||||
c.logger.Error("could not establish connection to any RPC node")
|
||||
if ok {
|
||||
c.routeEvent(e)
|
||||
continue
|
||||
}
|
||||
|
||||
// could not connect to all endpoints =>
|
||||
// switch client to inactive mode
|
||||
c.inactiveMode()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// TODO(@carpawell): call here some callback retrieved in constructor
|
||||
// of the client to allow checking chain state since during switch
|
||||
// process some notification could be lost
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case c.notifications <- n:
|
||||
continue
|
||||
case <-c.cfg.ctx.Done():
|
||||
_ = c.UnsubscribeAll()
|
||||
c.close()
|
||||
|
||||
return
|
||||
case <-c.closeChan:
|
||||
_ = c.UnsubscribeAll()
|
||||
c.close()
|
||||
|
||||
return
|
||||
}
|
||||
if !c.reconnect() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) routeEvent(e any) {
|
||||
typedNotification := rpcclient.Notification{Value: e}
|
||||
|
||||
switch e.(type) {
|
||||
case *block.Block:
|
||||
typedNotification.Type = neorpc.BlockEventID
|
||||
case *state.ContainedNotificationEvent:
|
||||
typedNotification.Type = neorpc.NotificationEventID
|
||||
case *result.NotaryRequestEvent:
|
||||
typedNotification.Type = neorpc.NotaryRequestEventID
|
||||
}
|
||||
|
||||
select {
|
||||
case c.notifications <- typedNotification:
|
||||
case <-c.cfg.ctx.Done():
|
||||
_ = c.UnsubscribeAll()
|
||||
c.close()
|
||||
case <-c.closeChan:
|
||||
_ = c.UnsubscribeAll()
|
||||
c.close()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) reconnect() bool {
|
||||
if closeErr := c.client.GetError(); closeErr != nil {
|
||||
c.logger.Warn("switching to the next RPC node",
|
||||
zap.String("reason", closeErr.Error()),
|
||||
)
|
||||
} else {
|
||||
// neo-go client was closed by calling `Close`
|
||||
// method, that happens only when a client has
|
||||
// switched to the more prioritized RPC
|
||||
return true
|
||||
}
|
||||
|
||||
if !c.switchRPC() {
|
||||
c.logger.Error("could not establish connection to any RPC node")
|
||||
|
||||
// could not connect to all endpoints =>
|
||||
// switch client to inactive mode
|
||||
c.inactiveMode()
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// TODO(@carpawell): call here some callback retrieved in constructor
|
||||
// of the client to allow checking chain state since during switch
|
||||
// process some notification could be lost
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *Client) switchToMostPrioritized() {
|
||||
t := time.NewTicker(c.cfg.switchInterval)
|
||||
defer t.Stop()
|
||||
|
@ -156,11 +185,12 @@ mainLoop:
|
|||
return
|
||||
case <-t.C:
|
||||
c.switchLock.RLock()
|
||||
|
||||
endpointsCopy := make([]Endpoint, len(c.endpoints.list))
|
||||
copy(endpointsCopy, c.endpoints.list)
|
||||
|
||||
currPriority := c.endpoints.list[c.endpoints.curr].Priority
|
||||
highestPriority := c.endpoints.list[0].Priority
|
||||
|
||||
c.switchLock.RUnlock()
|
||||
|
||||
if currPriority == highestPriority {
|
||||
|
@ -186,7 +216,7 @@ mainLoop:
|
|||
continue
|
||||
}
|
||||
|
||||
if c.restoreSubscriptions(cli, tryE) {
|
||||
if subs, ok := c.restoreSubscriptions(cli, tryE, true); ok {
|
||||
c.switchLock.Lock()
|
||||
|
||||
// higher priority node could have been
|
||||
|
@ -201,6 +231,7 @@ mainLoop:
|
|||
c.cache.invalidate()
|
||||
c.client = cli
|
||||
c.setActor(act)
|
||||
c.subsInfo = subs
|
||||
c.endpoints.curr = i
|
||||
|
||||
c.switchLock.Unlock()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue