forked from TrueCloudLab/neoneo-go

Merge pull request #2916 from nspcc-dev/not-so-local-client: Not so local RPC client

Commit 475d9de2d5
9 changed files with 396 additions and 62 deletions
pkg/rpcclient/local.go (new file, 82 lines)

@@ -0,0 +1,82 @@
package rpcclient

import (
    "context"

    "github.com/nspcc-dev/neo-go/pkg/neorpc"
    "go.uber.org/atomic"
)

// InternalHook is a function signature that is required to create a local client
// (see NewInternal). It performs registration of local client's event channel
// and returns a request handler function.
type InternalHook func(context.Context, chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error)

// Internal is an experimental "local" client that does not connect to RPC via
// network. It's made for deeply integrated applications like NeoFS that have
// blockchain running in the same process, so use it only if you know what you're
// doing. It provides the same interface WSClient does.
type Internal struct {
    WSClient

    events chan neorpc.Notification
}

// NewInternal creates an instance of internal client. It accepts a method
// provided by RPC server.
func NewInternal(ctx context.Context, register InternalHook) (*Internal, error) {
    c := &Internal{
        WSClient: WSClient{
            Client:        Client{},
            Notifications: make(chan Notification),

            shutdown:      make(chan struct{}),
            done:          make(chan struct{}),
            closeCalled:   *atomic.NewBool(false),
            subscriptions: make(map[string]notificationReceiver),
            receivers:     make(map[interface{}][]string),
        },
        events: make(chan neorpc.Notification),
    }

    err := initClient(ctx, &c.WSClient.Client, "localhost:0", Options{})
    if err != nil {
        return nil, err // Can't really happen for internal client.
    }
    c.cli = nil
    go c.eventLoop()
    // c.ctx is inherited from ctx in fact (see initClient).
    c.requestF = register(c.ctx, c.events) //nolint:contextcheck // Non-inherited new context, use function like `context.WithXXX` instead
    return c, nil
}

func (c *Internal) eventLoop() {
eventloop:
    for {
        select {
        case <-c.ctx.Done():
            break eventloop
        case <-c.shutdown:
            break eventloop
        case ev := <-c.events:
            ntf := Notification{Type: ev.Event}
            if len(ev.Payload) > 0 {
                ntf.Value = ev.Payload[0]
            }
            c.notifySubscribers(ntf)
        }
    }
    close(c.done)
    close(c.Notifications)
    c.ctxCancel()
    // ctx is cancelled, server is notified and will finish soon.
drainloop:
    for {
        select {
        case <-c.events:
        default:
            break drainloop
        }
    }
    close(c.events)
}
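For reference, a minimal sketch of how a deeply integrated application could wire this client to an in-process RPC server. It only combines APIs that appear elsewhere in this PR (NewInternal, Server.RegisterLocal, Init and the invoker/gas wrappers used in TestLocalClient below); the package and helper names are illustrative, not part of the change.

package embedding

import (
    "context"

    "github.com/nspcc-dev/neo-go/pkg/rpcclient"
    "github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
    "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
    "github.com/nspcc-dev/neo-go/pkg/services/rpcsrv"
)

// newEmbeddedClient builds an Internal client on top of an already running
// in-process RPC server and checks that it can serve a read-only request.
// The function name is hypothetical; srv is assumed to be set up elsewhere.
func newEmbeddedClient(srv *rpcsrv.Server) (*rpcclient.Internal, error) {
    // RegisterLocal registers the client's event channel with the server and
    // returns the request handler that backs all calls (no network involved).
    c, err := rpcclient.NewInternal(context.Background(), srv.RegisterLocal)
    if err != nil {
        return nil, err
    }
    if err := c.Init(); err != nil {
        return nil, err
    }
    // The client provides the same interface WSClient does, so higher-level
    // wrappers work unchanged.
    gasReader := gas.NewReader(invoker.New(c, nil))
    if _, err := gasReader.Decimals(); err != nil {
        return nil, err
    }
    return c, nil
}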
pkg/rpcclient/local_test.go (new file, 18 lines)

@@ -0,0 +1,18 @@
package rpcclient

import (
    "context"
    "testing"

    "github.com/nspcc-dev/neo-go/pkg/neorpc"
    "github.com/stretchr/testify/require"
)

func TestInternalClientClose(t *testing.T) {
    icl, err := NewInternal(context.TODO(), func(ctx context.Context, ch chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error) {
        return nil
    })
    require.NoError(t, err)
    icl.Close()
    require.NoError(t, icl.GetError())
}
pkg/rpcclient/wsclient.go

@@ -456,7 +456,7 @@ readloop:
             connCloseErr = fmt.Errorf("bad event received: %s / %d", event, len(rr.RawParams))
             break readloop
         }
-        var val interface{}
+        ntf := Notification{Type: event}
         switch event {
         case neorpc.BlockEventID:
             sr, err := c.StateRootInHeader()
@@ -465,15 +465,15 @@ readloop:
                 connCloseErr = fmt.Errorf("failed to fetch StateRootInHeader: %w", err)
                 break readloop
             }
-            val = block.New(sr)
+            ntf.Value = block.New(sr)
         case neorpc.TransactionEventID:
-            val = &transaction.Transaction{}
+            ntf.Value = &transaction.Transaction{}
         case neorpc.NotificationEventID:
-            val = new(state.ContainedNotificationEvent)
+            ntf.Value = new(state.ContainedNotificationEvent)
         case neorpc.ExecutionEventID:
-            val = new(state.AppExecResult)
+            ntf.Value = new(state.AppExecResult)
         case neorpc.NotaryRequestEventID:
-            val = new(result.NotaryRequestEvent)
+            ntf.Value = new(result.NotaryRequestEvent)
         case neorpc.MissedEventID:
            // No value.
         default:
@@ -482,32 +482,14 @@ readloop:
             break readloop
         }
         if event != neorpc.MissedEventID {
-            err = json.Unmarshal(rr.RawParams[0], val)
+            err = json.Unmarshal(rr.RawParams[0], ntf.Value)
             if err != nil {
                 // Bad event received.
                 connCloseErr = fmt.Errorf("failed to unmarshal event of type %s from JSON: %w", event, err)
                 break readloop
             }
         }
-        if event == neorpc.MissedEventID {
-            c.subscriptionsLock.Lock()
-            for rcvr, ids := range c.receivers {
-                c.subscriptions[ids[0]].Close()
-                delete(c.receivers, rcvr)
-            }
-            c.subscriptionsLock.Unlock()
-            continue readloop
-        }
-        c.subscriptionsLock.RLock()
-        ntf := Notification{Type: event, Value: val}
-        for _, ids := range c.receivers {
-            for _, id := range ids {
-                if c.subscriptions[id].TrySend(ntf) {
-                    break // strictly one notification per channel
-                }
-            }
-        }
-        c.subscriptionsLock.RUnlock()
+        c.notifySubscribers(ntf)
     } else if rr.ID != nil && (rr.Error != nil || rr.Result != nil) {
         id, err := strconv.ParseUint(string(rr.ID), 10, 64)
         if err != nil {
@@ -580,6 +562,27 @@ writeloop:
     }
 }

+func (c *WSClient) notifySubscribers(ntf Notification) {
+    if ntf.Type == neorpc.MissedEventID {
+        c.subscriptionsLock.Lock()
+        for rcvr, ids := range c.receivers {
+            c.subscriptions[ids[0]].Close()
+            delete(c.receivers, rcvr)
+        }
+        c.subscriptionsLock.Unlock()
+        return
+    }
+    c.subscriptionsLock.RLock()
+    for _, ids := range c.receivers {
+        for _, id := range ids {
+            if c.subscriptions[id].TrySend(ntf) {
+                break // strictly one notification per channel
+            }
+        }
+    }
+    c.subscriptionsLock.RUnlock()
+}
+
 func (c *WSClient) unregisterRespChannel(id uint64) {
     c.respLock.Lock()
     defer c.respLock.Unlock()
pkg/services/rpcsrv/client_test.go

@@ -2033,15 +2033,45 @@ func TestClient_Wait(t *testing.T) {
     check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true)
 }

-func TestWSClient_Wait(t *testing.T) {
+func mkSubsClient(t *testing.T, rpcSrv *Server, httpSrv *httptest.Server, local bool) *rpcclient.WSClient {
+    var (
+        c   *rpcclient.WSClient
+        err error
+        icl *rpcclient.Internal
+    )
+    if local {
+        icl, err = rpcclient.NewInternal(context.Background(), rpcSrv.RegisterLocal)
+    } else {
+        url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
+        c, err = rpcclient.NewWS(context.Background(), url, rpcclient.Options{})
+    }
+    require.NoError(t, err)
+    if local {
+        c = &icl.WSClient
+    }
+    require.NoError(t, c.Init())
+    return c
+}
+
+func runWSAndLocal(t *testing.T, test func(*testing.T, bool)) {
+    t.Run("ws", func(t *testing.T) {
+        test(t, false)
+    })
+    t.Run("local", func(t *testing.T) {
+        test(t, true)
+    })
+}
+
+func TestSubClientWait(t *testing.T) {
+    runWSAndLocal(t, testSubClientWait)
+}
+
+func testSubClientWait(t *testing.T, local bool) {
     chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true)
     defer chain.Close()
     defer rpcSrv.Shutdown()

-    url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
-    c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{})
-    require.NoError(t, err)
-    require.NoError(t, c.Init())
+    c := mkSubsClient(t, rpcSrv, httpSrv, local)
     acc, err := wallet.NewAccount()
     require.NoError(t, err)
     act, err := actor.New(c, []actor.SignerAccount{
@@ -2135,15 +2165,16 @@ func TestWSClient_Wait(t *testing.T) {
     require.True(t, faultedChecked, "FAULTed transaction wasn't checked")
 }

-func TestWSClient_WaitWithLateSubscription(t *testing.T) {
+func TestSubClientWaitWithLateSubscription(t *testing.T) {
+    runWSAndLocal(t, testSubClientWaitWithLateSubscription)
+}
+
+func testSubClientWaitWithLateSubscription(t *testing.T, local bool) {
     chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true)
     defer chain.Close()
     defer rpcSrv.Shutdown()

-    url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
-    c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{})
-    require.NoError(t, err)
-    require.NoError(t, c.Init())
+    c := mkSubsClient(t, rpcSrv, httpSrv, local)
     acc, err := wallet.NewAccount()
     require.NoError(t, err)
     act, err := actor.New(c, []actor.SignerAccount{
@@ -2182,15 +2213,16 @@ func TestWSClientHandshakeError(t *testing.T) {
     require.ErrorContains(t, err, "websocket users limit reached")
 }

-func TestWSClient_WaitWithMissedEvent(t *testing.T) {
+func TestSubClientWaitWithMissedEvent(t *testing.T) {
+    runWSAndLocal(t, testSubClientWaitWithMissedEvent)
+}
+
+func testSubClientWaitWithMissedEvent(t *testing.T, local bool) {
     chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true)
     defer chain.Close()
     defer rpcSrv.Shutdown()

-    url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
-    c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{})
-    require.NoError(t, err)
-    require.NoError(t, c.Init())
+    c := mkSubsClient(t, rpcSrv, httpSrv, local)
     acc, err := wallet.NewAccount()
     require.NoError(t, err)
     act, err := actor.New(c, []actor.SignerAccount{
@@ -2233,18 +2265,19 @@ func TestWSClient_WaitWithMissedEvent(t *testing.T) {
     // Accept the next block, but subscriber will get no events because it's overflown.
     require.NoError(t, chain.AddBlock(b1))

-    overEvent, err := json.Marshal(neorpc.Notification{
+    overNotification := neorpc.Notification{
         JSONRPC: neorpc.JSONRPCVersion,
         Event:   neorpc.MissedEventID,
         Payload: make([]interface{}, 0),
-    })
+    }
+    overEvent, err := json.Marshal(overNotification)
     require.NoError(t, err)
     overflowMsg, err := websocket.NewPreparedMessage(websocket.TextMessage, overEvent)
     require.NoError(t, err)
     rpcSrv.subsLock.Lock()
     // Deliver overflow message -> triggers subscriber to retry with polling waiter.
     for s := range rpcSrv.subscribers {
-        s.writer <- overflowMsg
+        s.writer <- intEvent{overflowMsg, &overNotification}
     }
     rpcSrv.subsLock.Unlock()

@@ -2271,10 +2304,7 @@ func TestWSClient_SubscriptionsCompat(t *testing.T) {
     defer chain.Close()
     defer rpcSrv.Shutdown()

-    url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
-    c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{})
-    require.NoError(t, err)
-    require.NoError(t, c.Init())
+    c := mkSubsClient(t, rpcSrv, httpSrv, false)
     blocks := getTestBlocks(t)
     bCount := uint32(0)

@@ -2289,8 +2319,11 @@ func TestWSClient_SubscriptionsCompat(t *testing.T) {
         return b1, primary, sender, ntfName, st
     }
     checkDeprecated := func(t *testing.T, filtered bool) {
+        var (
+            bID, txID, ntfID, aerID string
+            err                     error
+        )
         b, primary, sender, ntfName, st := getData(t)
-        var bID, txID, ntfID, aerID string
         if filtered {
             bID, err = c.SubscribeForNewBlocks(&primary) //nolint:staticcheck // SA1019: c.SubscribeForNewBlocks is deprecated
             require.NoError(t, err)
@@ -2381,6 +2414,7 @@ func TestWSClient_SubscriptionsCompat(t *testing.T) {
         txFlt  *neorpc.TxFilter
         ntfFlt *neorpc.NotificationFilter
         aerFlt *neorpc.ExecutionFilter
+        err    error
     )
     if filtered {
         bFlt = &neorpc.BlockFilter{Primary: &primary}
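The same two helpers make it cheap to run any further subscription test over both transports. A sketch of the pattern, within the same rpcsrv test package so the existing imports and helpers (runWSAndLocal, mkSubsClient, initClearServerWithServices) are reused; TestSomething/testSomething are placeholder names, not part of this PR:

func TestSomething(t *testing.T) {
    runWSAndLocal(t, testSomething)
}

func testSomething(t *testing.T, local bool) {
    chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true)
    defer chain.Close()
    defer rpcSrv.Shutdown()

    // c behaves identically whether it's backed by a real websocket
    // connection or by the in-process Internal client.
    c := mkSubsClient(t, rpcSrv, httpSrv, local)
    require.NotNil(t, c)
}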
pkg/services/rpcsrv/local_test.go (new file, 57 lines)

@@ -0,0 +1,57 @@
package rpcsrv

import (
    "context"
    "math/big"
    "testing"

    "github.com/nspcc-dev/neo-go/internal/testchain"
    "github.com/nspcc-dev/neo-go/pkg/config"
    "github.com/nspcc-dev/neo-go/pkg/rpcclient"
    "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
    "github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
    "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
    "github.com/nspcc-dev/neo-go/pkg/util"
    "github.com/nspcc-dev/neo-go/pkg/wallet"
    "github.com/stretchr/testify/require"
)

func TestLocalClient(t *testing.T) {
    _, rpcSrv, _ := initClearServerWithCustomConfig(t, func(cfg *config.Config) {
        // No addresses configured -> RPC server listens nothing (but it
        // has MaxGasInvoke, sessions and other stuff).
        cfg.ApplicationConfiguration.RPC.BasicService.Enabled = true
        cfg.ApplicationConfiguration.RPC.BasicService.Address = nil //nolint:staticcheck // SA1019: cfg.ApplicationConfiguration.RPC.BasicService.Address is deprecated
        cfg.ApplicationConfiguration.RPC.BasicService.Port = nil //nolint:staticcheck // SA1019: cfg.ApplicationConfiguration.RPC.BasicService.Port is deprecated
        cfg.ApplicationConfiguration.RPC.BasicService.Addresses = nil
        cfg.ApplicationConfiguration.RPC.TLSConfig.Address = nil //nolint:staticcheck // SA1019: cfg.ApplicationConfiguration.RPC.TLSConfig.Address is deprecated
        cfg.ApplicationConfiguration.RPC.TLSConfig.Port = nil //nolint:staticcheck // SA1019: cfg.ApplicationConfiguration.RPC.TLSConfig.Port is deprecated
        cfg.ApplicationConfiguration.RPC.TLSConfig.Addresses = nil
    })
    // RPC server listens nothing (not exposed in any way), but it works for internal clients.
    c, err := rpcclient.NewInternal(context.TODO(), rpcSrv.RegisterLocal)
    require.NoError(t, err)
    require.NoError(t, c.Init())

    // Invokers can use this client.
    gasReader := gas.NewReader(invoker.New(c, nil))
    d, err := gasReader.Decimals()
    require.NoError(t, err)
    require.EqualValues(t, 8, d)

    // Actors can use it as well
    priv := testchain.PrivateKeyByID(0)
    acc := wallet.NewAccountFromPrivateKey(priv)
    addr := priv.PublicKey().GetScriptHash()

    act, err := actor.NewSimple(c, acc)
    require.NoError(t, err)
    gasprom := gas.New(act)
    txHash, _, err := gasprom.Transfer(addr, util.Uint160{}, big.NewInt(1000), nil)
    require.NoError(t, err)
    // No new blocks are produced here, but the tx is OK and is in the mempool.
    txes, err := c.GetRawMemPool()
    require.NoError(t, err)
    require.Equal(t, []util.Uint256{txHash}, txes)
    // Subscriptions are checked by other tests.
}
pkg/services/rpcsrv/params/params.go

@@ -1,12 +1,29 @@
 package params

-import "fmt"
+import (
+    "encoding/json"
+    "fmt"
+)

 type (
     // Params represents the JSON-RPC params.
     Params []Param
 )

+// FromAny allows to create Params for a slice of abstract values (by
+// JSON-marshaling them).
+func FromAny(arr []interface{}) (Params, error) {
+    var res Params
+    for i := range arr {
+        b, err := json.Marshal(arr[i])
+        if err != nil {
+            return nil, fmt.Errorf("wrong parameter %d: %w", i, err)
+        }
+        res = append(res, Param{RawMessage: json.RawMessage(b)})
+    }
+    return res, nil
+}
+
 // Value returns the param struct for the given
 // index if it exists.
 func (p Params) Value(index int) *Param {
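A quick illustration of what FromAny produces (it mirrors TestParamsFromAny below and the way handleInternal uses it later in this PR to feed typed Go values into the ordinary parameter machinery). The function name exampleFromAny and the values passed are illustrative only; this lives in the same params package:

func exampleFromAny() (Params, error) {
    // Each value is JSON-marshaled into a Param's RawMessage, so the usual
    // Param getters work on it exactly as if it had arrived over the wire.
    ps, err := FromAny([]interface{}{"some-string", 42})
    if err != nil {
        return nil, err // only possible when a value can't be marshaled to JSON
    }
    if s, err := ps[0].GetString(); err == nil {
        _ = s // "some-string"
    }
    return ps, nil
}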
pkg/services/rpcsrv/params/params_test.go (new file, 31 lines)

@@ -0,0 +1,31 @@
package params

import (
    "testing"

    "github.com/nspcc-dev/neo-go/pkg/smartcontract"
    "github.com/stretchr/testify/require"
)

func TestParamsFromAny(t *testing.T) {
    str := "jajaja"

    ps, err := FromAny([]interface{}{str, smartcontract.Parameter{Type: smartcontract.StringType, Value: str}})
    require.NoError(t, err)
    require.Equal(t, 2, len(ps))

    resStr, err := ps[0].GetString()
    require.NoError(t, err)
    require.Equal(t, resStr, str)

    resFP, err := ps[1].GetFuncParam()
    require.NoError(t, err)
    require.Equal(t, resFP.Type, smartcontract.StringType)
    resStr, err = resFP.Value.GetString()
    require.NoError(t, err)
    require.Equal(t, resStr, str)

    // Invalid item.
    _, err = FromAny([]interface{}{smartcontract.Parameter{Type: smartcontract.IntegerType, Value: str}})
    require.Error(t, err)
}
pkg/services/rpcsrv/server.go

@@ -469,8 +469,8 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
         return
     }
     resChan := make(chan abstractResult) // response.abstract or response.abstractBatch
-    subChan := make(chan *websocket.PreparedMessage, notificationBufSize)
-    subscr := &subscriber{writer: subChan, ws: ws}
+    subChan := make(chan intEvent, notificationBufSize)
+    subscr := &subscriber{writer: subChan}
     s.subsLock.Lock()
     s.subscribers[subscr] = true
     s.subsLock.Unlock()
@@ -505,6 +505,19 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
     s.writeHTTPServerResponse(req, w, resp)
 }

+// RegisterLocal performs local client registration.
+func (s *Server) RegisterLocal(ctx context.Context, events chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error) {
+    subChan := make(chan intEvent, notificationBufSize)
+    subscr := &subscriber{writer: subChan}
+    s.subsLock.Lock()
+    s.subscribers[subscr] = true
+    s.subsLock.Unlock()
+    go s.handleLocalNotifications(ctx, events, subChan, subscr)
+    return func(req *neorpc.Request) (*neorpc.Response, error) {
+        return s.handleInternal(req, subscr)
+    }
+}
+
 func (s *Server) handleRequest(req *params.Request, sub *subscriber) abstractResult {
     if req.In != nil {
         req.In.Method = escapeForLog(req.In.Method) // No valid method name will be changed by it.
@@ -518,6 +531,51 @@ func (s *Server) handleRequest(req *params.Request, sub *subscriber) abstractRes
     return resp
 }

+// handleInternal is an experimental interface to handle client requests directly.
+func (s *Server) handleInternal(req *neorpc.Request, sub *subscriber) (*neorpc.Response, error) {
+    var (
+        res    interface{}
+        rpcRes = &neorpc.Response{
+            HeaderAndError: neorpc.HeaderAndError{
+                Header: neorpc.Header{
+                    JSONRPC: req.JSONRPC,
+                    ID:      json.RawMessage(strconv.FormatUint(req.ID, 10)),
+                },
+            },
+        }
+    )
+    reqParams, err := params.FromAny(req.Params)
+    if err != nil {
+        return nil, err
+    }
+
+    s.log.Debug("processing local rpc request",
+        zap.String("method", req.Method),
+        zap.Stringer("params", reqParams))
+
+    start := time.Now()
+    defer func() { addReqTimeMetric(req.Method, time.Since(start)) }()
+
+    rpcRes.Error = neorpc.NewMethodNotFoundError(fmt.Sprintf("method %q not supported", req.Method))
+    handler, ok := rpcHandlers[req.Method]
+    if ok {
+        res, rpcRes.Error = handler(s, reqParams)
+    } else if sub != nil {
+        handler, ok := rpcWsHandlers[req.Method]
+        if ok {
+            res, rpcRes.Error = handler(s, reqParams, sub)
+        }
+    }
+    if res != nil {
+        b, err := json.Marshal(res)
+        if err != nil {
+            return nil, fmt.Errorf("response can't be JSONized: %w", err)
+        }
+        rpcRes.Result = json.RawMessage(b)
+    }
+    return rpcRes, nil
+}
+
 func (s *Server) handleIn(req *params.In, sub *subscriber) abstract {
     var res interface{}
     var resErr *neorpc.Error
@@ -547,7 +605,31 @@ func (s *Server) handleIn(req *params.In, sub *subscriber) abstract {
     return s.packResponse(req, res, resErr)
 }

-func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan abstractResult, subChan <-chan *websocket.PreparedMessage) {
+func (s *Server) handleLocalNotifications(ctx context.Context, events chan<- neorpc.Notification, subChan <-chan intEvent, subscr *subscriber) {
+eventloop:
+    for {
+        select {
+        case <-s.shutdown:
+            break eventloop
+        case <-ctx.Done():
+            break eventloop
+        case ev := <-subChan:
+            events <- *ev.ntf // Make a copy.
+        }
+    }
+    close(events)
+    s.dropSubscriber(subscr)
+drainloop:
+    for {
+        select {
+        case <-subChan:
+        default:
+            break drainloop
+        }
+    }
+}
+
+func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan abstractResult, subChan <-chan intEvent) {
     pingTicker := time.NewTicker(wsPingPeriod)
 eventloop:
     for {
@@ -561,7 +643,7 @@ eventloop:
             if err := ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)); err != nil {
                 break eventloop
             }
-            if err := ws.WritePreparedMessage(event); err != nil {
+            if err := ws.WritePreparedMessage(event.msg); err != nil {
                 break eventloop
             }
         case res, ok := <-resChan:
@@ -621,7 +703,12 @@ requestloop:
         case resChan <- res:
         }
     }
+    s.dropSubscriber(subscr)
+    close(resChan)
+    ws.Close()
+}
+
+func (s *Server) dropSubscriber(subscr *subscriber) {
     s.subsLock.Lock()
     delete(s.subscribers, subscr)
     s.subsLock.Unlock()
@@ -632,8 +719,6 @@ requestloop:
         }
     }
     s.subsCounterLock.Unlock()
-    close(resChan)
-    ws.Close()
 }

 func (s *Server) getBestBlockHash(_ params.Params) (interface{}, *neorpc.Error) {
@@ -2581,11 +2666,12 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) {
 }

 func (s *Server) handleSubEvents() {
-    b, err := json.Marshal(neorpc.Notification{
+    var overflowEvent = neorpc.Notification{
         JSONRPC: neorpc.JSONRPCVersion,
         Event:   neorpc.MissedEventID,
         Payload: make([]interface{}, 0),
-    })
+    }
+    b, err := json.Marshal(overflowEvent)
     if err != nil {
         s.log.Error("fatal: failed to marshal overflow event", zap.Error(err))
         return
@@ -2649,12 +2735,12 @@ chloop:
             }
         }
         select {
-        case sub.writer <- msg:
+        case sub.writer <- intEvent{msg, &resp}:
         default:
             sub.overflown.Store(true)
             // MissedEvent is to be delivered eventually.
             go func(sub *subscriber) {
-                sub.writer <- overflowMsg
+                sub.writer <- intEvent{overflowMsg, &overflowEvent}
                 sub.overflown.Store(false)
             }(sub)
         }
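For orientation, a compact sketch of the in-process request path these server-side additions create. The helper name blockCountViaInternal is hypothetical; c is assumed to be an *rpcclient.Internal created via rpcclient.NewInternal(ctx, srv.RegisterLocal), and the comments name the functions from this diff:

func blockCountViaInternal(c *rpcclient.Internal) (uint32, error) {
    // The call below never touches the network:
    //   c.GetBlockCount() -> c.requestF(req)              (the handler returned by RegisterLocal)
    //                     -> srv.handleInternal(req, sub)  (params.FromAny + rpcHandlers lookup)
    //                     -> the same handler table the HTTP/websocket path uses
    // Notifications flow the other way: handleSubEvents -> sub.writer (intEvent)
    // -> handleLocalNotifications -> the client's events channel
    // -> Internal.eventLoop -> notifySubscribers.
    return c.GetBlockCount()
}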
pkg/services/rpcsrv/subscription.go

@@ -7,10 +7,16 @@ import (
 )

 type (
+    // intEvent is an internal event that has both a proper structure and
+    // a websocket-ready message. It's used to serve websocket-based clients
+    // as well as internal ones.
+    intEvent struct {
+        msg *websocket.PreparedMessage
+        ntf *neorpc.Notification
+    }
     // subscriber is an event subscriber.
     subscriber struct {
-        writer chan<- *websocket.PreparedMessage
-        ws     *websocket.Conn
+        writer    chan<- intEvent
         overflown atomic.Bool
         // These work like slots as there is not a lot of them (it's
         // cheaper doing it this way rather than creating a map),