rpc: avoid panic on double-call to *WSClient.Close()
Although it's the caller's duty to avoid WSClient re-closing, we still can handle it. Fixes the following neofs-node error: ``` panic: close of closed channel goroutine 98 [running]: github.com/nspcc-dev/neo-go/pkg/rpc/client.(*WSClient).Close(...) github.com/nspcc-dev/neo-go@v0.98.3-pre.0.20220321144433-3b639f518ebb/pkg/rpc/client/wsclient.go:120 github.com/nspcc-dev/neofs-node/pkg/morph/subscriber.(*subscriber).Close(0x13) github.com/nspcc-dev/neofs-node/pkg/morph/subscriber/subscriber.go:108 +0x29 github.com/nspcc-dev/neofs-node/pkg/morph/event.listener.Stop(...) github.com/nspcc-dev/neofs-node/pkg/morph/event/listener.go:573 created by github.com/nspcc-dev/neofs-node/pkg/innerring.(*Server).Stop github.com/nspcc-dev/neofs-node/pkg/innerring/innerring.go:285 +0x12f ```
This commit is contained in:
parent
20c0e2f2e2
commit
850f56b367
2 changed files with 26 additions and 9 deletions
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpc/response"
|
"github.com/nspcc-dev/neo-go/pkg/rpc/response"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpc/response/result/subscriptions"
|
"github.com/nspcc-dev/neo-go/pkg/rpc/response/result/subscriptions"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WSClient is a websocket-enabled RPC client that can be used with appropriate
|
// WSClient is a websocket-enabled RPC client that can be used with appropriate
|
||||||
|
@ -34,10 +35,11 @@ type WSClient struct {
|
||||||
// be closed, so make sure to handle this.
|
// be closed, so make sure to handle this.
|
||||||
Notifications chan Notification
|
Notifications chan Notification
|
||||||
|
|
||||||
ws *websocket.Conn
|
ws *websocket.Conn
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
requests chan *request.Raw
|
requests chan *request.Raw
|
||||||
shutdown chan struct{}
|
shutdown chan struct{}
|
||||||
|
closeCalled atomic.Bool
|
||||||
|
|
||||||
subscriptionsLock sync.RWMutex
|
subscriptionsLock sync.RWMutex
|
||||||
subscriptions map[string]bool
|
subscriptions map[string]bool
|
||||||
|
@ -93,6 +95,7 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error
|
||||||
ws: ws,
|
ws: ws,
|
||||||
shutdown: make(chan struct{}),
|
shutdown: make(chan struct{}),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
|
closeCalled: *atomic.NewBool(false),
|
||||||
respChannels: make(map[uint64]chan *response.Raw),
|
respChannels: make(map[uint64]chan *response.Raw),
|
||||||
requests: make(chan *request.Raw),
|
requests: make(chan *request.Raw),
|
||||||
subscriptions: make(map[string]bool),
|
subscriptions: make(map[string]bool),
|
||||||
|
@ -113,11 +116,13 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error
|
||||||
// Close closes connection to the remote side rendering this client instance
|
// Close closes connection to the remote side rendering this client instance
|
||||||
// unusable.
|
// unusable.
|
||||||
func (c *WSClient) Close() {
|
func (c *WSClient) Close() {
|
||||||
// Closing shutdown channel send signal to wsWriter to break out of the
|
if c.closeCalled.CAS(false, true) {
|
||||||
// loop. In doing so it does ws.Close() closing the network connection
|
// Closing shutdown channel send signal to wsWriter to break out of the
|
||||||
// which in turn makes wsReader receieve err from ws,ReadJSON() and also
|
// loop. In doing so it does ws.Close() closing the network connection
|
||||||
// break out of the loop closing c.done channel in its shutdown sequence.
|
// which in turn makes wsReader receieve err from ws,ReadJSON() and also
|
||||||
close(c.shutdown)
|
// break out of the loop closing c.done channel in its shutdown sequence.
|
||||||
|
close(c.shutdown)
|
||||||
|
}
|
||||||
<-c.done
|
<-c.done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -441,3 +441,15 @@ func TestWSConcurrentAccess(t *testing.T) {
|
||||||
batchCount*3+1) // batchCount*requestsPerBatch+1
|
batchCount*3+1) // batchCount*requestsPerBatch+1
|
||||||
wsc.Close()
|
wsc.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWSDoubleClose(t *testing.T) {
|
||||||
|
srv := initTestServer(t, "")
|
||||||
|
|
||||||
|
c, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NotPanics(t, func() {
|
||||||
|
c.Close()
|
||||||
|
c.Close()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue