actor: don't close already closed channel, fix #2932

Waiter should close its channels, but WSClient can also do that and it can do
that in a drain loop as well.
This commit is contained in:
Roman Khimov 2023-03-17 09:57:41 +03:00
parent 88c1e65b5b
commit e84ea0207d

View file

@ -290,6 +290,8 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
case _, ok := <-bRcvr: case _, ok := <-bRcvr:
if !ok { if !ok {
// We're toast, retry with non-ws client. // We're toast, retry with non-ws client.
bRcvr = nil
aerRcvr = nil
wsWaitErr = ErrMissedEvent wsWaitErr = ErrMissedEvent
break break
} }
@ -297,6 +299,8 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
case aer, ok := <-aerRcvr: case aer, ok := <-aerRcvr:
if !ok { if !ok {
// We're toast, retry with non-ws client. // We're toast, retry with non-ws client.
bRcvr = nil
aerRcvr = nil
wsWaitErr = ErrMissedEvent wsWaitErr = ErrMissedEvent
break break
} }
@ -314,8 +318,16 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
drainLoop: drainLoop:
for { for {
select { select {
case <-bRcvr: case _, ok := <-bRcvr:
case <-aerRcvr: if !ok { // Missed event means both channels are closed.
bRcvr = nil
aerRcvr = nil
}
case _, ok := <-aerRcvr:
if !ok { // Missed event means both channels are closed.
bRcvr = nil
aerRcvr = nil
}
case unsubErr := <-unsubErrs: case unsubErr := <-unsubErrs:
if unsubErr != nil { if unsubErr != nil {
errFmt := "unsubscription error: %v" errFmt := "unsubscription error: %v"
@ -334,8 +346,10 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
} }
} }
} }
if wsWaitErr == nil || !errors.Is(wsWaitErr, ErrMissedEvent) { if bRcvr != nil {
close(bRcvr) close(bRcvr)
}
if aerRcvr != nil {
close(aerRcvr) close(aerRcvr)
} }
close(unsubErrs) close(unsubErrs)