[#1170] pkg/morph/subscriber: Adopt new WS client

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-02-10 19:32:09 +03:00 committed by Alex Vanin
parent 402f488bec
commit 3e45b4a085
3 changed files with 46 additions and 109 deletions

View file

@ -47,7 +47,6 @@ func initMorphComponents(c *cfg) {
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)), client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
client.WithLogger(c.log), client.WithLogger(c.log),
client.WithExtraEndpoints(addresses[1:]), client.WithExtraEndpoints(addresses[1:]),
client.WithMaxConnectionPerHost(morphconfig.MaxConnPerHost(c.appCfg)),
) )
if err != nil { if err != nil {
c.log.Info("failed to create neo RPC client", c.log.Info("failed to create neo RPC client",
@ -176,38 +175,17 @@ func listenMorphNotifications(c *cfg) {
subs subscriber.Subscriber subs subscriber.Subscriber
) )
endpoints := morphconfig.NotificationEndpoint(c.appCfg)
timeout := morphconfig.DialTimeout(c.appCfg)
rand.Shuffle(len(endpoints), func(i, j int) {
endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
})
fromSideChainBlock, err := c.persistate.UInt32(persistateSideChainLastBlockKey) fromSideChainBlock, err := c.persistate.UInt32(persistateSideChainLastBlockKey)
if err != nil { if err != nil {
fromSideChainBlock = 0 fromSideChainBlock = 0
c.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) c.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))
} }
for i := range endpoints { subs, err = subscriber.New(c.ctx, &subscriber.Params{
subs, err = subscriber.New(c.ctx, &subscriber.Params{ Log: c.log,
Log: c.log, StartFromBlock: fromSideChainBlock,
Endpoint: endpoints[i], Client: c.cfgMorph.client,
DialTimeout: timeout, })
StartFromBlock: fromSideChainBlock,
})
if err == nil {
c.log.Info("websocket neo event listener established",
zap.String("endpoint", endpoints[i]))
break
}
c.log.Info("failed to establish websocket neo event listener, trying another",
zap.String("endpoint", endpoints[i]),
zap.String("error", err.Error()))
}
fatalOnErr(err) fatalOnErr(err)
lis, err := event.NewListener(event.ListenerParams{ lis, err := event.NewListener(event.ListenerParams{

View file

@ -353,14 +353,14 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
from: fromSideChainBlock, from: fromSideChainBlock,
} }
// create morph listener // create morph client
server.morphListener, err = createListener(ctx, morphChain) server.morphClient, err = createClient(ctx, morphChain)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// create morph client // create morph listener
server.morphClient, err = createClient(ctx, morphChain) server.morphListener, err = createListener(ctx, server.morphClient, morphChain)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -388,14 +388,14 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
} }
mainnetChain.from = fromMainChainBlock mainnetChain.from = fromMainChainBlock
// create mainnet listener // create mainnet client
server.mainnetListener, err = createListener(ctx, mainnetChain) server.mainnetClient, err = createClient(ctx, mainnetChain)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// create mainnet client // create mainnet listener
server.mainnetClient, err = createClient(ctx, mainnetChain) server.mainnetListener, err = createListener(ctx, server.mainnetClient, mainnetChain)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -920,7 +920,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
return server, nil return server, nil
} }
func createListener(ctx context.Context, p *chainParams) (event.Listener, error) { func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
// config name left unchanged for compatibility, may be its better to rename it to "endpoints" // config name left unchanged for compatibility, may be its better to rename it to "endpoints"
endpoints := p.cfg.GetStringSlice(p.name + ".endpoint.notification") endpoints := p.cfg.GetStringSlice(p.name + ".endpoint.notification")
if len(endpoints) == 0 { if len(endpoints) == 0 {
@ -932,23 +932,13 @@ func createListener(ctx context.Context, p *chainParams) (event.Listener, error)
err error err error
) )
dialTimeout := p.cfg.GetDuration(p.name + ".dial_timeout") sub, err = subscriber.New(ctx, &subscriber.Params{
Log: p.log,
for i := range endpoints { StartFromBlock: p.from,
sub, err = subscriber.New(ctx, &subscriber.Params{ Client: cli,
Log: p.log, })
Endpoint: endpoints[i], if err != nil {
DialTimeout: dialTimeout, return nil, err
StartFromBlock: p.from,
})
if err == nil {
break
}
p.log.Info("failed to establish websocket neo event listener, trying another",
zap.String("endpoint", endpoints[i]),
zap.String("error", err.Error()),
)
} }
listener, err := event.NewListener(event.ListenerParams{ listener, err := event.NewListener(event.ListenerParams{

View file

@ -5,13 +5,12 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
"github.com/nspcc-dev/neo-go/pkg/rpc/response" "github.com/nspcc-dev/neo-go/pkg/rpc/response"
"github.com/nspcc-dev/neo-go/pkg/rpc/response/result/subscriptions" "github.com/nspcc-dev/neo-go/pkg/rpc/response/result/subscriptions"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -28,10 +27,9 @@ type (
subscriber struct { subscriber struct {
*sync.RWMutex *sync.RWMutex
log *zap.Logger log *zap.Logger
client *client.WSClient client *client.Client
notifyChan chan *subscriptions.NotificationEvent notifyChan chan *subscriptions.NotificationEvent
notifyIDs map[util.Uint160]string
blockChan chan *block.Block blockChan chan *block.Block
@ -41,9 +39,8 @@ type (
// Params is a group of Subscriber constructor parameters. // Params is a group of Subscriber constructor parameters.
Params struct { Params struct {
Log *zap.Logger Log *zap.Logger
Endpoint string
DialTimeout time.Duration
StartFromBlock uint32 StartFromBlock uint32
Client *client.Client
} }
) )
@ -51,56 +48,40 @@ var (
errNilParams = errors.New("chain/subscriber: config was not provided to the constructor") errNilParams = errors.New("chain/subscriber: config was not provided to the constructor")
errNilLogger = errors.New("chain/subscriber: logger was not provided to the constructor") errNilLogger = errors.New("chain/subscriber: logger was not provided to the constructor")
errNilClient = errors.New("chain/subscriber: client was not provided to the constructor")
) )
func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) (<-chan *subscriptions.NotificationEvent, error) { func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) (<-chan *subscriptions.NotificationEvent, error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
notifyIDs := make(map[util.Uint160]string, len(contracts)) notifyIDs := make(map[util.Uint160]struct{}, len(contracts))
for i := range contracts { for i := range contracts {
// do not subscribe to already subscribed contracts
if _, ok := s.notifyIDs[contracts[i]]; ok {
continue
}
// subscribe to contract notifications // subscribe to contract notifications
id, err := s.client.SubscribeForExecutionNotifications(&contracts[i], nil) err := s.client.SubscribeForExecutionNotifications(contracts[i])
if err != nil { if err != nil {
// if there is some error, undo all subscriptions and return error // if there is some error, undo all subscriptions and return error
for _, id := range notifyIDs { for hash := range notifyIDs {
_ = s.client.Unsubscribe(id) _ = s.client.UnsubscribeContract(hash)
} }
return nil, err return nil, err
} }
// save notification id // save notification id
notifyIDs[contracts[i]] = id notifyIDs[contracts[i]] = struct{}{}
}
// update global map of subscribed contracts
for contract, id := range notifyIDs {
s.notifyIDs[contract] = id
} }
return s.notifyChan, nil return s.notifyChan, nil
} }
func (s *subscriber) UnsubscribeForNotification() { func (s *subscriber) UnsubscribeForNotification() {
s.Lock() err := s.client.UnsubscribeAll()
defer s.Unlock() if err != nil {
s.log.Error("unsubscribe for notification",
for i := range s.notifyIDs { zap.Error(err))
err := s.client.Unsubscribe(s.notifyIDs[i])
if err != nil {
s.log.Error("unsubscribe for notification",
zap.String("event", s.notifyIDs[i]),
zap.Error(err))
}
delete(s.notifyIDs, i)
} }
} }
@ -109,7 +90,7 @@ func (s *subscriber) Close() {
} }
func (s *subscriber) BlockNotifications() (<-chan *block.Block, error) { func (s *subscriber) BlockNotifications() (<-chan *block.Block, error) {
if _, err := s.client.SubscribeForNewBlocks(nil); err != nil { if err := s.client.SubscribeForNewBlocks(); err != nil {
return nil, fmt.Errorf("could not subscribe for new block events: %w", err) return nil, fmt.Errorf("could not subscribe for new block events: %w", err)
} }
@ -117,7 +98,7 @@ func (s *subscriber) BlockNotifications() (<-chan *block.Block, error) {
} }
func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-chan *subscriptions.NotaryRequestEvent, error) { func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-chan *subscriptions.NotaryRequestEvent, error) {
if _, err := s.client.SubscribeForNotaryRequests(nil, &mainTXSigner); err != nil { if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil {
return nil, fmt.Errorf("could not subscribe for notary request events: %w", err) return nil, fmt.Errorf("could not subscribe for notary request events: %w", err)
} }
@ -125,11 +106,13 @@ func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-ch
} }
func (s *subscriber) routeNotifications(ctx context.Context) { func (s *subscriber) routeNotifications(ctx context.Context) {
notificationChan := s.client.NotificationChannel()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case notification, ok := <-s.client.Notifications: case notification, ok := <-notificationChan:
if !ok { if !ok {
s.log.Warn("remote notification channel has been closed") s.log.Warn("remote notification channel has been closed")
close(s.notifyChan) close(s.notifyChan)
@ -186,24 +169,11 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
return nil, errNilParams return nil, errNilParams
case p.Log == nil: case p.Log == nil:
return nil, errNilLogger return nil, errNilLogger
case p.Client == nil:
return nil, errNilClient
} }
wsClient, err := client.NewWS(ctx, p.Endpoint, client.Options{ err := awaitHeight(p.Client, p.StartFromBlock)
DialTimeout: p.DialTimeout,
})
if err != nil {
return nil, err
}
if err := wsClient.Init(); err != nil {
return nil, fmt.Errorf("could not init ws client: %w", err)
}
p.Log.Debug("event subscriber awaits RPC node",
zap.String("endpoint", p.Endpoint),
zap.Uint32("min_block_height", p.StartFromBlock))
err = awaitHeight(wsClient, p.StartFromBlock)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -211,9 +181,8 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
sub := &subscriber{ sub := &subscriber{
RWMutex: new(sync.RWMutex), RWMutex: new(sync.RWMutex),
log: p.Log, log: p.Log,
client: wsClient, client: p.Client,
notifyChan: make(chan *subscriptions.NotificationEvent), notifyChan: make(chan *subscriptions.NotificationEvent),
notifyIDs: make(map[util.Uint160]string),
blockChan: make(chan *block.Block), blockChan: make(chan *block.Block),
notaryChan: make(chan *subscriptions.NotaryRequestEvent), notaryChan: make(chan *subscriptions.NotaryRequestEvent),
} }
@ -231,12 +200,12 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
// This function is required to avoid connections to unsynced RPC nodes, because // This function is required to avoid connections to unsynced RPC nodes, because
// they can produce events from the past that should not be processed by // they can produce events from the past that should not be processed by
// NeoFS nodes. // NeoFS nodes.
func awaitHeight(wsClient *client.WSClient, startFrom uint32) error { func awaitHeight(cli *client.Client, startFrom uint32) error {
if startFrom == 0 { if startFrom == 0 {
return nil return nil
} }
height, err := wsClient.GetBlockCount() height, err := cli.BlockCount()
if err != nil { if err != nil {
return fmt.Errorf("could not get block height: %w", err) return fmt.Errorf("could not get block height: %w", err)
} }