f5b1bd3978
Try to subscribe for headers firstly, and then if RPC server doesn't have this ability, fallback to block subscriptions to manage transaction awaiting. Close #3260. Signed-off-by: Anna Shaleva <shaleva.ann@nspcc.ru>
266 lines
7.4 KiB
Go
266 lines
7.4 KiB
Go
package waiter_test
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
|
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/waiter"
|
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
|
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
|
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
|
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
type RPCClient struct {
|
|
err error
|
|
invRes *result.Invoke
|
|
netFee int64
|
|
bCount atomic.Uint32
|
|
version *result.Version
|
|
hash util.Uint256
|
|
appLog *result.ApplicationLog
|
|
context context.Context
|
|
}
|
|
|
|
var _ = waiter.RPCPollingBased(&RPCClient{})
|
|
|
|
func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) {
|
|
return r.invRes, r.err
|
|
}
|
|
func (r *RPCClient) InvokeFunction(contract util.Uint160, operation string, params []smartcontract.Parameter, signers []transaction.Signer) (*result.Invoke, error) {
|
|
return r.invRes, r.err
|
|
}
|
|
func (r *RPCClient) InvokeScript(script []byte, signers []transaction.Signer) (*result.Invoke, error) {
|
|
return r.invRes, r.err
|
|
}
|
|
func (r *RPCClient) CalculateNetworkFee(tx *transaction.Transaction) (int64, error) {
|
|
return r.netFee, r.err
|
|
}
|
|
func (r *RPCClient) GetBlockCount() (uint32, error) {
|
|
return r.bCount.Load(), r.err
|
|
}
|
|
func (r *RPCClient) GetVersion() (*result.Version, error) {
|
|
verCopy := *r.version
|
|
return &verCopy, r.err
|
|
}
|
|
func (r *RPCClient) SendRawTransaction(tx *transaction.Transaction) (util.Uint256, error) {
|
|
return r.hash, r.err
|
|
}
|
|
func (r *RPCClient) TerminateSession(sessionID uuid.UUID) (bool, error) {
|
|
return false, nil // Just a stub, unused by actor.
|
|
}
|
|
func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) {
|
|
return nil, nil // Just a stub, unused by actor.
|
|
}
|
|
func (r *RPCClient) Context() context.Context {
|
|
if r.context == nil {
|
|
return context.Background()
|
|
}
|
|
return r.context
|
|
}
|
|
|
|
func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) {
|
|
if r.appLog != nil {
|
|
return r.appLog, nil
|
|
}
|
|
return nil, errors.New("not found")
|
|
}
|
|
|
|
type AwaitableRPCClient struct {
|
|
RPCClient
|
|
|
|
chLock sync.RWMutex
|
|
subHeaderCh chan<- *block.Header
|
|
subBlockCh chan<- *block.Block
|
|
subTxCh chan<- *state.AppExecResult
|
|
}
|
|
|
|
var _ = waiter.RPCEventBased(&AwaitableRPCClient{})
|
|
|
|
func (c *AwaitableRPCClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) {
|
|
c.chLock.Lock()
|
|
defer c.chLock.Unlock()
|
|
c.subBlockCh = rcvr
|
|
return "1", nil
|
|
}
|
|
func (c *AwaitableRPCClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) {
|
|
c.chLock.Lock()
|
|
defer c.chLock.Unlock()
|
|
c.subTxCh = rcvr
|
|
return "2", nil
|
|
}
|
|
func (c *AwaitableRPCClient) ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) {
|
|
c.chLock.Lock()
|
|
defer c.chLock.Unlock()
|
|
c.subHeaderCh = rcvr
|
|
return "3", nil
|
|
}
|
|
func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil }
|
|
|
|
func TestNewWaiter(t *testing.T) {
|
|
w := waiter.New((actor.RPCActor)(nil), nil)
|
|
_, ok := w.(waiter.Null)
|
|
require.True(t, ok)
|
|
|
|
w = waiter.New(&RPCClient{}, &result.Version{})
|
|
_, ok = w.(*waiter.PollingBased)
|
|
require.True(t, ok)
|
|
|
|
w = waiter.New(&AwaitableRPCClient{RPCClient: RPCClient{}}, &result.Version{})
|
|
_, ok = w.(*waiter.EventBased)
|
|
require.True(t, ok)
|
|
}
|
|
|
|
func TestPollingWaiter_Wait(t *testing.T) {
|
|
h := util.Uint256{1, 2, 3}
|
|
bCount := uint32(5)
|
|
appLog := &result.ApplicationLog{Container: h, Executions: []state.Execution{{}}}
|
|
expected := &state.AppExecResult{Container: h, Execution: state.Execution{}}
|
|
c := &RPCClient{appLog: appLog}
|
|
c.bCount.Store(bCount)
|
|
w := waiter.New(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time.
|
|
_, ok := w.(*waiter.PollingBased)
|
|
require.True(t, ok)
|
|
|
|
// Wait with error.
|
|
someErr := errors.New("some error")
|
|
_, err := w.Wait(h, bCount, someErr)
|
|
require.ErrorIs(t, err, someErr)
|
|
|
|
// AER is in chain immediately.
|
|
aer, err := w.Wait(h, bCount-1, nil)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expected, aer)
|
|
|
|
// Missing AER after VUB.
|
|
c.appLog = nil
|
|
_, err = w.Wait(h, bCount-2, nil)
|
|
require.ErrorIs(t, waiter.ErrTxNotAccepted, err)
|
|
|
|
checkErr := func(t *testing.T, trigger func(), target error) {
|
|
errCh := make(chan error)
|
|
go func() {
|
|
_, err = w.Wait(h, bCount, nil)
|
|
errCh <- err
|
|
}()
|
|
timer := time.NewTimer(time.Second)
|
|
var triggerFired bool
|
|
waitloop:
|
|
for {
|
|
select {
|
|
case err = <-errCh:
|
|
require.ErrorIs(t, err, target)
|
|
break waitloop
|
|
case <-timer.C:
|
|
if triggerFired {
|
|
t.Fatal("failed to await result")
|
|
}
|
|
trigger()
|
|
triggerFired = true
|
|
timer.Reset(time.Second * 2)
|
|
}
|
|
}
|
|
require.True(t, triggerFired)
|
|
}
|
|
|
|
// Tx is accepted before VUB.
|
|
c.appLog = nil
|
|
c.bCount.Store(bCount)
|
|
checkErr(t, func() { c.bCount.Store(bCount + 1) }, waiter.ErrTxNotAccepted)
|
|
|
|
// Context is cancelled.
|
|
c.appLog = nil
|
|
c.bCount.Store(bCount)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
c.context = ctx
|
|
checkErr(t, cancel, waiter.ErrContextDone)
|
|
}
|
|
|
|
func TestWSWaiter_Wait(t *testing.T) {
|
|
h := util.Uint256{1, 2, 3}
|
|
bCount := uint32(5)
|
|
appLog := &result.ApplicationLog{Container: h, Executions: []state.Execution{{}}}
|
|
expected := &state.AppExecResult{Container: h, Execution: state.Execution{}}
|
|
c := &AwaitableRPCClient{RPCClient: RPCClient{appLog: appLog}}
|
|
c.bCount.Store(bCount)
|
|
w := waiter.New(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time.
|
|
_, ok := w.(*waiter.EventBased)
|
|
require.True(t, ok)
|
|
|
|
// Wait with error.
|
|
someErr := errors.New("some error")
|
|
_, err := w.Wait(h, bCount, someErr)
|
|
require.ErrorIs(t, err, someErr)
|
|
|
|
// AER is in chain immediately.
|
|
aer, err := w.Wait(h, bCount-1, nil)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expected, aer)
|
|
|
|
// Auxiliary things for asynchronous tests.
|
|
doneCh := make(chan struct{})
|
|
check := func(t *testing.T, trigger func()) {
|
|
timer := time.NewTimer(time.Second)
|
|
var triggerFired bool
|
|
waitloop:
|
|
for {
|
|
select {
|
|
case <-doneCh:
|
|
break waitloop
|
|
case <-timer.C:
|
|
if triggerFired {
|
|
t.Fatal("failed to await result")
|
|
}
|
|
trigger()
|
|
triggerFired = true
|
|
timer.Reset(time.Second * 2)
|
|
}
|
|
}
|
|
require.True(t, triggerFired)
|
|
}
|
|
|
|
// AER received after the subscription.
|
|
c.RPCClient.appLog = nil
|
|
go func() {
|
|
aer, err = w.Wait(h, bCount-1, nil)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expected, aer)
|
|
doneCh <- struct{}{}
|
|
}()
|
|
check(t, func() {
|
|
c.chLock.RLock()
|
|
defer c.chLock.RUnlock()
|
|
c.subTxCh <- expected
|
|
})
|
|
|
|
// Missing AER after VUB.
|
|
go func() {
|
|
_, err = w.Wait(h, bCount-2, nil)
|
|
require.ErrorIs(t, err, waiter.ErrTxNotAccepted)
|
|
doneCh <- struct{}{}
|
|
}()
|
|
check(t, func() {
|
|
c.chLock.RLock()
|
|
defer c.chLock.RUnlock()
|
|
c.subHeaderCh <- &block.Header{}
|
|
})
|
|
}
|
|
|
|
func TestRPCWaiterRPCClientCompat(t *testing.T) {
|
|
_ = waiter.RPCPollingBased(&rpcclient.Client{})
|
|
_ = waiter.RPCPollingBased(&rpcclient.WSClient{})
|
|
_ = waiter.RPCEventBased(&rpcclient.WSClient{})
|
|
}
|