rpc: merge response and request under pkg/neorpc

Move result there also.
This commit is contained in:
Roman Khimov 2022-07-22 19:09:29 +03:00
parent 2e27c3d829
commit 1e0750e3cd
55 changed files with 462 additions and 466 deletions

View file

@ -13,9 +13,8 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/rpc/request"
"github.com/nspcc-dev/neo-go/pkg/rpc/response"
"github.com/nspcc-dev/neo-go/pkg/rpc/response/result/subscriptions"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result/subscriptions"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/atomic"
)
@ -38,7 +37,7 @@ type WSClient struct {
ws *websocket.Conn
done chan struct{}
requests chan *request.Raw
requests chan *neorpc.Request
shutdown chan struct{}
closeCalled atomic.Bool
@ -49,21 +48,21 @@ type WSClient struct {
subscriptions map[string]bool
respLock sync.RWMutex
respChannels map[uint64]chan *response.Raw
respChannels map[uint64]chan *neorpc.Response
}
// Notification represents a server-generated notification for client subscriptions.
// Value can be one of block.Block, state.AppExecResult, subscriptions.NotificationEvent
// transaction.Transaction or subscriptions.NotaryRequestEvent based on Type.
type Notification struct {
Type response.EventID
Type neorpc.EventID
Value interface{}
}
// requestResponse is a combined type for request and response since we can get
// any of them here.
type requestResponse struct {
response.Raw
neorpc.Response
Method string `json:"method"`
RawParams []json.RawMessage `json:"params,omitempty"`
}
@ -103,8 +102,8 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error
shutdown: make(chan struct{}),
done: make(chan struct{}),
closeCalled: *atomic.NewBool(false),
respChannels: make(map[uint64]chan *response.Raw),
requests: make(chan *request.Raw),
respChannels: make(map[uint64]chan *neorpc.Response),
requests: make(chan *neorpc.Request),
subscriptions: make(map[string]bool),
}
@ -159,20 +158,20 @@ readloop:
break readloop
}
if rr.ID == nil && rr.Method != "" {
event, err := response.GetEventIDFromString(rr.Method)
event, err := neorpc.GetEventIDFromString(rr.Method)
if err != nil {
// Bad event received.
connCloseErr = fmt.Errorf("failed to perse event ID from string %s: %w", rr.Method, err)
break readloop
}
if event != response.MissedEventID && len(rr.RawParams) != 1 {
if event != neorpc.MissedEventID && len(rr.RawParams) != 1 {
// Bad event received.
connCloseErr = fmt.Errorf("bad event received: %s / %d", event, len(rr.RawParams))
break readloop
}
var val interface{}
switch event {
case response.BlockEventID:
case neorpc.BlockEventID:
sr, err := c.StateRootInHeader()
if err != nil {
// Client is not initialized.
@ -180,22 +179,22 @@ readloop:
break readloop
}
val = block.New(sr)
case response.TransactionEventID:
case neorpc.TransactionEventID:
val = &transaction.Transaction{}
case response.NotificationEventID:
case neorpc.NotificationEventID:
val = new(subscriptions.NotificationEvent)
case response.ExecutionEventID:
case neorpc.ExecutionEventID:
val = new(state.AppExecResult)
case response.NotaryRequestEventID:
case neorpc.NotaryRequestEventID:
val = new(subscriptions.NotaryRequestEvent)
case response.MissedEventID:
case neorpc.MissedEventID:
// No value.
default:
// Bad event received.
connCloseErr = fmt.Errorf("unknown event received: %d", event)
break readloop
}
if event != response.MissedEventID {
if event != neorpc.MissedEventID {
err = json.Unmarshal(rr.RawParams[0], val)
if err != nil {
// Bad event received.
@ -215,7 +214,7 @@ readloop:
connCloseErr = fmt.Errorf("unknown response channel for response %d", id)
break readloop // Unknown response (unexpected response ID).
}
ch <- &rr.Raw
ch <- &rr.Response
} else {
// Malformed response, neither valid request, nor valid response.
connCloseErr = fmt.Errorf("malformed response")
@ -284,14 +283,14 @@ func (c *WSClient) unregisterRespChannel(id uint64) {
}
}
func (c *WSClient) getResponseChannel(id uint64) chan *response.Raw {
func (c *WSClient) getResponseChannel(id uint64) chan *neorpc.Response {
c.respLock.RLock()
defer c.respLock.RUnlock()
return c.respChannels[id]
}
func (c *WSClient) makeWsRequest(r *request.Raw) (*response.Raw, error) {
ch := make(chan *response.Raw)
func (c *WSClient) makeWsRequest(r *neorpc.Request) (*neorpc.Response, error) {
ch := make(chan *neorpc.Response)
c.respLock.Lock()
select {
case <-c.done:
@ -354,7 +353,7 @@ func (c *WSClient) performUnsubscription(id string) error {
func (c *WSClient) SubscribeForNewBlocks(primary *int) (string, error) {
params := []interface{}{"block_added"}
if primary != nil {
params = append(params, request.BlockFilter{Primary: *primary})
params = append(params, neorpc.BlockFilter{Primary: *primary})
}
return c.performSubscription(params)
}
@ -365,7 +364,7 @@ func (c *WSClient) SubscribeForNewBlocks(primary *int) (string, error) {
func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *util.Uint160) (string, error) {
params := []interface{}{"transaction_added"}
if sender != nil || signer != nil {
params = append(params, request.TxFilter{Sender: sender, Signer: signer})
params = append(params, neorpc.TxFilter{Sender: sender, Signer: signer})
}
return c.performSubscription(params)
}
@ -377,7 +376,7 @@ func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *uti
func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160, name *string) (string, error) {
params := []interface{}{"notification_from_execution"}
if contract != nil || name != nil {
params = append(params, request.NotificationFilter{Contract: contract, Name: name})
params = append(params, neorpc.NotificationFilter{Contract: contract, Name: name})
}
return c.performSubscription(params)
}
@ -392,7 +391,7 @@ func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, err
if *state != "HALT" && *state != "FAULT" {
return "", errors.New("bad state parameter")
}
params = append(params, request.ExecutionFilter{State: *state})
params = append(params, neorpc.ExecutionFilter{State: *state})
}
return c.performSubscription(params)
}
@ -404,7 +403,7 @@ func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, err
func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) {
params := []interface{}{"notary_request_event"}
if sender != nil {
params = append(params, request.TxFilter{Sender: sender, Signer: mainSigner})
params = append(params, neorpc.TxFilter{Sender: sender, Signer: mainSigner})
}
return c.performSubscription(params)
}