diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go index c2d8494fa..afeed20b5 100644 --- a/pkg/morph/subscriber/subscriber.go +++ b/pkg/morph/subscriber/subscriber.go @@ -60,9 +60,16 @@ type ( // Params is a group of Subscriber constructor parameters. Params struct { - Log *logger.Logger - StartFromBlock uint32 - Client *client.Client + Log *logger.Logger + StartFromBlock uint32 + Client *client.Client + NotificationsConfig NotificationsConfig + } + + NotificationsConfig struct { + NotificationChannelSize uint32 + BlockChannelSize uint32 + NotaryRequestChannelSize uint32 } ) @@ -169,7 +176,7 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { blockChan: make(chan *block.Block), notaryChan: make(chan *result.NotaryRequestEvent), - current: newSubChannels(), + current: newSubChannels(p.NotificationsConfig), subscribedEvents: make(map[util.Uint160]bool), subscribedNotaryEvents: make(map[util.Uint160]bool), @@ -259,7 +266,14 @@ func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) ( cliCh := s.client.NotificationChannel() s.Lock() - chs := newSubChannels() + + param := NotificationsConfig{ + NotificationChannelSize: uint32(cap(s.current.NotifyChan)), + BlockChannelSize: uint32(cap(s.current.BlockChan)), + NotaryRequestChannelSize: uint32(cap(s.current.NotaryChan)), + } + + chs := newSubChannels(param) go func() { finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan) }() @@ -270,11 +284,11 @@ func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) ( return true, cliCh } -func newSubChannels() subChannels { +func newSubChannels(param NotificationsConfig) subChannels { return subChannels{ - NotifyChan: make(chan *state.ContainedNotificationEvent), - BlockChan: make(chan *block.Block), - NotaryChan: make(chan *result.NotaryRequestEvent), + NotifyChan: make(chan *state.ContainedNotificationEvent, param.NotificationChannelSize), + BlockChan: make(chan *block.Block, param.BlockChannelSize), + NotaryChan: make(chan *result.NotaryRequestEvent, param.NotaryRequestChannelSize), } }