Merge pull request #2749 from nspcc-dev/rpc-waiter

rpc: implement transaction awaiting
This commit is contained in:
Roman Khimov 2022-10-24 13:08:06 +07:00 committed by GitHub
commit 0551ddff4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 1588 additions and 151 deletions

View file

@ -10,7 +10,7 @@ receive them as JSON-RPC notifications from the server.
Currently supported events:
* new block added
Contents: block. Filters: primary ID.
Contents: block. Filters: primary ID, since/till block indexes.
* new transaction in the block
Contents: transaction. Filters: sender and signer.
@ -19,7 +19,7 @@ Currently supported events:
Contents: container hash, contract hash, notification name, stack item. Filters: contract hash, notification name.
* transaction executed
Contents: application execution result. Filters: VM state.
Contents: application execution result. Filters: VM state, script container hash.
* new/removed P2P notary request (if `P2PSigExtensions` are enabled)
Contents: P2P notary request. Filters: request sender and main tx signer.
@ -57,7 +57,10 @@ omitted if empty).
Recognized stream names:
* `block_added`
Filter: `primary` as an integer with primary (speaker) node index from
ConsensusData.
ConsensusData and/or `since` field as an integer value with block
index starting from which new block notifications will be received and/or
`till` field as an integer values containing block index till which new
block notifications will be received.
* `transaction_added`
Filter: `sender` field containing a string with hex-encoded Uint160 (LE
representation) for transaction's `Sender` and/or `signer` in the same
@ -68,7 +71,8 @@ Recognized stream names:
notification name.
* `transaction_executed`
Filter: `state` field containing `HALT` or `FAULT` string for successful
and failed executions respectively.
and failed executions respectively and/or `container` field containing
script container hash.
* `notary_request_event`
Filter: `sender` field containing a string with hex-encoded Uint160 (LE
representation) for notary request's `Sender` and/or `signer` in the same

View file

@ -232,3 +232,20 @@ func (r *Invoke) UnmarshalJSON(data []byte) error {
r.Diagnostics = aux.Diagnostics
return nil
}
// AppExecToInvocation converts state.AppExecResult to result.Invoke and can be used
// as a wrapper for actor.Wait. The result of AppExecToInvocation doesn't have all fields
// properly filled, it's limited by State, GasConsumed, Stack, FaultException and Notifications.
// The result of AppExecToInvocation can be passed to unwrap package helpers.
func AppExecToInvocation(aer *state.AppExecResult, err error) (*Invoke, error) {
if err != nil {
return nil, err
}
return &Invoke{
State: aer.VMState.String(),
GasConsumed: aer.GasConsumed,
Stack: aer.Stack,
FaultException: aer.FaultException,
Notifications: aer.Events,
}, nil
}

View file

@ -3,13 +3,17 @@ package result
import (
"encoding/base64"
"encoding/json"
"errors"
"math/big"
"testing"
"github.com/google/uuid"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/stretchr/testify/require"
)
@ -49,3 +53,39 @@ func TestInvoke_MarshalJSON(t *testing.T) {
require.NoError(t, json.Unmarshal(data, actual))
require.Equal(t, result, actual)
}
func TestAppExecToInvocation(t *testing.T) {
// With error.
someErr := errors.New("some err")
_, err := AppExecToInvocation(nil, someErr)
require.ErrorIs(t, err, someErr)
// Good.
h := util.Uint256{1, 2, 3}
ex := state.Execution{
Trigger: trigger.Application,
VMState: vmstate.Fault,
GasConsumed: 123,
Stack: []stackitem.Item{stackitem.NewBigInteger(big.NewInt(123))},
Events: []state.NotificationEvent{{
ScriptHash: util.Uint160{3, 2, 1},
Name: "Notification",
Item: stackitem.NewArray([]stackitem.Item{stackitem.Null{}}),
}},
FaultException: "some fault exception",
}
inv, err := AppExecToInvocation(&state.AppExecResult{
Container: h,
Execution: ex,
}, nil)
require.NoError(t, err)
require.Equal(t, ex.VMState.String(), inv.State)
require.Equal(t, ex.GasConsumed, inv.GasConsumed)
require.Nil(t, inv.Script)
require.Equal(t, ex.Stack, inv.Stack)
require.Equal(t, ex.FaultException, inv.FaultException)
require.Equal(t, ex.Events, inv.Notifications)
require.Nil(t, inv.Transaction)
require.Nil(t, inv.Diagnostics)
require.Equal(t, uuid.UUID{}, inv.Session)
}

View file

@ -0,0 +1,88 @@
package rpcevent
import (
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
)
type (
// Comparator is an interface required from notification event filter to be able to
// filter notifications.
Comparator interface {
EventID() neorpc.EventID
Filter() interface{}
}
// Container is an interface required from notification event to be able to
// pass filter.
Container interface {
EventID() neorpc.EventID
EventPayload() interface{}
}
)
// Matches filters our given Container against Comparator filter.
func Matches(f Comparator, r Container) bool {
expectedEvent := f.EventID()
filter := f.Filter()
if r.EventID() != expectedEvent {
return false
}
if filter == nil {
return true
}
switch f.EventID() {
case neorpc.BlockEventID:
filt := filter.(neorpc.BlockFilter)
b := r.EventPayload().(*block.Block)
primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex)
sinceOk := filt.Since == nil || *filt.Since <= b.Index
tillOk := filt.Till == nil || b.Index <= *filt.Till
return primaryOk && sinceOk && tillOk
case neorpc.TransactionEventID:
filt := filter.(neorpc.TxFilter)
tx := r.EventPayload().(*transaction.Transaction)
senderOK := filt.Sender == nil || tx.Sender().Equals(*filt.Sender)
signerOK := true
if filt.Signer != nil {
signerOK = false
for i := range tx.Signers {
if tx.Signers[i].Account.Equals(*filt.Signer) {
signerOK = true
break
}
}
}
return senderOK && signerOK
case neorpc.NotificationEventID:
filt := filter.(neorpc.NotificationFilter)
notification := r.EventPayload().(*state.ContainedNotificationEvent)
hashOk := filt.Contract == nil || notification.ScriptHash.Equals(*filt.Contract)
nameOk := filt.Name == nil || notification.Name == *filt.Name
return hashOk && nameOk
case neorpc.ExecutionEventID:
filt := filter.(neorpc.ExecutionFilter)
applog := r.EventPayload().(*state.AppExecResult)
stateOK := filt.State == nil || applog.VMState.String() == *filt.State
containerOK := filt.Container == nil || applog.Container.Equals(*filt.Container)
return stateOK && containerOK
case neorpc.NotaryRequestEventID:
filt := filter.(neorpc.TxFilter)
req := r.EventPayload().(*result.NotaryRequestEvent)
senderOk := filt.Sender == nil || req.NotaryRequest.FallbackTransaction.Signers[1].Account == *filt.Sender
signerOK := true
if filt.Signer != nil {
signerOK = false
for _, signer := range req.NotaryRequest.MainTransaction.Signers {
if signer.Account.Equals(*filt.Signer) {
signerOK = true
break
}
}
}
return senderOk && signerOK
}
return false
}

View file

@ -0,0 +1,286 @@
package rpcevent
import (
"testing"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/stretchr/testify/require"
)
type (
testComparator struct {
id neorpc.EventID
filter interface{}
}
testContainer struct {
id neorpc.EventID
pld interface{}
}
)
func (c testComparator) EventID() neorpc.EventID {
return c.id
}
func (c testComparator) Filter() interface{} {
return c.filter
}
func (c testContainer) EventID() neorpc.EventID {
return c.id
}
func (c testContainer) EventPayload() interface{} {
return c.pld
}
func TestMatches(t *testing.T) {
primary := 1
badPrimary := 2
index := uint32(5)
badHigherIndex := uint32(6)
badLowerIndex := index - 1
sender := util.Uint160{1, 2, 3}
signer := util.Uint160{4, 5, 6}
contract := util.Uint160{7, 8, 9}
badUint160 := util.Uint160{9, 9, 9}
cnt := util.Uint256{1, 2, 3}
badUint256 := util.Uint256{9, 9, 9}
name := "ntf name"
badName := "bad name"
bContainer := testContainer{
id: neorpc.BlockEventID,
pld: &block.Block{
Header: block.Header{PrimaryIndex: byte(primary), Index: index},
},
}
st := vmstate.Halt
goodState := st.String()
badState := "FAULT"
txContainer := testContainer{
id: neorpc.TransactionEventID,
pld: &transaction.Transaction{Signers: []transaction.Signer{{Account: sender}, {Account: signer}}},
}
ntfContainer := testContainer{
id: neorpc.NotificationEventID,
pld: &state.ContainedNotificationEvent{NotificationEvent: state.NotificationEvent{ScriptHash: contract, Name: name}},
}
exContainer := testContainer{
id: neorpc.ExecutionEventID,
pld: &state.AppExecResult{Container: cnt, Execution: state.Execution{VMState: st}},
}
ntrContainer := testContainer{
id: neorpc.NotaryRequestEventID,
pld: &result.NotaryRequestEvent{
NotaryRequest: &payload.P2PNotaryRequest{
MainTransaction: &transaction.Transaction{Signers: []transaction.Signer{{Account: signer}}},
FallbackTransaction: &transaction.Transaction{Signers: []transaction.Signer{{Account: util.Uint160{}}, {Account: sender}}},
},
},
}
missedContainer := testContainer{
id: neorpc.MissedEventID,
}
var testCases = []struct {
name string
comparator testComparator
container testContainer
expected bool
}{
{
name: "ID mismatch",
comparator: testComparator{id: neorpc.TransactionEventID},
container: bContainer,
expected: false,
},
{
name: "missed event",
comparator: testComparator{id: neorpc.BlockEventID},
container: missedContainer,
expected: false,
},
{
name: "block, no filter",
comparator: testComparator{id: neorpc.BlockEventID},
container: bContainer,
expected: true,
},
{
name: "block, primary mismatch",
comparator: testComparator{
id: neorpc.BlockEventID,
filter: neorpc.BlockFilter{Primary: &badPrimary},
},
container: bContainer,
expected: false,
},
{
name: "block, since mismatch",
comparator: testComparator{
id: neorpc.BlockEventID,
filter: neorpc.BlockFilter{Since: &badHigherIndex},
},
container: bContainer,
expected: false,
},
{
name: "block, till mismatch",
comparator: testComparator{
id: neorpc.BlockEventID,
filter: neorpc.BlockFilter{Till: &badLowerIndex},
},
container: bContainer,
expected: false,
},
{
name: "block, filter match",
comparator: testComparator{
id: neorpc.BlockEventID,
filter: neorpc.BlockFilter{Primary: &primary, Since: &index, Till: &index},
},
container: bContainer,
expected: true,
},
{
name: "transaction, no filter",
comparator: testComparator{id: neorpc.TransactionEventID},
container: txContainer,
expected: true,
},
{
name: "transaction, sender mismatch",
comparator: testComparator{
id: neorpc.TransactionEventID,
filter: neorpc.TxFilter{Sender: &badUint160},
},
container: txContainer,
expected: false,
},
{
name: "transaction, signer mismatch",
comparator: testComparator{
id: neorpc.TransactionEventID,
filter: neorpc.TxFilter{Signer: &badUint160},
},
container: txContainer,
expected: false,
},
{
name: "transaction, filter match",
comparator: testComparator{
id: neorpc.TransactionEventID,
filter: neorpc.TxFilter{Sender: &sender, Signer: &signer},
},
container: txContainer,
expected: true,
},
{
name: "notification, no filter",
comparator: testComparator{id: neorpc.NotificationEventID},
container: ntfContainer,
expected: true,
},
{
name: "notification, contract mismatch",
comparator: testComparator{
id: neorpc.NotificationEventID,
filter: neorpc.NotificationFilter{Contract: &badUint160},
},
container: ntfContainer,
expected: false,
},
{
name: "notification, name mismatch",
comparator: testComparator{
id: neorpc.NotificationEventID,
filter: neorpc.NotificationFilter{Name: &badName},
},
container: ntfContainer,
expected: false,
},
{
name: "notification, filter match",
comparator: testComparator{
id: neorpc.NotificationEventID,
filter: neorpc.NotificationFilter{Name: &name, Contract: &contract},
},
container: ntfContainer,
expected: true,
},
{
name: "execution, no filter",
comparator: testComparator{id: neorpc.ExecutionEventID},
container: exContainer,
expected: true,
},
{
name: "execution, state mismatch",
comparator: testComparator{
id: neorpc.ExecutionEventID,
filter: neorpc.ExecutionFilter{State: &badState},
},
container: exContainer,
expected: false,
},
{
name: "execution, container mismatch",
comparator: testComparator{
id: neorpc.ExecutionEventID,
filter: neorpc.ExecutionFilter{Container: &badUint256},
},
container: exContainer,
expected: false,
},
{
name: "execution, filter mismatch",
comparator: testComparator{
id: neorpc.ExecutionEventID,
filter: neorpc.ExecutionFilter{State: &goodState, Container: &cnt},
},
container: exContainer,
expected: true,
},
{
name: "notary request, no filter",
comparator: testComparator{id: neorpc.NotaryRequestEventID},
container: ntrContainer,
expected: true,
},
{
name: "notary request, sender mismatch",
comparator: testComparator{
id: neorpc.NotaryRequestEventID,
filter: neorpc.TxFilter{Sender: &badUint160},
},
container: ntrContainer,
expected: false,
},
{
name: "notary request, signer mismatch",
comparator: testComparator{
id: neorpc.NotaryRequestEventID,
filter: neorpc.TxFilter{Signer: &badUint160},
},
container: ntrContainer,
expected: false,
},
{
name: "notary request, filter match",
comparator: testComparator{
id: neorpc.NotaryRequestEventID,
filter: neorpc.TxFilter{Sender: &sender, Signer: &signer},
},
container: ntrContainer,
expected: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expected, Matches(tc.comparator, tc.container))
})
}
}

View file

@ -71,10 +71,13 @@ type (
Payload []interface{} `json:"params"`
}
// BlockFilter is a wrapper structure for the block event filter. The only
// allowed filter is primary index.
// BlockFilter is a wrapper structure for the block event filter. It allows
// to filter blocks by primary index and by block index (allowing blocks since
// the specified index).
BlockFilter struct {
Primary int `json:"primary"`
Primary *int `json:"primary,omitempty"`
Since *uint32 `json:"since,omitempty"`
Till *uint32 `json:"till,omitempty"`
}
// TxFilter is a wrapper structure for the transaction event filter. It
// allows to filter transactions by senders and signers.
@ -93,7 +96,8 @@ type (
// events. It allows to choose failing or successful transactions based
// on their VM state.
ExecutionFilter struct {
State string `json:"state"`
State *string `json:"state,omitempty"`
Container *util.Uint256 `json:"container,omitempty"`
}
// SignerWithWitness represents transaction's signer with the corresponding witness.
SignerWithWitness struct {
@ -155,3 +159,14 @@ func (s *SignerWithWitness) UnmarshalJSON(data []byte) error {
}
return nil
}
// EventID implements EventContainer interface and returns notification ID.
func (n *Notification) EventID() EventID {
return n.Event
}
// EventPayload implements EventContainer interface and returns notification
// object.
func (n *Notification) EventPayload() interface{} {
return n.Payload[0]
}

View file

@ -53,8 +53,22 @@ type SignerAccount struct {
// action to be performed, "Make" prefix is used for methods that create
// transactions in various ways, while "Send" prefix is used by methods that
// directly transmit created transactions to the RPC server.
//
// Actor also provides a Waiter interface to wait until transaction will be
// accepted to the chain. Depending on the underlying RPCActor functionality,
// transaction awaiting can be performed via web-socket using RPC notifications
// subsystem with EventWaiter, via regular RPC requests using a poll-based
// algorithm with PollingWaiter or can not be performed if RPCActor doesn't
// implement none of RPCEventWaiter and RPCPollingWaiter interfaces with
// NullWaiter. ErrAwaitingNotSupported will be returned on attempt to await the
// transaction in the latter case. Waiter uses context of the underlying RPCActor
// and interrupts transaction awaiting process if the context is done.
// ErrContextDone wrapped with the context's error will be returned in this case.
// Otherwise, transaction awaiting process is ended with ValidUntilBlock acceptance
// and ErrTxNotAccepted is returned if transaction wasn't accepted by this moment.
type Actor struct {
invoker.Invoker
Waiter
client RPCActor
opts Options
@ -108,6 +122,7 @@ func New(ra RPCActor, signers []SignerAccount) (*Actor, error) {
}
return &Actor{
Invoker: *inv,
Waiter: newWaiter(ra, version),
client: ra,
opts: NewDefaultOptions(),
signers: signers,

View file

@ -1,6 +1,7 @@
package actor
import (
"context"
"errors"
"testing"
@ -9,19 +10,23 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
type RPCClient struct {
err error
invRes *result.Invoke
netFee int64
bCount uint32
bCount atomic.Uint32
version *result.Version
hash util.Uint256
appLog *result.ApplicationLog
context context.Context
}
func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) {
@ -37,7 +42,7 @@ func (r *RPCClient) CalculateNetworkFee(tx *transaction.Transaction) (int64, err
return r.netFee, r.err
}
func (r *RPCClient) GetBlockCount() (uint32, error) {
return r.bCount, r.err
return r.bCount.Load(), r.err
}
func (r *RPCClient) GetVersion() (*result.Version, error) {
verCopy := *r.version
@ -52,6 +57,19 @@ func (r *RPCClient) TerminateSession(sessionID uuid.UUID) (bool, error) {
func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) {
return nil, nil // Just a stub, unused by actor.
}
func (r *RPCClient) Context() context.Context {
if r.context == nil {
return context.Background()
}
return r.context
}
func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) {
if r.appLog != nil {
return r.appLog, nil
}
return nil, errors.New("not found")
}
func testRPCAndAccount(t *testing.T) (*RPCClient, *wallet.Account) {
client := &RPCClient{
version: &result.Version{
@ -164,7 +182,7 @@ func TestSimpleWrappers(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(42), nf)
client.bCount = 100500
client.bCount.Store(100500)
bc, err := a.GetBlockCount()
require.NoError(t, err)
require.Equal(t, uint32(100500), bc)

View file

@ -0,0 +1,19 @@
package actor_test
import (
"testing"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
)
func TestRPCActorRPCClientCompat(t *testing.T) {
_ = actor.RPCActor(&rpcclient.WSClient{})
_ = actor.RPCActor(&rpcclient.Client{})
}
func TestRPCWaiterRPCClientCompat(t *testing.T) {
_ = actor.RPCPollingWaiter(&rpcclient.Client{})
_ = actor.RPCPollingWaiter(&rpcclient.WSClient{})
_ = actor.RPCEventWaiter(&rpcclient.WSClient{})
}

View file

@ -20,7 +20,7 @@ func TestCalculateValidUntilBlock(t *testing.T) {
require.Error(t, err)
client.err = nil
client.bCount = 42
client.bCount.Store(42)
vub, err := a.CalculateValidUntilBlock()
require.NoError(t, err)
require.Equal(t, uint32(42+7+1), vub)
@ -37,7 +37,7 @@ func TestCalculateValidUntilBlock(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint32(42+4+1), vub)
client.bCount = 101
client.bCount.Store(101)
vub, err = a.CalculateValidUntilBlock()
require.NoError(t, err)
require.Equal(t, uint32(101+10+1), vub)
@ -64,7 +64,7 @@ func TestMakeUnsigned(t *testing.T) {
// Good unchecked.
client.netFee = 42
client.bCount = 100500
client.bCount.Store(100500)
client.err = nil
tx, err := a.MakeUnsignedUncheckedRun(script, 1, nil)
require.NoError(t, err)

View file

@ -0,0 +1,301 @@
package actor
import (
"context"
"errors"
"fmt"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
)
// PollingWaiterRetryCount is a threshold for a number of subsequent failed
// attempts to get block count from the RPC server for PollingWaiter. If it fails
// to retrieve block count PollingWaiterRetryCount times in a raw then transaction
// awaiting attempt considered to be failed and an error is returned.
const PollingWaiterRetryCount = 3
var (
// ErrTxNotAccepted is returned when transaction wasn't accepted to the chain
// even after ValidUntilBlock block persist.
ErrTxNotAccepted = errors.New("transaction was not accepted to chain")
// ErrContextDone is returned when Waiter context has been done in the middle
// of transaction awaiting process and no result was received yet.
ErrContextDone = errors.New("waiter context done")
// ErrAwaitingNotSupported is returned from Wait method if Waiter instance
// doesn't support transaction awaiting.
ErrAwaitingNotSupported = errors.New("awaiting not supported")
)
type (
// Waiter is an interface providing transaction awaiting functionality to Actor.
Waiter interface {
// Wait allows to wait until transaction will be accepted to the chain. It can be
// used as a wrapper for Send or SignAndSend and accepts transaction hash,
// ValidUntilBlock value and an error. It returns transaction execution result
// or an error if transaction wasn't accepted to the chain.
Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error)
// WaitAny waits until at least one of the specified transactions will be accepted
// to the chain until vub (including). It returns execution result of this
// transaction or an error if none of the transactions was accepted to the chain.
// It uses underlying RPCPollingWaiter or RPCEventWaiter context to interrupt
// awaiting process, but additional ctx can be passed as an argument for the same
// purpose.
WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error)
}
// RPCPollingWaiter is an interface that enables transaction awaiting functionality
// for Actor instance based on periodical BlockCount and ApplicationLog polls.
RPCPollingWaiter interface {
// Context should return the RPC client context to be able to gracefully
// shut down all running processes (if so).
Context() context.Context
GetVersion() (*result.Version, error)
GetBlockCount() (uint32, error)
GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error)
}
// RPCEventWaiter is an interface that enables improved transaction awaiting functionality
// for Actor instance based on web-socket Block and ApplicationLog notifications. RPCEventWaiter
// contains RPCPollingWaiter under the hood and falls back to polling when subscription-based
// awaiting fails.
RPCEventWaiter interface {
RPCPollingWaiter
SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error)
SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error)
Unsubscribe(id string) error
}
)
// NullWaiter is a Waiter stub that doesn't support transaction awaiting functionality.
type NullWaiter struct{}
// PollingWaiter is a polling-based Waiter.
type PollingWaiter struct {
polling RPCPollingWaiter
version *result.Version
}
// EventWaiter is a websocket-based Waiter.
type EventWaiter struct {
ws RPCEventWaiter
polling Waiter
}
// newWaiter creates Waiter instance. It can be either websocket-based or
// polling-base, otherwise Waiter stub is returned.
func newWaiter(ra RPCActor, v *result.Version) Waiter {
if eventW, ok := ra.(RPCEventWaiter); ok {
return &EventWaiter{
ws: eventW,
polling: &PollingWaiter{
polling: eventW,
version: v,
},
}
}
if pollW, ok := ra.(RPCPollingWaiter); ok {
return &PollingWaiter{
polling: pollW,
version: v,
}
}
return NewNullWaiter()
}
// NewNullWaiter creates an instance of Waiter stub.
func NewNullWaiter() NullWaiter {
return NullWaiter{}
}
// Wait implements Waiter interface.
func (NullWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
return nil, ErrAwaitingNotSupported
}
// WaitAny implements Waiter interface.
func (NullWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
return nil, ErrAwaitingNotSupported
}
// NewPollingWaiter creates an instance of Waiter supporting poll-based transaction awaiting.
func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) {
v, err := waiter.GetVersion()
if err != nil {
return nil, err
}
return &PollingWaiter{
polling: waiter,
version: v,
}, nil
}
// Wait implements Waiter interface.
func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
if err != nil {
return nil, err
}
return w.WaitAny(context.TODO(), vub, h)
}
// WaitAny implements Waiter interface.
func (w *PollingWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
var (
currentHeight uint32
failedAttempt int
pollTime = time.Millisecond * time.Duration(w.version.Protocol.MillisecondsPerBlock) / 2
)
if pollTime == 0 {
pollTime = time.Second
}
timer := time.NewTicker(pollTime)
defer timer.Stop()
for {
select {
case <-timer.C:
blockCount, err := w.polling.GetBlockCount()
if err != nil {
failedAttempt++
if failedAttempt > PollingWaiterRetryCount {
return nil, fmt.Errorf("failed to retrieve block count: %w", err)
}
continue
}
failedAttempt = 0
if blockCount-1 > currentHeight {
currentHeight = blockCount - 1
}
t := trigger.Application
for _, h := range hashes {
res, err := w.polling.GetApplicationLog(h, &t)
if err == nil {
return &state.AppExecResult{
Container: res.Container,
Execution: res.Executions[0],
}, nil
}
}
if currentHeight >= vub {
return nil, ErrTxNotAccepted
}
case <-w.polling.Context().Done():
return nil, fmt.Errorf("%w: %v", ErrContextDone, w.polling.Context().Err())
case <-ctx.Done():
return nil, fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
}
}
}
// NewEventWaiter creates an instance of Waiter supporting websocket event-based transaction awaiting.
// EventWaiter contains PollingWaiter under the hood and falls back to polling when subscription-based
// awaiting fails.
func NewEventWaiter(waiter RPCEventWaiter) (*EventWaiter, error) {
polling, err := NewPollingWaiter(waiter)
if err != nil {
return nil, err
}
return &EventWaiter{
ws: waiter,
polling: polling,
}, nil
}
// Wait implements Waiter interface.
func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.AppExecResult, waitErr error) {
if err != nil {
return nil, err
}
return w.WaitAny(context.TODO(), vub, h)
}
// WaitAny implements Waiter interface.
func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (res *state.AppExecResult, waitErr error) {
var wsWaitErr error
defer func() {
if wsWaitErr != nil {
res, waitErr = w.polling.WaitAny(ctx, vub, hashes...)
if waitErr != nil {
waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr)
}
}
}()
rcvr := make(chan rpcclient.Notification)
defer func() {
drainLoop:
// Drain rcvr to avoid other notification receivers blocking.
for {
select {
case <-rcvr:
default:
break drainLoop
}
}
close(rcvr)
}()
// Execution event follows the block event, thus wait until the block next to the VUB to be sure.
since := vub + 1
blocksID, err := w.ws.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
return
}
defer func() {
err = w.ws.Unsubscribe(blocksID)
if err != nil {
errFmt := "failed to unsubscribe from blocks (id: %s): %v"
errArgs := []interface{}{blocksID, err}
if waitErr != nil {
errFmt += "; wait error: %w"
errArgs = append(errArgs, waitErr)
}
waitErr = fmt.Errorf(errFmt, errArgs...)
}
}()
for _, h := range hashes {
txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
return
}
defer func() {
err = w.ws.Unsubscribe(txsID)
if err != nil {
errFmt := "failed to unsubscribe from transactions (id: %s): %v"
errArgs := []interface{}{txsID, err}
if waitErr != nil {
errFmt += "; wait error: %w"
errArgs = append(errArgs, waitErr)
}
waitErr = fmt.Errorf(errFmt, errArgs...)
}
}()
}
for {
select {
case ntf := <-rcvr:
switch ntf.Type {
case neorpc.BlockEventID:
waitErr = ErrTxNotAccepted
return
case neorpc.ExecutionEventID:
res = ntf.Value.(*state.AppExecResult)
return
case neorpc.MissedEventID:
// We're toast, retry with non-ws client.
wsWaitErr = errors.New("some event was missed")
return
}
case <-w.ws.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err())
return
case <-ctx.Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
return
}
}
}

View file

@ -0,0 +1,186 @@
package actor
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
)
type AwaitableRPCClient struct {
RPCClient
chLock sync.RWMutex
subBlockCh chan<- rpcclient.Notification
subTxCh chan<- rpcclient.Notification
}
func (c *AwaitableRPCClient) SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) {
c.chLock.Lock()
defer c.chLock.Unlock()
c.subBlockCh = rcvrCh
return "1", nil
}
func (c *AwaitableRPCClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) {
c.chLock.Lock()
defer c.chLock.Unlock()
c.subTxCh = rcvrCh
return "2", nil
}
func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil }
func TestNewWaiter(t *testing.T) {
w := newWaiter((RPCActor)(nil), nil)
_, ok := w.(NullWaiter)
require.True(t, ok)
w = newWaiter(&RPCClient{}, &result.Version{})
_, ok = w.(*PollingWaiter)
require.True(t, ok)
w = newWaiter(&AwaitableRPCClient{RPCClient: RPCClient{}}, &result.Version{})
_, ok = w.(*EventWaiter)
require.True(t, ok)
}
func TestPollingWaiter_Wait(t *testing.T) {
h := util.Uint256{1, 2, 3}
bCount := uint32(5)
appLog := &result.ApplicationLog{Container: h, Executions: []state.Execution{{}}}
expected := &state.AppExecResult{Container: h, Execution: state.Execution{}}
c := &RPCClient{appLog: appLog}
c.bCount.Store(bCount)
w := newWaiter(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time.
_, ok := w.(*PollingWaiter)
require.True(t, ok)
// Wait with error.
someErr := errors.New("some error")
_, err := w.Wait(h, bCount, someErr)
require.ErrorIs(t, err, someErr)
// AER is in chain immediately.
aer, err := w.Wait(h, bCount-1, nil)
require.NoError(t, err)
require.Equal(t, expected, aer)
// Missing AER after VUB.
c.appLog = nil
_, err = w.Wait(h, bCount-2, nil)
require.ErrorIs(t, ErrTxNotAccepted, err)
checkErr := func(t *testing.T, trigger func(), target error) {
errCh := make(chan error)
go func() {
_, err = w.Wait(h, bCount, nil)
errCh <- err
}()
timer := time.NewTimer(time.Second)
var triggerFired bool
waitloop:
for {
select {
case err = <-errCh:
require.ErrorIs(t, err, target)
break waitloop
case <-timer.C:
if triggerFired {
t.Fatal("failed to await result")
}
trigger()
triggerFired = true
timer.Reset(time.Second * 2)
}
}
require.True(t, triggerFired)
}
// Tx is accepted before VUB.
c.appLog = nil
c.bCount.Store(bCount)
checkErr(t, func() { c.bCount.Store(bCount + 1) }, ErrTxNotAccepted)
// Context is cancelled.
c.appLog = nil
c.bCount.Store(bCount)
ctx, cancel := context.WithCancel(context.Background())
c.context = ctx
checkErr(t, cancel, ErrContextDone)
}
func TestWSWaiter_Wait(t *testing.T) {
h := util.Uint256{1, 2, 3}
bCount := uint32(5)
appLog := &result.ApplicationLog{Container: h, Executions: []state.Execution{{}}}
expected := &state.AppExecResult{Container: h, Execution: state.Execution{}}
c := &AwaitableRPCClient{RPCClient: RPCClient{appLog: appLog}}
c.bCount.Store(bCount)
w := newWaiter(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time.
_, ok := w.(*EventWaiter)
require.True(t, ok)
// Wait with error.
someErr := errors.New("some error")
_, err := w.Wait(h, bCount, someErr)
require.ErrorIs(t, err, someErr)
// AER is in chain immediately.
doneCh := make(chan struct{})
go func() {
aer, err := w.Wait(h, bCount-1, nil)
require.NoError(t, err)
require.Equal(t, expected, aer)
doneCh <- struct{}{}
}()
check := func(t *testing.T, trigger func()) {
timer := time.NewTimer(time.Second)
var triggerFired bool
waitloop:
for {
select {
case <-doneCh:
break waitloop
case <-timer.C:
if triggerFired {
t.Fatal("failed to await result")
}
trigger()
triggerFired = true
timer.Reset(time.Second * 2)
}
}
require.True(t, triggerFired)
}
check(t, func() {
c.chLock.RLock()
defer c.chLock.RUnlock()
c.subBlockCh <- rpcclient.Notification{
Type: neorpc.ExecutionEventID,
Value: expected,
}
})
// Missing AER after VUB.
go func() {
_, err = w.Wait(h, bCount-2, nil)
require.ErrorIs(t, err, ErrTxNotAccepted)
doneCh <- struct{}{}
}()
check(t, func() {
c.chLock.RLock()
defer c.chLock.RUnlock()
c.subBlockCh <- rpcclient.Notification{
Type: neorpc.BlockEventID,
Value: &block.Block{},
}
})
}

View file

@ -33,8 +33,11 @@ type Client struct {
cli *http.Client
endpoint *url.URL
ctx context.Context
opts Options
requestF func(*neorpc.Request) (*neorpc.Response, error)
// ctxCancel is a cancel function aimed to send closing signal to the users of
// ctx.
ctxCancel func()
opts Options
requestF func(*neorpc.Request) (*neorpc.Response, error)
// reader is an Invoker that has no signers and uses current state,
// it's used to implement various getters. It'll be removed eventually,
@ -125,7 +128,9 @@ func initClient(ctx context.Context, cl *Client, endpoint string, opts Options)
// if opts.Cert != "" && opts.Key != "" {
// }
cl.ctx = ctx
cancelCtx, cancel := context.WithCancel(ctx)
cl.ctx = cancelCtx
cl.ctxCancel = cancel
cl.cli = httpClient
cl.endpoint = url
cl.cache = cache{
@ -176,6 +181,7 @@ func (c *Client) Init() error {
// Close closes unused underlying networks connections.
func (c *Client) Close() {
c.ctxCancel()
c.cli.CloseIdleConnections()
}
@ -248,3 +254,8 @@ func (c *Client) Ping() error {
_ = conn.Close()
return nil
}
// Context returns client instance context.
func (c *Client) Context() context.Context {
return c.ctx
}

View file

@ -0,0 +1,16 @@
package invoker_test
import (
"testing"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
)
func TestRPCInvokerRPCClientCompat(t *testing.T) {
_ = invoker.RPCInvoke(&rpcclient.Client{})
_ = invoker.RPCInvoke(&rpcclient.WSClient{})
_ = invoker.RPCInvokeHistoric(&rpcclient.Client{})
_ = invoker.RPCInvokeHistoric(&rpcclient.WSClient{})
_ = invoker.RPCSessions(&rpcclient.WSClient{})
}

View file

@ -1,9 +1,11 @@
package notary
import (
"context"
"errors"
"fmt"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
@ -313,3 +315,15 @@ func (a *Actor) SendRequestExactly(mainTx *transaction.Transaction, fbTx *transa
}
return mainHash, fbHash, vub, nil
}
// Wait waits until main or fallback transaction will be accepted to the chain and returns
// the resulting application execution result or actor.ErrTxNotAccepted if both transactions
// failed to persist. Wait can be used if underlying Actor supports transaction awaiting,
// see actor.Actor and actor.Waiter documentation for details. Wait may be used as a wrapper
// for Notarize, SendRequest or SendRequestExactly.
func (a *Actor) Wait(mainHash, fbHash util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
if err != nil {
return nil, err
}
return a.WaitAny(context.TODO(), vub, mainHash, fbHash)
}

View file

@ -1,11 +1,13 @@
package notary
import (
"context"
"errors"
"testing"
"github.com/google/uuid"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
@ -14,9 +16,11 @@ import (
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/stretchr/testify/require"
)
@ -30,6 +34,7 @@ type RPCClient struct {
hash util.Uint256
nhash util.Uint256
mirror bool
applog *result.ApplicationLog
}
func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) {
@ -66,6 +71,14 @@ func (r *RPCClient) TerminateSession(sessionID uuid.UUID) (bool, error) {
func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) {
return nil, nil // Just a stub, unused by actor.
}
func (r *RPCClient) Context() context.Context {
return context.Background()
}
func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) {
return r.applog, nil
}
var _ = actor.RPCPollingWaiter(&RPCClient{})
func TestNewActor(t *testing.T) {
rc := &RPCClient{
@ -520,3 +533,54 @@ func TestDefaultActorOptions(t *testing.T) {
require.NoError(t, opts.MainCheckerModifier(&result.Invoke{State: "HALT"}, tx))
require.Equal(t, uint32(42), tx.ValidUntilBlock)
}
func TestWait(t *testing.T) {
rc := &RPCClient{version: &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}}
key0, err := keys.NewPrivateKey()
require.NoError(t, err)
key1, err := keys.NewPrivateKey()
require.NoError(t, err)
acc0 := wallet.NewAccountFromPrivateKey(key0)
facc1 := FakeSimpleAccount(key1.PublicKey())
act, err := NewActor(rc, []actor.SignerAccount{{
Signer: transaction.Signer{
Account: acc0.Contract.ScriptHash(),
Scopes: transaction.None,
},
Account: acc0,
}, {
Signer: transaction.Signer{
Account: facc1.Contract.ScriptHash(),
Scopes: transaction.CalledByEntry,
},
Account: facc1,
}}, acc0)
require.NoError(t, err)
someErr := errors.New("someErr")
_, err = act.Wait(util.Uint256{}, util.Uint256{}, 0, someErr)
require.ErrorIs(t, err, someErr)
cont := util.Uint256{1, 2, 3}
ex := state.Execution{
Trigger: trigger.Application,
VMState: vmstate.Halt,
GasConsumed: 123,
Stack: []stackitem.Item{stackitem.Null{}},
}
applog := &result.ApplicationLog{
Container: cont,
IsTransaction: true,
Executions: []state.Execution{ex},
}
rc.applog = applog
res, err := act.Wait(util.Uint256{}, util.Uint256{}, 0, nil)
require.NoError(t, err)
require.Equal(t, &state.AppExecResult{
Container: cont,
Execution: ex,
}, res)
}

View file

@ -15,6 +15,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/neorpc/rpcevent"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/atomic"
)
@ -45,20 +46,49 @@ type WSClient struct {
closeErr error
subscriptionsLock sync.RWMutex
subscriptions map[string]bool
subscriptions map[string]notificationReceiver
respLock sync.RWMutex
respChannels map[uint64]chan *neorpc.Response
}
// notificationReceiver is a server events receiver. It stores desired notifications ID
// and filter and a channel used to receive matching notifications.
type notificationReceiver struct {
typ neorpc.EventID
filter interface{}
ch chan<- Notification
}
// EventID implements neorpc.Comparator interface and returns notification ID.
func (r notificationReceiver) EventID() neorpc.EventID {
return r.typ
}
// Filter implements neorpc.Comparator interface and returns notification filter.
func (r notificationReceiver) Filter() interface{} {
return r.filter
}
// Notification represents a server-generated notification for client subscriptions.
// Value can be one of block.Block, state.AppExecResult, state.ContainedNotificationEvent
// transaction.Transaction or subscriptions.NotaryRequestEvent based on Type.
// Value can be one of *block.Block, *state.AppExecResult, *state.ContainedNotificationEvent
// *transaction.Transaction or *subscriptions.NotaryRequestEvent based on Type.
type Notification struct {
Type neorpc.EventID
Value interface{}
}
// EventID implements Container interface and returns notification ID.
func (n Notification) EventID() neorpc.EventID {
return n.Type
}
// EventPayload implements Container interface and returns notification
// object.
func (n Notification) EventPayload() interface{} {
return n.Value
}
// requestResponse is a combined type for request and response since we can get
// any of them here.
type requestResponse struct {
@ -107,7 +137,7 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error
closeCalled: *atomic.NewBool(false),
respChannels: make(map[uint64]chan *neorpc.Response),
requests: make(chan *neorpc.Request),
subscriptions: make(map[string]bool),
subscriptions: make(map[string]notificationReceiver),
}
err = initClient(ctx, &wsc.Client, endpoint, opts)
@ -132,6 +162,8 @@ func (c *WSClient) Close() {
// which in turn makes wsReader receive an err from ws.ReadJSON() and also
// break out of the loop closing c.done channel in its shutdown sequence.
close(c.shutdown)
// Call to cancel will send signal to all users of Context().
c.Client.ctxCancel()
}
<-c.done
}
@ -205,7 +237,16 @@ readloop:
break readloop
}
}
c.Notifications <- Notification{event, val}
ok := make(map[chan<- Notification]bool)
c.subscriptionsLock.RLock()
for _, rcvr := range c.subscriptions {
ntf := Notification{Type: event, Value: val}
if (rpcevent.Matches(rcvr, ntf) || event == neorpc.MissedEventID /*missed event must be delivered to each receiver*/) && !ok[rcvr.ch] {
ok[rcvr.ch] = true // strictly one notification per channel
rcvr.ch <- ntf // this will block other receivers
}
}
c.subscriptionsLock.RUnlock()
} else if rr.ID != nil && (rr.Error != nil || rr.Result != nil) {
id, err := strconv.ParseUint(string(rr.ID), 10, 64)
if err != nil {
@ -235,6 +276,7 @@ readloop:
c.respChannels = nil
c.respLock.Unlock()
close(c.Notifications)
c.Client.ctxCancel()
}
func (c *WSClient) wsWriter() {
@ -317,7 +359,7 @@ func (c *WSClient) makeWsRequest(r *neorpc.Request) (*neorpc.Response, error) {
}
}
func (c *WSClient) performSubscription(params []interface{}) (string, error) {
func (c *WSClient) performSubscription(params []interface{}, rcvr notificationReceiver) (string, error) {
var resp string
if err := c.performRequest("subscribe", params, &resp); err != nil {
@ -327,7 +369,7 @@ func (c *WSClient) performSubscription(params []interface{}) (string, error) {
c.subscriptionsLock.Lock()
defer c.subscriptionsLock.Unlock()
c.subscriptions[resp] = true
c.subscriptions[resp] = rcvr
return resp, nil
}
@ -337,7 +379,7 @@ func (c *WSClient) performUnsubscription(id string) error {
c.subscriptionsLock.Lock()
defer c.subscriptionsLock.Unlock()
if !c.subscriptions[id] {
if _, ok := c.subscriptions[id]; !ok {
return errors.New("no subscription with this ID")
}
if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil {
@ -351,64 +393,178 @@ func (c *WSClient) performUnsubscription(id string) error {
}
// SubscribeForNewBlocks adds subscription for new block events to this instance
// of the client. It can be filtered by primary consensus node index, nil value doesn't
// add any filters.
func (c *WSClient) SubscribeForNewBlocks(primary *int) (string, error) {
params := []interface{}{"block_added"}
if primary != nil {
params = append(params, neorpc.BlockFilter{Primary: *primary})
// of the client. It can be filtered by primary consensus node index and/or block
// index allowing to receive blocks since the specified index only, nil value is
// treated as missing filter.
//
// Deprecated: please, use SubscribeForNewBlocksWithChan. This method will be removed in future versions.
func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex, tillIndex *uint32) (string, error) {
return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, tillIndex, c.Notifications)
}
// SubscribeForNewBlocksWithChan registers provided channel as a receiver for the
// new block events. Events can be filtered by primary consensus node index, nil
// value doesn't add any filters. If the receiver channel is nil, then the default
// Notifications channel will be used. The receiver channel must be properly read
// and drained after usage in order not to block other notification receivers.
func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex, tillIndex *uint32, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
}
return c.performSubscription(params)
params := []interface{}{"block_added"}
var flt *neorpc.BlockFilter
if primary != nil || sinceIndex != nil || tillIndex != nil {
flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex, Till: tillIndex}
params = append(params, flt)
}
rcvr := notificationReceiver{
typ: neorpc.BlockEventID,
filter: flt,
ch: rcvrCh,
}
return c.performSubscription(params, rcvr)
}
// SubscribeForNewTransactions adds subscription for new transaction events to
// this instance of the client. It can be filtered by the sender and/or the signer, nil
// value is treated as missing filter.
//
// Deprecated: please, use SubscribeForNewTransactionsWithChan. This method will be removed in future versions.
func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *util.Uint160) (string, error) {
params := []interface{}{"transaction_added"}
if sender != nil || signer != nil {
params = append(params, neorpc.TxFilter{Sender: sender, Signer: signer})
return c.SubscribeForNewTransactionsWithChan(sender, signer, c.Notifications)
}
// SubscribeForNewTransactionsWithChan registers provided channel as a receiver
// for new transaction events. Events can be filtered by the sender and/or the
// signer, nil value is treated as missing filter. If the receiver channel is nil,
// then the default Notifications channel will be used. The receiver channel must be
// properly read and drained after usage in order not to block other notification
// receivers.
func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, signer *util.Uint160, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
}
return c.performSubscription(params)
params := []interface{}{"transaction_added"}
var flt *neorpc.TxFilter
if sender != nil || signer != nil {
flt = &neorpc.TxFilter{Sender: sender, Signer: signer}
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.TransactionEventID,
filter: flt,
ch: rcvrCh,
}
return c.performSubscription(params, rcvr)
}
// SubscribeForExecutionNotifications adds subscription for notifications
// generated during transaction execution to this instance of the client. It can be
// filtered by the contract's hash (that emits notifications), nil value puts no such
// restrictions.
//
// Deprecated: please, use SubscribeForExecutionNotificationsWithChan. This method will be removed in future versions.
func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160, name *string) (string, error) {
params := []interface{}{"notification_from_execution"}
if contract != nil || name != nil {
params = append(params, neorpc.NotificationFilter{Contract: contract, Name: name})
return c.SubscribeForExecutionNotificationsWithChan(contract, name, c.Notifications)
}
// SubscribeForExecutionNotificationsWithChan registers provided channel as a
// receiver for execution events. Events can be filtered by the contract's hash
// (that emits notifications), nil value puts no such restrictions. If the
// receiver channel is nil, then the default Notifications channel will be used.
// The receiver channel must be properly read and drained after usage in order
// not to block other notification receivers.
func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uint160, name *string, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
}
return c.performSubscription(params)
params := []interface{}{"notification_from_execution"}
var flt *neorpc.NotificationFilter
if contract != nil || name != nil {
flt = &neorpc.NotificationFilter{Contract: contract, Name: name}
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.NotificationEventID,
filter: flt,
ch: rcvrCh,
}
return c.performSubscription(params, rcvr)
}
// SubscribeForTransactionExecutions adds subscription for application execution
// results generated during transaction execution to this instance of the client. It can
// be filtered by state (HALT/FAULT) to check for successful or failing
// transactions, nil value means no filtering.
func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) {
params := []interface{}{"transaction_executed"}
if state != nil {
if *state != "HALT" && *state != "FAULT" {
return "", errors.New("bad state parameter")
}
params = append(params, neorpc.ExecutionFilter{State: *state})
// transactions; it can also be filtered by script container hash.
// nil value means no filtering.
//
// Deprecated: please, use SubscribeForTransactionExecutionsWithChan. This method will be removed in future versions.
func (c *WSClient) SubscribeForTransactionExecutions(state *string, container *util.Uint256) (string, error) {
return c.SubscribeForTransactionExecutionsWithChan(state, container, c.Notifications)
}
// SubscribeForTransactionExecutionsWithChan registers provided channel as a
// receiver for application execution result events generated during transaction
// execution. Events can be filtered by state (HALT/FAULT) to check for successful
// or failing transactions; it can also be filtered by script container hash.
// nil value means no filtering. If the receiver channel is nil, then the default
// Notifications channel will be used. The receiver channel must be properly read
// and drained after usage in order not to block other notification receivers.
func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
}
return c.performSubscription(params)
params := []interface{}{"transaction_executed"}
var flt *neorpc.ExecutionFilter
if state != nil || container != nil {
if state != nil {
if *state != "HALT" && *state != "FAULT" {
return "", errors.New("bad state parameter")
}
}
flt = &neorpc.ExecutionFilter{State: state, Container: container}
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.ExecutionEventID,
filter: flt,
ch: rcvrCh,
}
return c.performSubscription(params, rcvr)
}
// SubscribeForNotaryRequests adds subscription for notary request payloads
// addition or removal events to this instance of client. It can be filtered by
// request sender's hash, or main tx signer's hash, nil value puts no such
// restrictions.
//
// Deprecated: please, use SubscribeForNotaryRequestsWithChan. This method will be removed in future versions.
func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) {
params := []interface{}{"notary_request_event"}
if sender != nil {
params = append(params, neorpc.TxFilter{Sender: sender, Signer: mainSigner})
return c.SubscribeForNotaryRequestsWithChan(sender, mainSigner, c.Notifications)
}
// SubscribeForNotaryRequestsWithChan registers provided channel as a receiver
// for notary request payload addition or removal events. It can be filtered by
// request sender's hash, or main tx signer's hash, nil value puts no such
// restrictions. If the receiver channel is nil, then the default Notifications
// channel will be used. The receiver channel must be properly read and drained
// after usage in order not to block other notification receivers.
func (c *WSClient) SubscribeForNotaryRequestsWithChan(sender *util.Uint160, mainSigner *util.Uint160, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
}
return c.performSubscription(params)
params := []interface{}{"notary_request_event"}
var flt *neorpc.TxFilter
if sender != nil {
flt = &neorpc.TxFilter{Sender: sender, Signer: mainSigner}
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.NotaryRequestEventID,
filter: flt,
ch: rcvrCh,
}
return c.performSubscription(params, rcvr)
}
// Unsubscribe removes subscription for the given event stream.
@ -455,3 +611,8 @@ func (c *WSClient) GetError() error {
}
return c.closeErr
}
// Context returns WSClient Cancel context that will be terminated on Client shutdown.
func (c *WSClient) Context() context.Context {
return c.Client.ctx
}

View file

@ -32,18 +32,31 @@ func TestWSClientClose(t *testing.T) {
}
func TestWSClientSubscription(t *testing.T) {
ch := make(chan Notification)
var cases = map[string]func(*WSClient) (string, error){
"blocks": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewBlocks(nil)
return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, nil)
},
"blocks_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, ch)
},
"transactions": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewTransactions(nil, nil)
return wsc.SubscribeForNewTransactionsWithChan(nil, nil, nil)
},
"transactions_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewTransactionsWithChan(nil, nil, ch)
},
"notifications": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForExecutionNotifications(nil, nil)
return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, nil)
},
"notifications_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, ch)
},
"executions": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForTransactionExecutions(nil)
return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, nil)
},
"executions_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, ch)
},
}
t.Run("good", func(t *testing.T) {
@ -83,13 +96,13 @@ func TestWSClientUnsubscription(t *testing.T) {
var cases = map[string]responseCheck{
"good": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = true
wsc.subscriptions["0"] = notificationReceiver{}
err := wsc.Unsubscribe("0")
require.NoError(t, err)
}},
"all": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = true
wsc.subscriptions["0"] = notificationReceiver{}
err := wsc.UnsubscribeAll()
require.NoError(t, err)
require.Equal(t, 0, len(wsc.subscriptions))
@ -100,13 +113,13 @@ func TestWSClientUnsubscription(t *testing.T) {
}},
"error returned": {`{"jsonrpc": "2.0", "id": 1, "error":{"code":-32602,"message":"Invalid Params"}}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = true
wsc.subscriptions["0"] = notificationReceiver{}
err := wsc.Unsubscribe("0")
require.Error(t, err)
}},
"false returned": {`{"jsonrpc": "2.0", "id": 1, "result": false}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = true
wsc.subscriptions["0"] = notificationReceiver{}
err := wsc.Unsubscribe("0")
require.Error(t, err)
}},
@ -151,26 +164,108 @@ func TestWSClientEvents(t *testing.T) {
}
}))
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
wsc.getNextRequestID = getTestRequestID
wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually.
wsc.cache.network = netmode.UnitTestNet
for range events {
t.Run("default ntf channel", func(t *testing.T) {
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
wsc.getNextRequestID = getTestRequestID
wsc.cacheLock.Lock()
wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually.
wsc.cache.network = netmode.UnitTestNet
wsc.cacheLock.Unlock()
// Our server mock is restricted, so perform subscriptions manually with default notifications channel.
wsc.subscriptionsLock.Lock()
wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications}
wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications}
wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications}
// MissedEvent must be delivered without subscription.
wsc.subscriptionsLock.Unlock()
for range events {
select {
case _, ok = <-wsc.Notifications:
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.True(t, ok)
}
select {
case _, ok = <-wsc.Notifications:
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.True(t, ok)
}
select {
case _, ok = <-wsc.Notifications:
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
// Connection closed by server.
require.False(t, ok)
// Connection closed by server.
require.False(t, ok)
})
t.Run("multiple ntf channels", func(t *testing.T) {
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
wsc.getNextRequestID = getTestRequestID
wsc.cacheLock.Lock()
wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually.
wsc.cache.network = netmode.UnitTestNet
wsc.cacheLock.Unlock()
// Our server mock is restricted, so perform subscriptions manually with default notifications channel.
ch1 := make(chan Notification)
ch2 := make(chan Notification)
ch3 := make(chan Notification)
halt := "HALT"
fault := "FAULT"
wsc.subscriptionsLock.Lock()
wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications}
wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications}
wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications}
wsc.subscriptions["3"] = notificationReceiver{typ: neorpc.BlockEventID, ch: ch1}
wsc.subscriptions["4"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2}
wsc.subscriptions["5"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} // check duplicating subscriptions
wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &halt}, ch: ch2}
wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &fault}, ch: ch3}
// MissedEvent must be delivered without subscription.
wsc.subscriptionsLock.Unlock()
var (
defaultChCnt int
ch1Cnt int
ch2Cnt int
ch3Cnt int
expectedDefaultCnCount = len(events)
expectedCh1Cnt = 1 + 1 // Block event + Missed event
expectedCh2Cnt = 1 + 2 + 1 // Notification event + 2 Execution events + Missed event
expectedCh3Cnt = 1 // Missed event
ntf Notification
)
for i := 0; i < expectedDefaultCnCount+expectedCh1Cnt+expectedCh2Cnt+expectedCh3Cnt; i++ {
select {
case ntf, ok = <-wsc.Notifications:
defaultChCnt++
case ntf, ok = <-ch1:
require.True(t, ntf.Type == neorpc.BlockEventID || ntf.Type == neorpc.MissedEventID, ntf.Type)
ch1Cnt++
case ntf, ok = <-ch2:
require.True(t, ntf.Type == neorpc.NotificationEventID || ntf.Type == neorpc.MissedEventID || ntf.Type == neorpc.ExecutionEventID)
ch2Cnt++
case ntf, ok = <-ch3:
require.True(t, ntf.Type == neorpc.MissedEventID)
ch3Cnt++
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.True(t, ok)
}
select {
case _, ok = <-wsc.Notifications:
case _, ok = <-ch1:
case _, ok = <-ch2:
case _, ok = <-ch3:
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
// Connection closed by server.
require.False(t, ok)
require.Equal(t, expectedDefaultCnCount, defaultChCnt)
require.Equal(t, expectedCh1Cnt, ch1Cnt)
require.Equal(t, expectedCh2Cnt, ch2Cnt)
require.Equal(t, expectedCh3Cnt, ch3Cnt)
})
}
func TestWSExecutionVMStateCheck(t *testing.T) {
@ -181,7 +276,7 @@ func TestWSExecutionVMStateCheck(t *testing.T) {
wsc.getNextRequestID = getTestRequestID
require.NoError(t, wsc.Init())
filter := "NONE"
_, err = wsc.SubscribeForTransactionExecutions(&filter)
_, err = wsc.SubscribeForTransactionExecutionsWithChan(&filter, nil, nil)
require.Error(t, err)
wsc.Close()
}
@ -192,23 +287,74 @@ func TestWSFilteredSubscriptions(t *testing.T) {
clientCode func(*testing.T, *WSClient)
serverCode func(*testing.T, *params.Params)
}{
{"blocks",
{"blocks primary",
func(t *testing.T, wsc *WSClient) {
primary := 3
_, err := wsc.SubscribeForNewBlocks(&primary)
_, err := wsc.SubscribeForNewBlocksWithChan(&primary, nil, nil, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, 3, filt.Primary)
require.Equal(t, 3, *filt.Primary)
require.Equal(t, (*uint32)(nil), filt.Since)
require.Equal(t, (*uint32)(nil), filt.Till)
},
},
{"blocks since",
func(t *testing.T, wsc *WSClient) {
var since uint32 = 3
_, err := wsc.SubscribeForNewBlocksWithChan(nil, &since, nil, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, (*int)(nil), filt.Primary)
require.Equal(t, uint32(3), *filt.Since)
require.Equal(t, (*uint32)(nil), filt.Till)
},
},
{"blocks till",
func(t *testing.T, wsc *WSClient) {
var till uint32 = 3
_, err := wsc.SubscribeForNewBlocksWithChan(nil, nil, &till, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, (*int)(nil), filt.Primary)
require.Equal(t, (*uint32)(nil), filt.Since)
require.Equal(t, (uint32)(3), *filt.Till)
},
},
{"blocks primary, since and till",
func(t *testing.T, wsc *WSClient) {
var (
since uint32 = 3
primary = 2
till uint32 = 5
)
_, err := wsc.SubscribeForNewBlocksWithChan(&primary, &since, &till, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, 2, *filt.Primary)
require.Equal(t, uint32(3), *filt.Since)
require.Equal(t, uint32(5), *filt.Till)
},
},
{"transactions sender",
func(t *testing.T, wsc *WSClient) {
sender := util.Uint160{1, 2, 3, 4, 5}
_, err := wsc.SubscribeForNewTransactions(&sender, nil)
_, err := wsc.SubscribeForNewTransactionsWithChan(&sender, nil, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -222,7 +368,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"transactions signer",
func(t *testing.T, wsc *WSClient) {
signer := util.Uint160{0, 42}
_, err := wsc.SubscribeForNewTransactions(nil, &signer)
_, err := wsc.SubscribeForNewTransactionsWithChan(nil, &signer, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -237,7 +383,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
func(t *testing.T, wsc *WSClient) {
sender := util.Uint160{1, 2, 3, 4, 5}
signer := util.Uint160{0, 42}
_, err := wsc.SubscribeForNewTransactions(&sender, &signer)
_, err := wsc.SubscribeForNewTransactionsWithChan(&sender, &signer, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -251,7 +397,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"notifications contract hash",
func(t *testing.T, wsc *WSClient) {
contract := util.Uint160{1, 2, 3, 4, 5}
_, err := wsc.SubscribeForExecutionNotifications(&contract, nil)
_, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, nil, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -265,7 +411,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"notifications name",
func(t *testing.T, wsc *WSClient) {
name := "my_pretty_notification"
_, err := wsc.SubscribeForExecutionNotifications(nil, &name)
_, err := wsc.SubscribeForExecutionNotificationsWithChan(nil, &name, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -280,7 +426,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
func(t *testing.T, wsc *WSClient) {
contract := util.Uint160{1, 2, 3, 4, 5}
name := "my_pretty_notification"
_, err := wsc.SubscribeForExecutionNotifications(&contract, &name)
_, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, &name, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -291,17 +437,47 @@ func TestWSFilteredSubscriptions(t *testing.T) {
require.Equal(t, "my_pretty_notification", *filt.Name)
},
},
{"executions",
{"executions state",
func(t *testing.T, wsc *WSClient) {
state := "FAULT"
_, err := wsc.SubscribeForTransactionExecutions(&state)
_, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, nil, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.ExecutionFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, "FAULT", filt.State)
require.Equal(t, "FAULT", *filt.State)
require.Equal(t, (*util.Uint256)(nil), filt.Container)
},
},
{"executions container",
func(t *testing.T, wsc *WSClient) {
container := util.Uint256{1, 2, 3}
_, err := wsc.SubscribeForTransactionExecutionsWithChan(nil, &container, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.ExecutionFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, (*string)(nil), filt.State)
require.Equal(t, util.Uint256{1, 2, 3}, *filt.Container)
},
},
{"executions state and container",
func(t *testing.T, wsc *WSClient) {
state := "FAULT"
container := util.Uint256{1, 2, 3}
_, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, &container, nil)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.ExecutionFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, "FAULT", *filt.State)
require.Equal(t, util.Uint256{1, 2, 3}, *filt.Container)
},
},
}

View file

@ -1947,3 +1947,57 @@ func TestClient_Iterator_SessionConfigVariations(t *testing.T) {
}
})
}
func TestClient_Wait(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close()
defer rpcSrv.Shutdown()
c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{})
require.NoError(t, err)
acc, err := wallet.NewAccount()
require.NoError(t, err)
act, err := actor.New(c, []actor.SignerAccount{
{
Signer: transaction.Signer{
Account: acc.ScriptHash(),
},
Account: acc,
},
})
require.NoError(t, err)
b, err := chain.GetBlock(chain.GetHeaderHash(1))
require.NoError(t, err)
require.True(t, len(b.Transactions) > 0)
check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) {
rcvr := make(chan struct{})
go func() {
aer, err := act.Wait(h, vub, nil)
if errExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, h, aer.Container)
}
rcvr <- struct{}{}
}()
waitloop:
for {
select {
case <-rcvr:
break waitloop
case <-time.NewTimer(time.Duration(chain.GetConfig().SecondsPerBlock) * time.Second).C:
t.Fatal("transaction failed to be awaited")
}
}
}
// Wait for transaction that has been persisted and VUB block has been persisted.
check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false)
// Wait for transaction that has been persisted and VUB block hasn't yet been persisted.
check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false)
// Wait for transaction that hasn't been persisted and VUB block has been persisted.
check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true)
}

View file

@ -40,6 +40,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/neorpc/rpcevent"
"github.com/nspcc-dev/neo-go/pkg/network"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/broadcaster"
@ -2425,7 +2426,7 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{
case neorpc.ExecutionEventID:
flt := new(neorpc.ExecutionFilter)
err = jd.Decode(flt)
if err == nil && (flt.State == "HALT" || flt.State == "FAULT") {
if err == nil && (flt.State != nil && (*flt.State == "HALT" || *flt.State == "FAULT")) {
filter = *flt
} else if err == nil {
err = errors.New("invalid state")
@ -2593,7 +2594,7 @@ chloop:
continue
}
for i := range sub.feeds {
if sub.feeds[i].Matches(&resp) {
if rpcevent.Matches(sub.feeds[i], &resp) {
if msg == nil {
b, err = json.Marshal(resp)
if err != nil {

View file

@ -2,11 +2,7 @@ package rpcsrv
import (
"github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"go.uber.org/atomic"
)
@ -22,12 +18,23 @@ type (
// that's not for long.
feeds [maxFeeds]feed
}
// feed stores subscriber's desired event ID with filter.
feed struct {
event neorpc.EventID
filter interface{}
}
)
// EventID implements neorpc.EventComparator interface and returns notification ID.
func (f feed) EventID() neorpc.EventID {
return f.event
}
// Filter implements neorpc.EventComparator interface and returns notification filter.
func (f feed) Filter() interface{} {
return f.filter
}
const (
// Maximum number of subscriptions per one client.
maxFeeds = 16
@ -42,59 +49,3 @@ const (
// a lot in terms of memory used.
notificationBufSize = 1024
)
func (f *feed) Matches(r *neorpc.Notification) bool {
if r.Event != f.event {
return false
}
if f.filter == nil {
return true
}
switch f.event {
case neorpc.BlockEventID:
filt := f.filter.(neorpc.BlockFilter)
b := r.Payload[0].(*block.Block)
return int(b.PrimaryIndex) == filt.Primary
case neorpc.TransactionEventID:
filt := f.filter.(neorpc.TxFilter)
tx := r.Payload[0].(*transaction.Transaction)
senderOK := filt.Sender == nil || tx.Sender().Equals(*filt.Sender)
signerOK := true
if filt.Signer != nil {
signerOK = false
for i := range tx.Signers {
if tx.Signers[i].Account.Equals(*filt.Signer) {
signerOK = true
break
}
}
}
return senderOK && signerOK
case neorpc.NotificationEventID:
filt := f.filter.(neorpc.NotificationFilter)
notification := r.Payload[0].(*state.ContainedNotificationEvent)
hashOk := filt.Contract == nil || notification.ScriptHash.Equals(*filt.Contract)
nameOk := filt.Name == nil || notification.Name == *filt.Name
return hashOk && nameOk
case neorpc.ExecutionEventID:
filt := f.filter.(neorpc.ExecutionFilter)
applog := r.Payload[0].(*state.AppExecResult)
return applog.VMState.String() == filt.State
case neorpc.NotaryRequestEventID:
filt := f.filter.(neorpc.TxFilter)
req := r.Payload[0].(*result.NotaryRequestEvent)
senderOk := filt.Sender == nil || req.NotaryRequest.FallbackTransaction.Signers[1].Account == *filt.Sender
signerOK := true
if filt.Signer != nil {
signerOK = false
for _, signer := range req.NotaryRequest.MainTransaction.Signers {
if signer.Account.Equals(*filt.Signer) {
signerOK = true
break
}
}
}
return senderOk && signerOK
}
return false
}