forked from TrueCloudLab/frostfs-node
[#72] Add ListenWithError method in Listener interface
Listen and ListenWithError methods check if subscriber channel has been closed. If so, ListenWithError passes error message into provided channel. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
ca006245d2
commit
ccbb9ce6ab
1 changed files with 33 additions and 6 deletions
|
@ -17,10 +17,16 @@ type Listener interface {
|
|||
// Must start the event listener.
|
||||
//
|
||||
// Must listen to events with the parser installed.
|
||||
//
|
||||
// Must return an error if event listening could not be started.
|
||||
Listen(context.Context)
|
||||
|
||||
// Must start the event listener.
|
||||
//
|
||||
// Must listen to events with the parser installed.
|
||||
//
|
||||
// Must send error to channel if subscriber channel has been closed or
|
||||
// it could not be started.
|
||||
ListenWithError(context.Context, chan<- error)
|
||||
|
||||
// Must set the parser of particular contract event.
|
||||
//
|
||||
// Parser of each event must be set once. All parsers must be set before Listen call.
|
||||
|
@ -78,7 +84,7 @@ var (
|
|||
// Returns an error if listener was already started.
|
||||
func (s listener) Listen(ctx context.Context) {
|
||||
s.once.Do(func() {
|
||||
if err := s.listen(ctx); err != nil {
|
||||
if err := s.listen(ctx, nil); err != nil {
|
||||
s.log.Error("could not start listen to events",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
@ -86,7 +92,24 @@ func (s listener) Listen(ctx context.Context) {
|
|||
})
|
||||
}
|
||||
|
||||
func (s listener) listen(ctx context.Context) error {
|
||||
// ListenWithError starts the listening for events with registered handlers and
|
||||
// passing error message to intError channel if subscriber channel has been closed.
|
||||
//
|
||||
// Executes once, all subsequent calls do nothing.
|
||||
//
|
||||
// Returns an error if listener was already started.
|
||||
func (s listener) ListenWithError(ctx context.Context, intError chan<- error) {
|
||||
s.once.Do(func() {
|
||||
if err := s.listen(ctx, intError); err != nil {
|
||||
s.log.Error("could not start listen to events",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
intError <- err
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s listener) listen(ctx context.Context, intError chan<- error) error {
|
||||
// create the list of listening contract hashes
|
||||
hashes := make([]util.Uint160, 0)
|
||||
|
||||
|
@ -115,12 +138,12 @@ func (s listener) listen(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
s.listenLoop(ctx, chEvent)
|
||||
s.listenLoop(ctx, chEvent, intError)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s listener) listenLoop(ctx context.Context, chEvent <-chan *result.NotificationEvent) {
|
||||
func (s listener) listenLoop(ctx context.Context, chEvent <-chan *result.NotificationEvent, intErr chan<- error) {
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
|
@ -132,6 +155,10 @@ loop:
|
|||
case notifyEvent, ok := <-chEvent:
|
||||
if !ok {
|
||||
s.log.Warn("stop event listener by channel")
|
||||
if intErr != nil {
|
||||
intErr <- errors.New("event subscriber connection has been terminated")
|
||||
}
|
||||
|
||||
break loop
|
||||
} else if notifyEvent == nil {
|
||||
s.log.Warn("nil notification event was caught")
|
||||
|
|
Loading…
Reference in a new issue