Merge pull request #2804 from nspcc-dev/check-aer-sub

rpc: fix subscribers locking logic and properly drain poll-based waiter receiver
This commit is contained in:
Roman Khimov 2022-11-17 04:24:35 +07:00 committed by GitHub
commit ab0ff63ce1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 359 additions and 137 deletions

View file

@ -16,6 +16,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/neotest"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/nns"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/stretchr/testify/require"
@ -273,6 +274,11 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
storageCfg := filepath.Join(testDataPrefix, "storage", "storage_contract.yml")
_, _, _ = deployContractFromPriv0(t, storagePath, "Storage", storageCfg, StorageContractID)
// Block #23: add FAULTed transaction to check WSClient waitloops.
faultedInvoker := e.NewInvoker(cHash, acc0)
faultedH := faultedInvoker.InvokeScriptCheckFAULT(t, []byte{byte(opcode.ABORT)}, []neotest.Signer{acc0}, "at instruction 0 (ABORT): ABORT")
t.Logf("FAULTed transaction:\n\thash LE: %s\n\tblock index: %d", faultedH.StringLE(), e.Chain.BlockHeight())
// Compile contract to test `invokescript` RPC call
invokePath := filepath.Join(testDataPrefix, "invoke", "invokescript_contract.go")
invokeCfg := filepath.Join(testDataPrefix, "invoke", "invoke.yml")

View file

@ -307,7 +307,6 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
e.AddNewBlock(t)
e.AddNewBlock(t) // This block is stateSyncPoint-th block.
e.AddNewBlock(t)
e.AddNewBlock(t)
require.Equal(t, stateSyncPoint+2, int(bcSpout.BlockHeight()))
boltCfg := func(c *config.ProtocolConfiguration) {

View file

@ -200,9 +200,10 @@ func (e *Executor) InvokeScriptCheckHALT(t testing.TB, script []byte, signers []
// InvokeScriptCheckFAULT adds a transaction with the specified script to the
// chain and checks if it's FAULTed with the specified error.
func (e *Executor) InvokeScriptCheckFAULT(t testing.TB, script []byte, signers []Signer, errMessage string) {
func (e *Executor) InvokeScriptCheckFAULT(t testing.TB, script []byte, signers []Signer, errMessage string) util.Uint256 {
hash := e.InvokeScript(t, script, signers)
e.CheckFault(t, hash, errMessage)
return hash
}
// CheckHalt checks that the transaction is persisted with HALT state.

View file

@ -217,91 +217,118 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap
// WaitAny implements Waiter interface.
func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (res *state.AppExecResult, waitErr error) {
var wsWaitErr error
defer func() {
if wsWaitErr != nil {
res, waitErr = w.polling.WaitAny(ctx, vub, hashes...)
if waitErr != nil {
waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr)
}
}
}()
bRcvr := make(chan *block.Block)
aerRcvr := make(chan *state.AppExecResult)
defer func() {
drainLoop:
// Drain receivers to avoid other notification receivers blocking.
for {
select {
case <-bRcvr:
case <-aerRcvr:
default:
break drainLoop
}
}
if wsWaitErr == nil || !errors.Is(wsWaitErr, ErrMissedEvent) {
close(bRcvr)
close(aerRcvr)
}
}()
// Execution event precedes the block event, thus wait until the VUB-th block to be sure.
var (
wsWaitErr error
waitersActive int
bRcvr = make(chan *block.Block, 2)
aerRcvr = make(chan *state.AppExecResult, len(hashes))
unsubErrs = make(chan error)
exit = make(chan struct{})
)
// Execution event preceded the block event, thus wait until the VUB-th block to be sure.
since := vub
blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
return
}
defer func() {
err = w.ws.Unsubscribe(blocksID)
if err != nil {
errFmt := "failed to unsubscribe from blocks (id: %s): %v"
errArgs := []interface{}{blocksID, err}
if waitErr != nil {
errFmt += "; wait error: %w"
errArgs = append(errArgs, waitErr)
}
waitErr = fmt.Errorf(errFmt, errArgs...)
}
}()
for _, h := range hashes {
txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
return
}
defer func() {
err = w.ws.Unsubscribe(txsID)
} else {
waitersActive++
go func() {
<-exit
err = w.ws.Unsubscribe(blocksID)
if err != nil {
errFmt := "failed to unsubscribe from transactions (id: %s): %v"
errArgs := []interface{}{txsID, err}
if waitErr != nil {
errFmt += "; wait error: %w"
errArgs = append(errArgs, waitErr)
}
waitErr = fmt.Errorf(errFmt, errArgs...)
unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks (id: %s): %w", blocksID, err)
return
}
unsubErrs <- nil
}()
}
if wsWaitErr == nil {
for _, h := range hashes {
txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
break
}
waitersActive++
go func() {
<-exit
err = w.ws.Unsubscribe(txsID)
if err != nil {
unsubErrs <- fmt.Errorf("failed to unsubscribe from transactions (id: %s): %w", txsID, err)
return
}
unsubErrs <- nil
}()
}
}
select {
case _, ok := <-bRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
return
if wsWaitErr == nil {
select {
case b, ok := <-bRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
break
}
// We can easily end up in a situation when subscription was performed too late and
// the desired transaction and VUB-th block have already got accepted before the
// subscription happened. Thus, always retry with non-ws client, it will perform
// AER requests and make sure.
wsWaitErr = fmt.Errorf("block #%d was received by EventWaiter", b.Index)
case aer, ok := <-aerRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
break
}
res = aer
case <-w.ws.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err())
case <-ctx.Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
}
waitErr = ErrTxNotAccepted
case aer, ok := <-aerRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
return
}
close(exit)
if waitersActive > 0 {
// Drain receivers to avoid other notification receivers blocking.
drainLoop:
for {
select {
case <-bRcvr:
case <-aerRcvr:
case unsubErr := <-unsubErrs:
if unsubErr != nil {
errFmt := "unsubscription error: %v"
errArgs := []interface{}{unsubErr}
if waitErr != nil {
errFmt = "%w; " + errFmt
errArgs = append([]interface{}{waitErr}, errArgs...)
}
waitErr = fmt.Errorf(errFmt, errArgs...)
}
waitersActive--
// Wait until all receiver channels finish their work.
if waitersActive == 0 {
break drainLoop
}
}
}
}
if wsWaitErr == nil || !errors.Is(wsWaitErr, ErrMissedEvent) {
close(bRcvr)
close(aerRcvr)
}
close(unsubErrs)
// Rollback to a poll-based waiter if needed.
if wsWaitErr != nil && waitErr == nil {
res, waitErr = w.polling.WaitAny(ctx, vub, hashes...)
if waitErr != nil {
// Wrap the poll-based error, it's more important.
waitErr = fmt.Errorf("event-based error: %v; poll-based waiter error: %w", wsWaitErr, waitErr)
}
res = aer
case <-w.ws.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err())
case <-ctx.Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
}
return
}

View file

@ -166,6 +166,7 @@ func TestWSWaiter_Wait(t *testing.T) {
})
// Missing AER after VUB.
c.RPCClient.appLog = nil
go func() {
_, err = w.Wait(h, bCount-2, nil)
require.ErrorIs(t, err, ErrTxNotAccepted)

View file

@ -625,16 +625,6 @@ func (c *WSClient) performSubscription(params []interface{}, rcvr notificationRe
return resp, nil
}
func (c *WSClient) performUnsubscription(id string) error {
c.subscriptionsLock.Lock()
defer c.subscriptionsLock.Unlock()
if _, ok := c.subscriptions[id]; !ok {
return errors.New("no subscription with this ID")
}
return c.removeSubscription(id)
}
// SubscribeForNewBlocks adds subscription for new block events to this instance
// of the client. It can be filtered by primary consensus node index, nil value doesn't
// add any filters.
@ -876,29 +866,55 @@ func (c *WSClient) ReceiveNotaryRequests(flt *neorpc.TxFilter, rcvr chan<- *resu
return c.performSubscription(params, r)
}
// Unsubscribe removes subscription for the given event stream.
// Unsubscribe removes subscription for the given event stream. It will return an
// error in case if there's no subscription with the provided ID. Call to Unsubscribe
// doesn't block notifications receive process for given subscriber, thus, ensure
// that subscriber channel is properly drained while unsubscription is being
// performed. You may probably need to run unsubscription process in a separate
// routine (in parallel with notification receiver routine) to avoid Client's
// notification dispatcher blocking.
func (c *WSClient) Unsubscribe(id string) error {
return c.performUnsubscription(id)
}
// UnsubscribeAll removes all active subscriptions of the current client.
// UnsubscribeAll removes all active subscriptions of the current client. It copies
// the list of subscribers in order not to hold the lock for the whole execution
// time and tries to unsubscribe from us many feeds as possible returning the
// chunk of unsubscription errors afterwards. Call to UnsubscribeAll doesn't block
// notifications receive process for given subscribers, thus, ensure that subscribers
// channels are properly drained while unsubscription is being performed. You may
// probably need to run unsubscription process in a separate routine (in parallel
// with notification receiver routines) to avoid Client's notification dispatcher
// blocking.
func (c *WSClient) UnsubscribeAll() error {
c.subscriptionsLock.Lock()
defer c.subscriptionsLock.Unlock()
subs := make([]string, 0, len(c.subscriptions))
for id := range c.subscriptions {
err := c.removeSubscription(id)
subs = append(subs, id)
}
c.subscriptionsLock.Unlock()
var resErr error
for _, id := range subs {
err := c.performUnsubscription(id)
if err != nil {
return err
errFmt := "failed to unsubscribe from feed %d: %v"
errArgs := []interface{}{err}
if resErr != nil {
errFmt = "%w; " + errFmt
errArgs = append([]interface{}{resErr}, errArgs...)
}
resErr = fmt.Errorf(errFmt, errArgs...)
}
}
return nil
return resErr
}
// removeSubscription is internal method that removes subscription with the given
// ID from the list of subscriptions and receivers. It must be performed under
// subscriptions lock.
func (c *WSClient) removeSubscription(id string) error {
// performUnsubscription is internal method that removes subscription with the given
// ID from the list of subscriptions and receivers. It takes the subscriptions lock
// after WS RPC unsubscription request is completed. Until then the subscriber channel
// may still receive WS notifications.
func (c *WSClient) performUnsubscription(id string) error {
var resp bool
if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil {
return err
@ -906,7 +922,14 @@ func (c *WSClient) removeSubscription(id string) error {
if !resp {
return errors.New("unsubscribe method returned false result")
}
rcvr := c.subscriptions[id]
c.subscriptionsLock.Lock()
defer c.subscriptionsLock.Unlock()
rcvr, ok := c.subscriptions[id]
if !ok {
return errors.New("no subscription with this ID")
}
ch := rcvr.Receiver()
ids := c.receivers[ch]
for i, rcvrID := range ids {

View file

@ -19,8 +19,10 @@ import (
"github.com/nspcc-dev/neo-go/internal/testchain"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/fee"
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -1419,6 +1421,7 @@ func TestClient_NEP11_ND(t *testing.T) {
expected := stackitem.NewMap()
expected.Add(stackitem.Make([]byte("name")), stackitem.Make([]byte("neo.com")))
expected.Add(stackitem.Make([]byte("expiration")), stackitem.Make(blockRegisterDomain.Timestamp+365*24*3600*1000)) // expiration formula
expected.Add(stackitem.Make([]byte("admin")), stackitem.Null{})
require.EqualValues(t, expected, p)
})
t.Run("Transfer", func(t *testing.T) {
@ -2001,3 +2004,127 @@ func TestClient_Wait(t *testing.T) {
// Wait for transaction that hasn't been persisted and VUB block has been persisted.
check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true)
}
func TestWSClient_Wait(t *testing.T) {
chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true)
defer chain.Close()
defer rpcSrv.Shutdown()
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{})
require.NoError(t, err)
require.NoError(t, c.Init())
acc, err := wallet.NewAccount()
require.NoError(t, err)
act, err := actor.New(c, []actor.SignerAccount{
{
Signer: transaction.Signer{
Account: acc.ScriptHash(),
},
Account: acc,
},
})
require.NoError(t, err)
rcvr := make(chan *state.AppExecResult)
check := func(t *testing.T, b *block.Block, h util.Uint256, vub uint32) {
go func() {
aer, err := act.Wait(h, vub, nil)
require.NoError(t, err, b.Index)
rcvr <- aer
}()
go func() {
require.Eventually(t, func() bool {
rpcSrv.subsLock.Lock()
defer rpcSrv.subsLock.Unlock()
return len(rpcSrv.subscribers) == 1
}, time.Second, 100*time.Millisecond)
require.NoError(t, chain.AddBlock(b))
}()
waitloop:
for {
select {
case aer := <-rcvr:
require.Equal(t, h, aer.Container)
require.Equal(t, trigger.Application, aer.Trigger)
if h.StringLE() == faultedTxHashLE {
require.Equal(t, vmstate.Fault, aer.VMState)
} else {
require.Equal(t, vmstate.Halt, aer.VMState)
}
break waitloop
case <-time.NewTimer(time.Duration(chain.GetConfig().SecondsPerBlock) * time.Second).C:
t.Fatalf("transaction from block %d failed to be awaited: deadline exceeded", b.Index)
}
}
}
var faultedChecked bool
for _, b := range getTestBlocks(t) {
if len(b.Transactions) > 0 {
tx := b.Transactions[0]
check(t, b, tx.Hash(), tx.ValidUntilBlock)
if tx.Hash().StringLE() == faultedTxHashLE {
faultedChecked = true
}
} else {
require.NoError(t, chain.AddBlock(b))
}
}
require.True(t, faultedChecked, "FAULTed transaction wasn't checked")
}
func TestWSClient_WaitWithLateSubscription(t *testing.T) {
chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true)
defer chain.Close()
defer rpcSrv.Shutdown()
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{})
require.NoError(t, err)
require.NoError(t, c.Init())
acc, err := wallet.NewAccount()
require.NoError(t, err)
act, err := actor.New(c, []actor.SignerAccount{
{
Signer: transaction.Signer{
Account: acc.ScriptHash(),
},
Account: acc,
},
})
require.NoError(t, err)
// Firstly, accept the block.
blocks := getTestBlocks(t)
b1 := blocks[0]
b2 := blocks[1]
tx := b1.Transactions[0]
require.NoError(t, chain.AddBlock(b1))
// After that, subscribe for AERs/blocks and wait.
rcvr := make(chan *state.AppExecResult)
go func() {
aer, err := act.Wait(tx.Hash(), tx.ValidUntilBlock, nil)
require.NoError(t, err)
rcvr <- aer
}()
// Accept the next block to trigger event-based waiter loop exit and rollback to
// poll-based waiter.
require.NoError(t, chain.AddBlock(b2))
// Wait for the result.
waitloop:
for {
select {
case aer := <-rcvr:
require.Equal(t, tx.Hash(), aer.Container)
require.Equal(t, trigger.Application, aer.Trigger)
require.Equal(t, vmstate.Halt, aer.VMState)
break waitloop
case <-time.NewTimer(time.Duration(chain.GetConfig().SecondsPerBlock) * time.Second).C:
t.Fatal("transaction failed to be awaited")
}
}
}

View file

@ -2429,14 +2429,14 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{
case neorpc.ExecutionEventID:
flt := new(neorpc.ExecutionFilter)
err = jd.Decode(flt)
if err == nil && (flt.State != nil && (*flt.State == "HALT" || *flt.State == "FAULT")) {
if err == nil && (flt.State == nil || (*flt.State == "HALT" || *flt.State == "FAULT")) {
filter = *flt
} else if err == nil {
err = errors.New("invalid state")
}
}
if err != nil {
return nil, neorpc.ErrInvalidParams
return nil, neorpc.WrapErrorWithData(neorpc.ErrInvalidParams, err.Error())
}
}

View file

@ -67,20 +67,22 @@ type rpcTestCase struct {
}
const genesisBlockHash = "0f8fb4e17d2ab9f3097af75ca7fd16064160fb8043db94909e00dd4e257b9dc4"
const testContractHash = "2db7d679c538ace5f00495c9e9d8ea95f1e0f5a5"
const deploymentTxHash = "496bccb5cb0a008ef9b7a32c459e508ef24fbb0830f82bac9162afa4ca804839"
const testContractHash = "565cff9508ebc75aadd7fe59f38dac610ab6093c"
const deploymentTxHash = "a14390941cc3a1d87393eff720a722e9cd350bd6ed233c5fe2001326c80eb68e"
const (
verifyContractHash = "06ed5314c2e4cb103029a60b86d46afa2fb8f67c"
verifyContractAVM = "VwIAQS1RCDBwDBTunqIsJ+NL0BSPxBCOCPdOj1BIskrZMCQE2zBxaBPOStkoJATbKGlK2SgkBNsol0A="
verifyWithArgsContractHash = "0dce75f52adb1a4c5c6eaa6a34eb26db2e5b3781"
nnsContractHash = "bdbfe1a280a0e23ca5b569c8f5845169bd93cb06"
nnsToken1ID = "6e656f2e636f6d"
nfsoContractHash = "0e15ca0df00669a2cd5dcb03bfd3e2b3849c2969"
nfsoToken1ID = "7e244ffd6aa85fb1579d2ed22e9b761ab62e3486"
invokescriptContractAVM = "VwIADBQBDAMOBQYMDQIODw0DDgcJAAAAAErZMCQE2zBwaEH4J+yMqiYEEUAMFA0PAwIJAAIBAwcDBAUCAQAOBgwJStkwJATbMHFpQfgn7IyqJgQSQBNA"
block20StateRootLE = "f1380226a217b5e35ea968d42c50e20b9af7ab83b91416c8fb85536c61004332"
storageContractHash = "ebc0c16a76c808cd4dde6bcc063f09e45e331ec7"
verifyContractHash = "06ed5314c2e4cb103029a60b86d46afa2fb8f67c"
verifyContractAVM = "VwIAQS1RCDBwDBTunqIsJ+NL0BSPxBCOCPdOj1BIskrZMCQE2zBxaBPOStkoJATbKGlK2SgkBNsol0A="
verifyWithArgsContractHash = "4dc916254efd2947c93b11207e8ffc0bb56161c5"
nnsContractHash = "892429fcd47c30f8451781acc627e8b20e0d64f3"
nnsToken1ID = "6e656f2e636f6d"
nfsoContractHash = "730ebe719ab8e3b69d11dafc95cdb9bf409db179"
nfsoToken1ID = "7e244ffd6aa85fb1579d2ed22e9b761ab62e3486"
storageContractHash = "ebc0c16a76c808cd4dde6bcc063f09e45e331ec7"
faultedTxHashLE = "82279bfe9bada282ca0f8cb8e0bb124b921af36f00c69a518320322c6f4fef60"
faultedTxBlock uint32 = 23
invokescriptContractAVM = "VwIADBQBDAMOBQYMDQIODw0DDgcJAAAAAErZMCQE2zBwaEH4J+yMqiYEEUAMFA0PAwIJAAIBAwcDBAUCAQAOBgwJStkwJATbMHFpQfgn7IyqJgQSQBNA"
block20StateRootLE = "b49a045246bf3bb90248ed538dd21e67d782a9242c52f31dfdef3da65ecd87c1"
)
var (
@ -287,6 +289,7 @@ var rpcTestCases = map[string][]rpcTestCase{
return &map[string]interface{}{
"name": "neo.com",
"expiration": "lhbLRl0B",
"admin": nil,
}
},
},
@ -817,7 +820,7 @@ var rpcTestCases = map[string][]rpcTestCase{
require.True(t, ok)
expected := result.UnclaimedGas{
Address: testchain.MultisigScriptHash(),
Unclaimed: *big.NewInt(11000),
Unclaimed: *big.NewInt(11500),
}
assert.Equal(t, expected, *actual)
},
@ -905,7 +908,7 @@ var rpcTestCases = map[string][]rpcTestCase{
script = append(script, 0x41, 0x62, 0x7d, 0x5b, 0x52)
return &result.Invoke{
State: "HALT",
GasConsumed: 32414250,
GasConsumed: 31922970,
Script: script,
Stack: []stackitem.Item{stackitem.Make(true)},
Notifications: []state.NotificationEvent{{
@ -935,19 +938,19 @@ var rpcTestCases = map[string][]rpcTestCase{
chg := []dboper.Operation{{
State: "Changed",
Key: []byte{0xfa, 0xff, 0xff, 0xff, 0xb},
Value: []byte{0xf6, 0x8b, 0x4e, 0x9d, 0x51, 0x79, 0x12},
Value: []byte{0x54, 0xb2, 0xd2, 0xa3, 0x51, 0x79, 0x12},
}, {
State: "Added",
Key: []byte{0xfb, 0xff, 0xff, 0xff, 0x14, 0xd6, 0x24, 0x87, 0x12, 0xff, 0x97, 0x22, 0x80, 0xa0, 0xae, 0xf5, 0x24, 0x1c, 0x96, 0x4d, 0x63, 0x78, 0x29, 0xcd, 0xb},
Value: []byte{0x41, 0x03, 0x21, 0x01, 0x01, 0x21, 0x01, 0x17, 0},
Value: []byte{0x41, 0x03, 0x21, 0x01, 0x01, 0x21, 0x01, 0x18, 0},
}, {
State: "Changed",
Key: []byte{0xfb, 0xff, 0xff, 0xff, 0x14, 0xee, 0x9e, 0xa2, 0x2c, 0x27, 0xe3, 0x4b, 0xd0, 0x14, 0x8f, 0xc4, 0x10, 0x8e, 0x8, 0xf7, 0x4e, 0x8f, 0x50, 0x48, 0xb2},
Value: []byte{0x41, 0x03, 0x21, 0x04, 0x2f, 0xd9, 0xf5, 0x05, 0x21, 0x01, 0x17, 0},
Value: []byte{0x41, 0x03, 0x21, 0x04, 0x2f, 0xd9, 0xf5, 0x05, 0x21, 0x01, 0x18, 0},
}, {
State: "Changed",
Key: []byte{0xfa, 0xff, 0xff, 0xff, 0x14, 0xee, 0x9e, 0xa2, 0x2c, 0x27, 0xe3, 0x4b, 0xd0, 0x14, 0x8f, 0xc4, 0x10, 0x8e, 0x8, 0xf7, 0x4e, 0x8f, 0x50, 0x48, 0xb2},
Value: []byte{0x41, 0x01, 0x21, 0x05, 0xe4, 0x74, 0xef, 0xdb, 0x08},
Value: []byte{0x41, 0x01, 0x21, 0x05, 0x0c, 0x76, 0x4f, 0xdf, 0x08},
}}
// Can be returned in any order.
assert.ElementsMatch(t, chg, res.Diagnostics.Changes)
@ -963,7 +966,7 @@ var rpcTestCases = map[string][]rpcTestCase{
cryptoHash, _ := e.chain.GetNativeContractScriptHash(nativenames.CryptoLib)
return &result.Invoke{
State: "HALT",
GasConsumed: 15928320,
GasConsumed: 13970250,
Script: script,
Stack: []stackitem.Item{stackitem.Make("1.2.3.4")},
Notifications: []state.NotificationEvent{},
@ -1052,7 +1055,7 @@ var rpcTestCases = map[string][]rpcTestCase{
script = append(script, 0x41, 0x62, 0x7d, 0x5b, 0x52)
return &result.Invoke{
State: "HALT",
GasConsumed: 32414250,
GasConsumed: 31922970,
Script: script,
Stack: []stackitem.Item{stackitem.Make(true)},
Notifications: []state.NotificationEvent{{
@ -1078,7 +1081,7 @@ var rpcTestCases = map[string][]rpcTestCase{
cryptoHash, _ := e.chain.GetNativeContractScriptHash(nativenames.CryptoLib)
return &result.Invoke{
State: "HALT",
GasConsumed: 15928320,
GasConsumed: 13970250,
Script: script,
Stack: []stackitem.Item{stackitem.Make("1.2.3.4")},
Notifications: []state.NotificationEvent{},
@ -1605,12 +1608,12 @@ var rpcTestCases = map[string][]rpcTestCase{
"sendrawtransaction": {
{
name: "positive",
params: `["ABwAAACWP5gAAAAAAEDaEgAAAAAAFwAAAAHunqIsJ+NL0BSPxBCOCPdOj1BIsoAAXgsDAOh2SBcAAAAMFBEmW7QXJQBBvgTo+iQOOPV8HlabDBTunqIsJ+NL0BSPxBCOCPdOj1BIshTAHwwIdHJhbnNmZXIMFPVj6kC8KD1NDgXEjqMFs/Kgc0DvQWJ9W1IBQgxAEh2U53FB2sU+eeLwTAUqMM5518nsDGil4Oi5IoBiMM7hvl6lKGoYIEaVkf7cS6x4MX1RmSHcoOabKFTyuEXI3SgMIQKzYiv0AXvf4xfFiu1fTHU/IGt9uJYEb6fXdLvEv3+NwkFW57Mn"]`,
params: `["AB0AAACWP5gAAAAAAEDaEgAAAAAAGAAAAAHunqIsJ+NL0BSPxBCOCPdOj1BIsoAAXgsDAOh2SBcAAAAMFBEmW7QXJQBBvgTo+iQOOPV8HlabDBTunqIsJ+NL0BSPxBCOCPdOj1BIshTAHwwIdHJhbnNmZXIMFPVj6kC8KD1NDgXEjqMFs/Kgc0DvQWJ9W1IBQgxAJ6norhWoZxp+Hj1JFhi+Z3qI9DUkLSbfsbaLSaJIqxTfdmPbNFDVK1G+oa+LWmpRp/bj9+QZM7yC+S6HXUI7rigMIQKzYiv0AXvf4xfFiu1fTHU/IGt9uJYEb6fXdLvEv3+NwkFW57Mn"]`,
result: func(e *executor) interface{} { return &result.RelayResult{} },
check: func(t *testing.T, e *executor, inv interface{}) {
res, ok := inv.(*result.RelayResult)
require.True(t, ok)
expectedHash := "e4418a8bdad8cdf401aabb277c7bec279d0b0113812c09607039c4ad87204d90"
expectedHash := "c11861dec1dd0f188608b725095041fcfc90abe51eea044993f122f22472753e"
assert.Equal(t, expectedHash, res.Hash.StringLE())
},
},
@ -2232,7 +2235,7 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []
require.NoErrorf(t, err, "could not parse response: %s", txOut)
assert.Equal(t, *block.Transactions[0], actual.Transaction)
assert.Equal(t, 23, actual.Confirmations)
assert.Equal(t, 24, actual.Confirmations)
assert.Equal(t, TXHash, actual.Transaction.Hash())
})
@ -2345,12 +2348,12 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []
require.NoError(t, json.Unmarshal(res, actual))
checkNep17TransfersAux(t, e, actual, sent, rcvd)
}
t.Run("time frame only", func(t *testing.T) { testNEP17T(t, 4, 5, 0, 0, []int{18, 19, 20, 21}, []int{3, 4}) })
t.Run("time frame only", func(t *testing.T) { testNEP17T(t, 4, 5, 0, 0, []int{19, 20, 21, 22}, []int{3, 4}) })
t.Run("no res", func(t *testing.T) { testNEP17T(t, 100, 100, 0, 0, []int{}, []int{}) })
t.Run("limit", func(t *testing.T) { testNEP17T(t, 1, 7, 3, 0, []int{15, 16}, []int{2}) })
t.Run("limit 2", func(t *testing.T) { testNEP17T(t, 4, 5, 2, 0, []int{18}, []int{3}) })
t.Run("limit with page", func(t *testing.T) { testNEP17T(t, 1, 7, 3, 1, []int{17, 18}, []int{3}) })
t.Run("limit with page 2", func(t *testing.T) { testNEP17T(t, 1, 7, 3, 2, []int{19, 20}, []int{4}) })
t.Run("limit", func(t *testing.T) { testNEP17T(t, 1, 7, 3, 0, []int{16, 17}, []int{2}) })
t.Run("limit 2", func(t *testing.T) { testNEP17T(t, 4, 5, 2, 0, []int{19}, []int{3}) })
t.Run("limit with page", func(t *testing.T) { testNEP17T(t, 1, 7, 3, 1, []int{18, 19}, []int{3}) })
t.Run("limit with page 2", func(t *testing.T) { testNEP17T(t, 1, 7, 3, 2, []int{20, 21}, []int{4}) })
})
prepareIteratorSession := func(t *testing.T) (uuid.UUID, uuid.UUID) {
@ -2557,7 +2560,7 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []
t.Run("contract-based verification with parameters", func(t *testing.T) {
verAcc, err := util.Uint160DecodeStringLE(verifyWithArgsContractHash)
require.NoError(t, err)
checkContract(t, verAcc, []byte{}, 490890) // No C# match, but we believe it's OK and it differs from the one above.
checkContract(t, verAcc, []byte{}, 245250) // No C# match, but we believe it's OK and it differs from the one above.
})
t.Run("contract-based verification with invocation script", func(t *testing.T) {
verAcc, err := util.Uint160DecodeStringLE(verifyWithArgsContractHash)
@ -2567,7 +2570,7 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []
emit.Int(invocWriter.BinWriter, 5)
emit.String(invocWriter.BinWriter, "")
invocScript := invocWriter.Bytes()
checkContract(t, verAcc, invocScript, 393720) // No C# match, but we believe it's OK and it has a specific invocation script overriding anything server-side.
checkContract(t, verAcc, invocScript, 148080) // No C# match, but we believe it's OK and it has a specific invocation script overriding anything server-side.
})
})
}
@ -2719,8 +2722,8 @@ func checkNep17Balances(t *testing.T, e *executor, acc interface{}) {
},
{
Asset: e.chain.UtilityTokenHash(),
Amount: "37099660700",
LastUpdated: 22,
Amount: "37106285100",
LastUpdated: 23,
Decimals: 8,
Name: "GasToken",
Symbol: "GAS",
@ -2834,7 +2837,7 @@ func checkNep11TransfersAux(t *testing.T, e *executor, acc interface{}, sent, rc
}
func checkNep17Transfers(t *testing.T, e *executor, acc interface{}) {
checkNep17TransfersAux(t, e, acc, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23}, []int{0, 1, 2, 3, 4, 5, 6, 7, 8})
checkNep17TransfersAux(t, e, acc, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24}, []int{0, 1, 2, 3, 4, 5, 6, 7, 8})
}
func checkNep17TransfersAux(t *testing.T, e *executor, acc interface{}, sent, rcvd []int) {
@ -2843,6 +2846,11 @@ func checkNep17TransfersAux(t *testing.T, e *executor, acc interface{}, sent, rc
rublesHash, err := util.Uint160DecodeStringLE(testContractHash)
require.NoError(t, err)
blockWithFAULTedTx, err := e.chain.GetBlock(e.chain.GetHeaderHash(int(faultedTxBlock))) // Transaction with ABORT inside.
require.NoError(t, err)
require.Equal(t, 1, len(blockWithFAULTedTx.Transactions))
txFAULTed := blockWithFAULTedTx.Transactions[0]
blockDeploy6, err := e.chain.GetBlock(e.chain.GetHeaderHash(22)) // deploy Storage contract (storage_contract.go)
require.NoError(t, err)
require.Equal(t, 1, len(blockDeploy6.Transactions))
@ -2947,6 +2955,14 @@ func checkNep17TransfersAux(t *testing.T, e *executor, acc interface{}, sent, rc
// duplicate the Server method.
expected := result.NEP17Transfers{
Sent: []result.NEP17Transfer{
{
Timestamp: blockWithFAULTedTx.Timestamp,
Asset: e.chain.UtilityTokenHash(),
Address: "", // burn
Amount: big.NewInt(txFAULTed.SystemFee + txFAULTed.NetworkFee).String(),
Index: 23,
TxHash: blockWithFAULTedTx.Hash(),
},
{
Timestamp: blockDeploy6.Timestamp,
Asset: e.chain.UtilityTokenHash(),

View file

@ -12,6 +12,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
@ -228,7 +229,7 @@ func TestFilteredSubscriptions(t *testing.T) {
require.Equal(t, "my_pretty_notification", n)
},
},
"execution matching": {
"execution matching state": {
params: `["transaction_executed", {"state":"HALT"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
@ -237,6 +238,26 @@ func TestFilteredSubscriptions(t *testing.T) {
require.Equal(t, "HALT", st)
},
},
"execution matching container": {
params: `["transaction_executed", {"container":"` + deploymentTxHash + `"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
tx := rmap["container"].(string)
require.Equal(t, "0x"+deploymentTxHash, tx)
},
},
"execution matching state and container": {
params: `["transaction_executed", {"state":"HALT", "container":"` + deploymentTxHash + `"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
tx := rmap["container"].(string)
require.Equal(t, "0x"+deploymentTxHash, tx)
st := rmap["vmstate"].(string)
require.Equal(t, "HALT", st)
},
},
"tx non-matching": {
params: `["transaction_added", {"sender":"00112233445566778899aabbccddeeff00112233"}]`,
check: func(t *testing.T, _ *neorpc.Notification) {
@ -250,8 +271,9 @@ func TestFilteredSubscriptions(t *testing.T) {
},
},
"execution non-matching": {
params: `["transaction_executed", {"state":"FAULT"}]`,
check: func(t *testing.T, _ *neorpc.Notification) {
// We have single FAULTed transaction in chain, this, use the wrong hash for this test instead of FAULT state.
params: `["transaction_executed", {"container":"0x` + util.Uint256{}.StringLE() + `"}]`,
check: func(t *testing.T, n *neorpc.Notification) {
t.Fatal("unexpected match for faulted execution")
},
},

Binary file not shown.