[#708] morph: Introduce new parameters for subscriber constructor
Some checks failed
DCO action / DCO (pull_request) Successful in 3m4s
Vulncheck / Vulncheck (pull_request) Successful in 3m13s
Build / Build Components (1.21) (pull_request) Successful in 5m18s
Build / Build Components (1.20) (pull_request) Successful in 5m23s
Tests and linters / Tests (1.21) (pull_request) Failing after 5m41s
Tests and linters / Staticcheck (pull_request) Successful in 6m42s
Tests and linters / Tests (1.20) (pull_request) Successful in 6m58s
Tests and linters / Tests with -race (pull_request) Successful in 7m3s
Tests and linters / Lint (pull_request) Successful in 7m24s

* Make subscriber set notification channel sizes. Buffered notification
  channels helps to avoid problems with breaking connection within
  websocket client

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
Airat Arifullin 2023-09-27 17:20:17 +03:00
parent 368774be95
commit 23bec300dd

View file

@ -60,9 +60,16 @@ type (
// Params is a group of Subscriber constructor parameters. // Params is a group of Subscriber constructor parameters.
Params struct { Params struct {
Log *logger.Logger Log *logger.Logger
StartFromBlock uint32 StartFromBlock uint32
Client *client.Client Client *client.Client
NotificationsConfig NotificationsConfig
}
NotificationsConfig struct {
NotificationChannelSize uint32
BlockChannelSize uint32
NotaryRequestChannelSize uint32
} }
) )
@ -169,7 +176,7 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
blockChan: make(chan *block.Block), blockChan: make(chan *block.Block),
notaryChan: make(chan *result.NotaryRequestEvent), notaryChan: make(chan *result.NotaryRequestEvent),
current: newSubChannels(), current: newSubChannels(p.NotificationsConfig),
subscribedEvents: make(map[util.Uint160]bool), subscribedEvents: make(map[util.Uint160]bool),
subscribedNotaryEvents: make(map[util.Uint160]bool), subscribedNotaryEvents: make(map[util.Uint160]bool),
@ -259,7 +266,14 @@ func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (
cliCh := s.client.NotificationChannel() cliCh := s.client.NotificationChannel()
s.Lock() s.Lock()
chs := newSubChannels()
param := NotificationsConfig{
NotificationChannelSize: uint32(cap(s.current.NotifyChan)),
BlockChannelSize: uint32(cap(s.current.BlockChan)),
NotaryRequestChannelSize: uint32(cap(s.current.NotaryChan)),
}
chs := newSubChannels(param)
go func() { go func() {
finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan) finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan)
}() }()
@ -270,11 +284,11 @@ func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (
return true, cliCh return true, cliCh
} }
func newSubChannels() subChannels { func newSubChannels(param NotificationsConfig) subChannels {
return subChannels{ return subChannels{
NotifyChan: make(chan *state.ContainedNotificationEvent), NotifyChan: make(chan *state.ContainedNotificationEvent, param.NotificationChannelSize),
BlockChan: make(chan *block.Block), BlockChan: make(chan *block.Block, param.BlockChannelSize),
NotaryChan: make(chan *result.NotaryRequestEvent), NotaryChan: make(chan *result.NotaryRequestEvent, param.NotaryRequestChannelSize),
} }
} }