From ccbb9ce6abd4840031226319784cd30535e58378 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 13 Oct 2020 18:37:38 +0300 Subject: [PATCH] [#72] Add ListenWithError method in Listener interface Listen and ListenWithError methods check if subscriber channel has been closed. If so, ListenWithError passes error message into provided channel. Signed-off-by: Alex Vanin --- pkg/morph/event/listener.go | 39 +++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go index 16d72d760..b853babae 100644 --- a/pkg/morph/event/listener.go +++ b/pkg/morph/event/listener.go @@ -17,10 +17,16 @@ type Listener interface { // Must start the event listener. // // Must listen to events with the parser installed. - // - // Must return an error if event listening could not be started. Listen(context.Context) + // Must start the event listener. + // + // Must listen to events with the parser installed. + // + // Must send error to channel if subscriber channel has been closed or + // it could not be started. + ListenWithError(context.Context, chan<- error) + // Must set the parser of particular contract event. // // Parser of each event must be set once. All parsers must be set before Listen call. @@ -78,7 +84,7 @@ var ( // Returns an error if listener was already started. func (s listener) Listen(ctx context.Context) { s.once.Do(func() { - if err := s.listen(ctx); err != nil { + if err := s.listen(ctx, nil); err != nil { s.log.Error("could not start listen to events", zap.String("error", err.Error()), ) @@ -86,7 +92,24 @@ func (s listener) Listen(ctx context.Context) { }) } -func (s listener) listen(ctx context.Context) error { +// ListenWithError starts the listening for events with registered handlers and +// passing error message to intError channel if subscriber channel has been closed. +// +// Executes once, all subsequent calls do nothing. +// +// Returns an error if listener was already started. +func (s listener) ListenWithError(ctx context.Context, intError chan<- error) { + s.once.Do(func() { + if err := s.listen(ctx, intError); err != nil { + s.log.Error("could not start listen to events", + zap.String("error", err.Error()), + ) + intError <- err + } + }) +} + +func (s listener) listen(ctx context.Context, intError chan<- error) error { // create the list of listening contract hashes hashes := make([]util.Uint160, 0) @@ -115,12 +138,12 @@ func (s listener) listen(ctx context.Context) error { return err } - s.listenLoop(ctx, chEvent) + s.listenLoop(ctx, chEvent, intError) return nil } -func (s listener) listenLoop(ctx context.Context, chEvent <-chan *result.NotificationEvent) { +func (s listener) listenLoop(ctx context.Context, chEvent <-chan *result.NotificationEvent, intErr chan<- error) { loop: for { select { @@ -132,6 +155,10 @@ loop: case notifyEvent, ok := <-chEvent: if !ok { s.log.Warn("stop event listener by channel") + if intErr != nil { + intErr <- errors.New("event subscriber connection has been terminated") + } + break loop } else if notifyEvent == nil { s.log.Warn("nil notification event was caught")