rpc: allow to track notary requests via Notification subsystem

This commit is contained in:
Anna Shaleva 2021-05-28 14:55:06 +03:00
parent 133b600c8c
commit 1dbf1d4310
8 changed files with 276 additions and 55 deletions

View file

@ -21,6 +21,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/fee"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
@ -57,16 +58,18 @@ type (
https *http.Server
shutdown chan struct{}
subsLock sync.RWMutex
subscribers map[*subscriber]bool
blockSubs int
executionSubs int
notificationSubs int
transactionSubs int
blockCh chan *block.Block
executionCh chan *state.AppExecResult
notificationCh chan *state.NotificationEvent
transactionCh chan *transaction.Transaction
subsLock sync.RWMutex
subscribers map[*subscriber]bool
blockSubs int
executionSubs int
notificationSubs int
transactionSubs int
notaryRequestSubs int
blockCh chan *block.Block
executionCh chan *state.AppExecResult
notificationCh chan *state.NotificationEvent
transactionCh chan *transaction.Transaction
notaryRequestCh chan mempoolevent.Event
}
)
@ -174,10 +177,11 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S
subscribers: make(map[*subscriber]bool),
// These are NOT buffered to preserve original order of events.
blockCh: make(chan *block.Block),
executionCh: make(chan *state.AppExecResult),
notificationCh: make(chan *state.NotificationEvent),
transactionCh: make(chan *transaction.Transaction),
blockCh: make(chan *block.Block),
executionCh: make(chan *state.AppExecResult),
notificationCh: make(chan *state.NotificationEvent),
transactionCh: make(chan *transaction.Transaction),
notaryRequestCh: make(chan mempoolevent.Event),
}
}
@ -1448,6 +1452,9 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface
if err != nil || event == response.MissedEventID {
return nil, response.ErrInvalidParams
}
if event == response.NotaryRequestEventID && !s.chain.P2PSigExtensionsEnabled() {
return nil, response.WrapErrorWithData(response.ErrInvalidParams, errors.New("P2PSigExtensions are disabled"))
}
// Optional filter.
var filter interface{}
if p := reqParams.Value(1); p != nil {
@ -1468,6 +1475,10 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface
if p.Type != request.ExecutionFilterT {
return nil, response.ErrInvalidParams
}
case response.NotaryRequestEventID:
if p.Type != request.TxFilterT {
return nil, response.ErrInvalidParams
}
}
filter = p.Value
}
@ -1519,6 +1530,11 @@ func (s *Server) subscribeToChannel(event response.EventID) {
s.chain.SubscribeForExecutions(s.executionCh)
}
s.executionSubs++
case response.NotaryRequestEventID:
if s.notaryRequestSubs == 0 {
s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh)
}
s.notaryRequestSubs++
}
}
@ -1565,6 +1581,11 @@ func (s *Server) unsubscribeFromChannel(event response.EventID) {
if s.executionSubs == 0 {
s.chain.UnsubscribeFromExecutions(s.executionCh)
}
case response.NotaryRequestEventID:
s.notaryRequestSubs--
if s.notaryRequestSubs == 0 {
s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
}
}
}
@ -1605,6 +1626,12 @@ chloop:
case tx := <-s.transactionCh:
resp.Event = response.TransactionEventID
resp.Payload[0] = tx
case e := <-s.notaryRequestCh:
resp.Event = response.NotaryRequestEventID
resp.Payload[0] = &response.NotaryRequestEvent{
Type: e.Type,
NotaryRequest: e.Data.(*payload.P2PNotaryRequest),
}
}
s.subsLock.RLock()
subloop:
@ -1657,6 +1684,9 @@ chloop:
s.chain.UnsubscribeFromTransactions(s.transactionCh)
s.chain.UnsubscribeFromNotifications(s.notificationCh)
s.chain.UnsubscribeFromExecutions(s.executionCh)
if s.chain.P2PSigExtensionsEnabled() {
s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
}
s.subsLock.Unlock()
drainloop:
for {
@ -1665,6 +1695,7 @@ drainloop:
case <-s.executionCh:
case <-s.notificationCh:
case <-s.transactionCh:
case <-s.notaryRequestCh:
default:
break drainloop
}
@ -1675,6 +1706,7 @@ drainloop:
close(s.transactionCh)
close(s.notificationCh)
close(s.executionCh)
close(s.notaryRequestCh)
}
func (s *Server) blockHeightFromParam(param *request.Param) (int, *response.Error) {