From 3e45b4a085c71ffc8e4ff0d928d507254def19cb Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 10 Feb 2022 19:32:09 +0300 Subject: [PATCH] [#1170] pkg/morph/subscriber: Adopt new WS client Signed-off-by: Pavel Karpy --- cmd/neofs-node/morph.go | 32 ++---------- pkg/innerring/innerring.go | 42 ++++++---------- pkg/morph/subscriber/subscriber.go | 81 +++++++++--------------------- 3 files changed, 46 insertions(+), 109 deletions(-) diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index 25d6e706..a33401e9 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -47,7 +47,6 @@ func initMorphComponents(c *cfg) { client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)), client.WithLogger(c.log), client.WithExtraEndpoints(addresses[1:]), - client.WithMaxConnectionPerHost(morphconfig.MaxConnPerHost(c.appCfg)), ) if err != nil { c.log.Info("failed to create neo RPC client", @@ -176,38 +175,17 @@ func listenMorphNotifications(c *cfg) { subs subscriber.Subscriber ) - endpoints := morphconfig.NotificationEndpoint(c.appCfg) - timeout := morphconfig.DialTimeout(c.appCfg) - - rand.Shuffle(len(endpoints), func(i, j int) { - endpoints[i], endpoints[j] = endpoints[j], endpoints[i] - }) - fromSideChainBlock, err := c.persistate.UInt32(persistateSideChainLastBlockKey) if err != nil { fromSideChainBlock = 0 c.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) } - for i := range endpoints { - subs, err = subscriber.New(c.ctx, &subscriber.Params{ - Log: c.log, - Endpoint: endpoints[i], - DialTimeout: timeout, - StartFromBlock: fromSideChainBlock, - }) - if err == nil { - c.log.Info("websocket neo event listener established", - zap.String("endpoint", endpoints[i])) - - break - } - - c.log.Info("failed to establish websocket neo event listener, trying another", - zap.String("endpoint", endpoints[i]), - zap.String("error", err.Error())) - } - + subs, err = subscriber.New(c.ctx, &subscriber.Params{ + Log: c.log, + StartFromBlock: fromSideChainBlock, + Client: c.cfgMorph.client, + }) fatalOnErr(err) lis, err := event.NewListener(event.ListenerParams{ diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 2d8a8326..21945185 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -353,14 +353,14 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error from: fromSideChainBlock, } - // create morph listener - server.morphListener, err = createListener(ctx, morphChain) + // create morph client + server.morphClient, err = createClient(ctx, morphChain) if err != nil { return nil, err } - // create morph client - server.morphClient, err = createClient(ctx, morphChain) + // create morph listener + server.morphListener, err = createListener(ctx, server.morphClient, morphChain) if err != nil { return nil, err } @@ -388,14 +388,14 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error } mainnetChain.from = fromMainChainBlock - // create mainnet listener - server.mainnetListener, err = createListener(ctx, mainnetChain) + // create mainnet client + server.mainnetClient, err = createClient(ctx, mainnetChain) if err != nil { return nil, err } - // create mainnet client - server.mainnetClient, err = createClient(ctx, mainnetChain) + // create mainnet listener + server.mainnetListener, err = createListener(ctx, server.mainnetClient, mainnetChain) if err != nil { return nil, err } @@ -920,7 +920,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return server, nil } -func createListener(ctx context.Context, p *chainParams) (event.Listener, error) { +func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) { // config name left unchanged for compatibility, may be its better to rename it to "endpoints" endpoints := p.cfg.GetStringSlice(p.name + ".endpoint.notification") if len(endpoints) == 0 { @@ -932,23 +932,13 @@ func createListener(ctx context.Context, p *chainParams) (event.Listener, error) err error ) - dialTimeout := p.cfg.GetDuration(p.name + ".dial_timeout") - - for i := range endpoints { - sub, err = subscriber.New(ctx, &subscriber.Params{ - Log: p.log, - Endpoint: endpoints[i], - DialTimeout: dialTimeout, - StartFromBlock: p.from, - }) - if err == nil { - break - } - - p.log.Info("failed to establish websocket neo event listener, trying another", - zap.String("endpoint", endpoints[i]), - zap.String("error", err.Error()), - ) + sub, err = subscriber.New(ctx, &subscriber.Params{ + Log: p.log, + StartFromBlock: p.from, + Client: cli, + }) + if err != nil { + return nil, err } listener, err := event.NewListener(event.ListenerParams{ diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go index 4c394c49..3e28aa7e 100644 --- a/pkg/morph/subscriber/subscriber.go +++ b/pkg/morph/subscriber/subscriber.go @@ -5,13 +5,12 @@ import ( "errors" "fmt" "sync" - "time" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/rpc/client" "github.com/nspcc-dev/neo-go/pkg/rpc/response" "github.com/nspcc-dev/neo-go/pkg/rpc/response/result/subscriptions" "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/morph/client" "go.uber.org/zap" ) @@ -28,10 +27,9 @@ type ( subscriber struct { *sync.RWMutex log *zap.Logger - client *client.WSClient + client *client.Client notifyChan chan *subscriptions.NotificationEvent - notifyIDs map[util.Uint160]string blockChan chan *block.Block @@ -41,9 +39,8 @@ type ( // Params is a group of Subscriber constructor parameters. Params struct { Log *zap.Logger - Endpoint string - DialTimeout time.Duration StartFromBlock uint32 + Client *client.Client } ) @@ -51,56 +48,40 @@ var ( errNilParams = errors.New("chain/subscriber: config was not provided to the constructor") errNilLogger = errors.New("chain/subscriber: logger was not provided to the constructor") + + errNilClient = errors.New("chain/subscriber: client was not provided to the constructor") ) func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) (<-chan *subscriptions.NotificationEvent, error) { s.Lock() defer s.Unlock() - notifyIDs := make(map[util.Uint160]string, len(contracts)) + notifyIDs := make(map[util.Uint160]struct{}, len(contracts)) for i := range contracts { - // do not subscribe to already subscribed contracts - if _, ok := s.notifyIDs[contracts[i]]; ok { - continue - } - // subscribe to contract notifications - id, err := s.client.SubscribeForExecutionNotifications(&contracts[i], nil) + err := s.client.SubscribeForExecutionNotifications(contracts[i]) if err != nil { // if there is some error, undo all subscriptions and return error - for _, id := range notifyIDs { - _ = s.client.Unsubscribe(id) + for hash := range notifyIDs { + _ = s.client.UnsubscribeContract(hash) } return nil, err } // save notification id - notifyIDs[contracts[i]] = id - } - - // update global map of subscribed contracts - for contract, id := range notifyIDs { - s.notifyIDs[contract] = id + notifyIDs[contracts[i]] = struct{}{} } return s.notifyChan, nil } func (s *subscriber) UnsubscribeForNotification() { - s.Lock() - defer s.Unlock() - - for i := range s.notifyIDs { - err := s.client.Unsubscribe(s.notifyIDs[i]) - if err != nil { - s.log.Error("unsubscribe for notification", - zap.String("event", s.notifyIDs[i]), - zap.Error(err)) - } - - delete(s.notifyIDs, i) + err := s.client.UnsubscribeAll() + if err != nil { + s.log.Error("unsubscribe for notification", + zap.Error(err)) } } @@ -109,7 +90,7 @@ func (s *subscriber) Close() { } func (s *subscriber) BlockNotifications() (<-chan *block.Block, error) { - if _, err := s.client.SubscribeForNewBlocks(nil); err != nil { + if err := s.client.SubscribeForNewBlocks(); err != nil { return nil, fmt.Errorf("could not subscribe for new block events: %w", err) } @@ -117,7 +98,7 @@ func (s *subscriber) BlockNotifications() (<-chan *block.Block, error) { } func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-chan *subscriptions.NotaryRequestEvent, error) { - if _, err := s.client.SubscribeForNotaryRequests(nil, &mainTXSigner); err != nil { + if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil { return nil, fmt.Errorf("could not subscribe for notary request events: %w", err) } @@ -125,11 +106,13 @@ func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) (<-ch } func (s *subscriber) routeNotifications(ctx context.Context) { + notificationChan := s.client.NotificationChannel() + for { select { case <-ctx.Done(): return - case notification, ok := <-s.client.Notifications: + case notification, ok := <-notificationChan: if !ok { s.log.Warn("remote notification channel has been closed") close(s.notifyChan) @@ -186,24 +169,11 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { return nil, errNilParams case p.Log == nil: return nil, errNilLogger + case p.Client == nil: + return nil, errNilClient } - wsClient, err := client.NewWS(ctx, p.Endpoint, client.Options{ - DialTimeout: p.DialTimeout, - }) - if err != nil { - return nil, err - } - - if err := wsClient.Init(); err != nil { - return nil, fmt.Errorf("could not init ws client: %w", err) - } - - p.Log.Debug("event subscriber awaits RPC node", - zap.String("endpoint", p.Endpoint), - zap.Uint32("min_block_height", p.StartFromBlock)) - - err = awaitHeight(wsClient, p.StartFromBlock) + err := awaitHeight(p.Client, p.StartFromBlock) if err != nil { return nil, err } @@ -211,9 +181,8 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { sub := &subscriber{ RWMutex: new(sync.RWMutex), log: p.Log, - client: wsClient, + client: p.Client, notifyChan: make(chan *subscriptions.NotificationEvent), - notifyIDs: make(map[util.Uint160]string), blockChan: make(chan *block.Block), notaryChan: make(chan *subscriptions.NotaryRequestEvent), } @@ -231,12 +200,12 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { // This function is required to avoid connections to unsynced RPC nodes, because // they can produce events from the past that should not be processed by // NeoFS nodes. -func awaitHeight(wsClient *client.WSClient, startFrom uint32) error { +func awaitHeight(cli *client.Client, startFrom uint32) error { if startFrom == 0 { return nil } - height, err := wsClient.GetBlockCount() + height, err := cli.BlockCount() if err != nil { return fmt.Errorf("could not get block height: %w", err) }