forked from TrueCloudLab/neoneo-go
Merge pull request #2809 from nspcc-dev/fix-subs
rpcsrv: do not block blockchain events receiver by subscription requests
commit ca9fde745b
2 changed files with 93 additions and 24 deletions
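The gist of the change: the WebSocket subscribe and unsubscribe handlers shared a single subsLock with the loop that receives events from the blockchain, so a subscription request could stall event delivery. The patch splits the lock: subsLock keeps guarding the subscriber set that the receiver iterates, while the new subsCounterLock guards the per-event counters and the chain (un)subscription calls. The sketch below is a minimal model of that split with made-up types (server, event, dispatch); it is not the actual neo-go code.

```go
package main

import (
	"fmt"
	"sync"
)

// event stands in for a block or execution notification coming from the chain.
type event struct{ id int }

// server models the patched layout: one lock guards the subscriber set used by
// the event fan-out loop, a separate lock guards the per-event counters that
// subscribe/unsubscribe requests maintain.
type server struct {
	subsLock    sync.RWMutex
	subscribers map[chan event]bool

	subsCounterLock sync.RWMutex
	blockSubs       int
}

// dispatch is what the blockchain events receiver does: it only needs the
// subscriber set, so it no longer waits for a subscribe request that is busy
// updating counters or talking to the chain.
func (s *server) dispatch(e event) {
	s.subsLock.RLock()
	defer s.subsLock.RUnlock()
	for ch := range s.subscribers {
		select {
		case ch <- e:
		default: // slow clients are simply skipped in this sketch
		}
	}
}

// subscribe registers a client, releasing subsLock before touching the
// counters, so dispatch is blocked only for the map update itself.
func (s *server) subscribe(ch chan event) {
	s.subsLock.Lock()
	s.subscribers[ch] = true
	s.subsLock.Unlock()

	s.subsCounterLock.Lock()
	s.blockSubs++ // the first subscriber would also subscribe to the chain here
	s.subsCounterLock.Unlock()
}

func main() {
	s := &server{subscribers: make(map[chan event]bool)}
	ch := make(chan event, 1)
	s.subscribe(ch)
	s.dispatch(event{id: 1})
	fmt.Println("got:", <-ch)
}
```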
@@ -28,6 +28,7 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/nspcc-dev/neo-go/pkg/encoding/address"
 	"github.com/nspcc-dev/neo-go/pkg/io"
+	"github.com/nspcc-dev/neo-go/pkg/neorpc"
 	"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
 	"github.com/nspcc-dev/neo-go/pkg/network"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
@@ -2034,10 +2035,24 @@ func TestWSClient_Wait(t *testing.T) {
 		rcvr <- aer
 	}()
 	go func() {
+		// Wait until client is properly subscribed. The real node won't behave like this,
+		// but the real node has the subsequent blocks to be added that will trigger client's
+		// waitloops to finish anyway (and the test has only single block, thus, use it careful).
 		require.Eventually(t, func() bool {
 			rpcSrv.subsLock.Lock()
 			defer rpcSrv.subsLock.Unlock()
-			return len(rpcSrv.subscribers) == 1
+			if len(rpcSrv.subscribers) == 1 { // single client
+				for s := range rpcSrv.subscribers {
+					var count int
+					for _, f := range s.feeds {
+						if f.event != neorpc.InvalidEventID {
+							count++
+						}
+					}
+					return count == 2 // subscription for blocks + AERs
+				}
+			}
+			return false
 		}, time.Second, 100*time.Millisecond)
 		require.NoError(t, chain.AddBlock(b))
 	}()
@@ -2057,6 +2072,25 @@ func TestWSClient_Wait(t *testing.T) {
 				t.Fatalf("transaction from block %d failed to be awaited: deadline exceeded", b.Index)
 			}
 		}
+		// Wait for server/client to properly unsubscribe. In real life subsequent awaiter
+		// requests may be run concurrently, and it's OK, but it's important for the test
+		// not to run subscription requests in parallel because block addition is bounded to
+		// the number of subscribers.
+		require.Eventually(t, func() bool {
+			rpcSrv.subsLock.Lock()
+			defer rpcSrv.subsLock.Unlock()
+			if len(rpcSrv.subscribers) != 1 {
+				return false
+			}
+			for s := range rpcSrv.subscribers {
+				for _, f := range s.feeds {
+					if f.event != neorpc.InvalidEventID {
+						return false
+					}
+				}
+			}
+			return true
+		}, time.Second, 100*time.Millisecond)
 	}

 	var faultedChecked bool
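The test hunks above poll the server's subscription state with testify's require.Eventually instead of sleeping for a fixed time: take the lock inside the polled closure, inspect the state, and let Eventually retry every tick until the timeout. A stripped-down sketch of that pattern, with a hypothetical counter type standing in for rpcSrv:

```go
package sketch

import (
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// counter stands in for the server state the tests inspect (rpcSrv in the diff).
type counter struct {
	mu sync.Mutex
	n  int
}

func (c *counter) inc() {
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
}

// TestEventuallyPolling shows the shape of the checks added above: lock inside
// the polled closure, read the state, return whether the condition holds, and
// let Eventually retry every tick until the timeout.
func TestEventuallyPolling(t *testing.T) {
	var c counter
	go c.inc() // the "subscription" happens asynchronously, like the WS client's request

	require.Eventually(t, func() bool {
		c.mu.Lock()
		defer c.mu.Unlock()
		return c.n == 1
	}, time.Second, 100*time.Millisecond)
}
```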
@@ -2102,7 +2136,7 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) {
 	tx := b1.Transactions[0]
 	require.NoError(t, chain.AddBlock(b1))

-	// After that, subscribe for AERs/blocks and wait.
+	// After that, subscribe for AERs/blocks.
 	rcvr := make(chan *state.AppExecResult)
 	go func() {
 		aer, err := act.Wait(tx.Hash(), tx.ValidUntilBlock, nil)
@@ -2110,8 +2144,28 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) {
 		rcvr <- aer
 	}()

+	// Wait until client is properly subscribed. The real node won't behave like this,
+	// but the real node has the subsequent blocks to be added that will trigger client's
+	// waitloops to finish anyway (and the test has only single block, thus, use it careful).
+	require.Eventually(t, func() bool {
+		rpcSrv.subsLock.Lock()
+		defer rpcSrv.subsLock.Unlock()
+		if len(rpcSrv.subscribers) == 1 { // single client
+			for s := range rpcSrv.subscribers {
+				var count int
+				for _, f := range s.feeds {
+					if f.event != neorpc.InvalidEventID {
+						count++
+					}
+				}
+				return count == 2 // subscription for blocks + AERs
+			}
+		}
+		return false
+	}, time.Second, 100*time.Millisecond)
+
 	// Accept the next block to trigger event-based waiter loop exit and rollback to
-	// poll-based waiter.
+	// a poll-based waiter.
 	require.NoError(t, chain.AddBlock(b2))

 	// Wait for the result.
@@ -138,11 +138,14 @@ type (

 		subsLock    sync.RWMutex
 		subscribers map[*subscriber]bool
+
+		subsCounterLock   sync.RWMutex
 		blockSubs         int
 		executionSubs     int
 		notificationSubs  int
 		transactionSubs   int
 		notaryRequestSubs int
+
 		blockCh        chan *block.Block
 		executionCh    chan *state.AppExecResult
 		notificationCh chan *state.ContainedNotificationEvent
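Read as a locking contract, the new field layout splits into two groups. The snippet restates the fields from the hunk above with editorial comments on what each mutex is assumed to guard, based on how the rest of the diff uses them; the subscriber placeholder type is made up.

```go
package sketch

import "sync"

// subscriber is a placeholder for the server's subscriber type.
type subscriber struct{}

// subscriptionState restates the fields from the hunk above; the grouping
// comments are editorial guesses about what each mutex guards.
type subscriptionState struct {
	subsLock    sync.RWMutex         // guards only the subscriber set below
	subscribers map[*subscriber]bool // iterated by the blockchain events receiver

	subsCounterLock   sync.RWMutex // guards the counters and the chain (un)subscription
	blockSubs         int
	executionSubs     int
	notificationSubs  int
	transactionSubs   int
	notaryRequestSubs int
}
```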
@@ -591,14 +594,17 @@ requestloop:
 		case resChan <- res:
 		}
 	}
+
 	s.subsLock.Lock()
 	delete(s.subscribers, subscr)
+	s.subsLock.Unlock()
+	s.subsCounterLock.Lock()
 	for _, e := range subscr.feeds {
 		if e.event != neorpc.InvalidEventID {
 			s.unsubscribeFromChannel(e.event)
 		}
 	}
-	s.subsLock.Unlock()
+	s.subsCounterLock.Unlock()
 	close(resChan)
 	ws.Close()
 }
@@ -2441,12 +2447,6 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{
 	}

 	s.subsLock.Lock()
-	defer s.subsLock.Unlock()
-	select {
-	case <-s.shutdown:
-		return nil, neorpc.NewInternalServerError("server is shutting down")
-	default:
-	}
 	var id int
 	for ; id < len(sub.feeds); id++ {
 		if sub.feeds[id].event == neorpc.InvalidEventID {
@@ -2454,16 +2454,27 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{
 		}
 	}
 	if id == len(sub.feeds) {
+		s.subsLock.Unlock()
 		return nil, neorpc.NewInternalServerError("maximum number of subscriptions is reached")
 	}
 	sub.feeds[id].event = event
 	sub.feeds[id].filter = filter
+	s.subsLock.Unlock()
+
+	s.subsCounterLock.Lock()
+	select {
+	case <-s.shutdown:
+		s.subsCounterLock.Unlock()
+		return nil, neorpc.NewInternalServerError("server is shutting down")
+	default:
+	}
 	s.subscribeToChannel(event)
+	s.subsCounterLock.Unlock()
 	return strconv.FormatInt(int64(id), 10), nil
 }

 // subscribeToChannel subscribes RPC server to appropriate chain events if
-// it's not yet subscribed for them. It's supposed to be called with s.subsLock
+// it's not yet subscribed for them. It's supposed to be called with s.subsCounterLock
 // taken by the caller.
 func (s *Server) subscribeToChannel(event neorpc.EventID) {
 	switch event {
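After this change, subscribe never holds both mutexes at once: it reserves the feed slot under subsLock, releases it, and only then takes subsCounterLock for the shutdown check and the chain subscription. A compact sketch of that hand-off under assumed names (srv, slots, subs); the shutdown branch leaves the reserved slot alone, mirroring the hunk above, and everything else is simplified.

```go
package sketch

import (
	"errors"
	"sync"
)

// srv is a stand-in for the RPC server; only the pieces needed to show the
// lock hand-off are modelled.
type srv struct {
	subsLock sync.RWMutex
	slots    []bool // true = feed slot in use, stands in for sub.feeds

	subsCounterLock sync.RWMutex
	subs            int // stands in for the per-event counters
	shutdown        chan struct{}
}

// subscribe mirrors the flow of the hunk above: reserve a slot under subsLock,
// release it, then take subsCounterLock for the shutdown check and the counter
// update, so neither lock is ever held while acquiring the other.
func (s *srv) subscribe() (int, error) {
	s.subsLock.Lock()
	id := -1
	for i, used := range s.slots {
		if !used {
			id, s.slots[i] = i, true
			break
		}
	}
	s.subsLock.Unlock()
	if id < 0 {
		return 0, errors.New("maximum number of subscriptions is reached")
	}

	s.subsCounterLock.Lock()
	defer s.subsCounterLock.Unlock()
	select {
	case <-s.shutdown: // as in the diff, the reserved slot is left as is
		return 0, errors.New("server is shutting down")
	default:
	}
	s.subs++ // stands in for subscribeToChannel(event)
	return id, nil
}
```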
@@ -2502,20 +2513,24 @@ func (s *Server) unsubscribe(reqParams params.Params, sub *subscriber) (interfac
 		return nil, neorpc.ErrInvalidParams
 	}
 	s.subsLock.Lock()
-	defer s.subsLock.Unlock()
 	if len(sub.feeds) <= id || sub.feeds[id].event == neorpc.InvalidEventID {
+		s.subsLock.Unlock()
 		return nil, neorpc.ErrInvalidParams
 	}
 	event := sub.feeds[id].event
 	sub.feeds[id].event = neorpc.InvalidEventID
 	sub.feeds[id].filter = nil
+	s.subsLock.Unlock()
+
+	s.subsCounterLock.Lock()
 	s.unsubscribeFromChannel(event)
+	s.subsCounterLock.Unlock()
 	return true, nil
 }

 // unsubscribeFromChannel unsubscribes RPC server from appropriate chain events
-// if there are no other subscribers for it. It's supposed to be called with
-// s.subsLock taken by the caller.
+// if there are no other subscribers for it. It must be called with s.subsCounterLock
+// held by the caller.
 func (s *Server) unsubscribeFromChannel(event neorpc.EventID) {
 	switch event {
 	case neorpc.BlockEventID:
@@ -2631,10 +2646,10 @@ chloop:
 			}
 			s.subsLock.RUnlock()
 		}
-	// It's important to do it with lock held because no subscription routine
+	// It's important to do it with subsCounterLock held because no subscription routine
 	// should be running concurrently to this one. And even if one is to run
 	// after unlock, it'll see closed s.shutdown and won't subscribe.
-	s.subsLock.Lock()
+	s.subsCounterLock.Lock()
 	// There might be no subscription in reality, but it's not a problem as
 	// core.Blockchain allows unsubscribing non-subscribed channels.
 	s.chain.UnsubscribeFromBlocks(s.blockCh)
@@ -2644,7 +2659,7 @@ chloop:
 	if s.chain.P2PSigExtensionsEnabled() {
 		s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
 	}
-	s.subsLock.Unlock()
+	s.subsCounterLock.Unlock()
 drainloop:
 	for {
 		select {
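On shutdown the receiver loop now takes subsCounterLock, not subsLock, around the chain unsubscription; the comment in the hunk explains why that is enough: s.shutdown is already closed by then, so any subscribe call that runs afterwards sees it and refuses under the same lock. A self-contained sketch of that close-then-lock coordination with hypothetical names (notifier, stop):

```go
package sketch

import (
	"errors"
	"sync"
)

// notifier models just the coordination: a closed shutdown channel plus one
// lock shared by subscribe and the final cleanup.
type notifier struct {
	shutdown        chan struct{}
	subsCounterLock sync.Mutex
	chainSubscribed bool
}

// subscribe refuses to touch the chain once shutdown has been signalled; the
// check happens under the same lock that stop takes, so a subscriber cannot
// slip in between the check and the chain call.
func (n *notifier) subscribe() error {
	n.subsCounterLock.Lock()
	defer n.subsCounterLock.Unlock()
	select {
	case <-n.shutdown:
		return errors.New("server is shutting down")
	default:
	}
	n.chainSubscribed = true // stands in for s.chain.SubscribeForBlocks(...)
	return nil
}

// stop models the chloop cleanup: once shutdown is closed, any later subscribe
// sees it, and the lock only has to cover the chain unsubscription itself.
func (n *notifier) stop() {
	close(n.shutdown) // the real server closes s.shutdown elsewhere, before this cleanup runs
	n.subsCounterLock.Lock()
	n.chainSubscribed = false // stands in for s.chain.UnsubscribeFromBlocks(...)
	n.subsCounterLock.Unlock()
}
```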