From 23bec300dd6accf468806a174a800c95e30b2e92 Mon Sep 17 00:00:00 2001 From: aarifullin Date: Wed, 27 Sep 2023 17:20:17 +0300 Subject: [PATCH] [#708] morph: Introduce new parameters for subscriber constructor * Make subscriber set notification channel sizes. Buffered notification channels helps to avoid problems with breaking connection within websocket client Signed-off-by: Airat Arifullin --- pkg/morph/subscriber/subscriber.go | 32 +++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go index c2d8494fa..afeed20b5 100644 --- a/pkg/morph/subscriber/subscriber.go +++ b/pkg/morph/subscriber/subscriber.go @@ -60,9 +60,16 @@ type ( // Params is a group of Subscriber constructor parameters. Params struct { - Log *logger.Logger - StartFromBlock uint32 - Client *client.Client + Log *logger.Logger + StartFromBlock uint32 + Client *client.Client + NotificationsConfig NotificationsConfig + } + + NotificationsConfig struct { + NotificationChannelSize uint32 + BlockChannelSize uint32 + NotaryRequestChannelSize uint32 } ) @@ -169,7 +176,7 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { blockChan: make(chan *block.Block), notaryChan: make(chan *result.NotaryRequestEvent), - current: newSubChannels(), + current: newSubChannels(p.NotificationsConfig), subscribedEvents: make(map[util.Uint160]bool), subscribedNotaryEvents: make(map[util.Uint160]bool), @@ -259,7 +266,14 @@ func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) ( cliCh := s.client.NotificationChannel() s.Lock() - chs := newSubChannels() + + param := NotificationsConfig{ + NotificationChannelSize: uint32(cap(s.current.NotifyChan)), + BlockChannelSize: uint32(cap(s.current.BlockChan)), + NotaryRequestChannelSize: uint32(cap(s.current.NotaryChan)), + } + + chs := newSubChannels(param) go func() { finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan) }() @@ -270,11 +284,11 @@ func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) ( return true, cliCh } -func newSubChannels() subChannels { +func newSubChannels(param NotificationsConfig) subChannels { return subChannels{ - NotifyChan: make(chan *state.ContainedNotificationEvent), - BlockChan: make(chan *block.Block), - NotaryChan: make(chan *result.NotaryRequestEvent), + NotifyChan: make(chan *state.ContainedNotificationEvent, param.NotificationChannelSize), + BlockChan: make(chan *block.Block, param.BlockChannelSize), + NotaryChan: make(chan *result.NotaryRequestEvent, param.NotaryRequestChannelSize), } } -- 2.45.2