diff --git a/pkg/rpcclient/actor/actor_test.go b/pkg/rpcclient/actor/actor_test.go index 895d54743..bb49320df 100644 --- a/pkg/rpcclient/actor/actor_test.go +++ b/pkg/rpcclient/actor/actor_test.go @@ -15,15 +15,18 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) type RPCClient struct { err error invRes *result.Invoke netFee int64 - bCount uint32 + bCount atomic.Uint32 version *result.Version hash util.Uint256 + appLog *result.ApplicationLog + context context.Context } func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) { @@ -39,7 +42,7 @@ func (r *RPCClient) CalculateNetworkFee(tx *transaction.Transaction) (int64, err return r.netFee, r.err } func (r *RPCClient) GetBlockCount() (uint32, error) { - return r.bCount, r.err + return r.bCount.Load(), r.err } func (r *RPCClient) GetVersion() (*result.Version, error) { verCopy := *r.version @@ -55,10 +58,17 @@ func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCo return nil, nil // Just a stub, unused by actor. } func (r *RPCClient) Context() context.Context { - panic("TODO") + if r.context == nil { + return context.Background() + } + return r.context } + func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) { - panic("TODO") + if r.appLog != nil { + return r.appLog, nil + } + return nil, errors.New("not found") } func testRPCAndAccount(t *testing.T) (*RPCClient, *wallet.Account) { client := &RPCClient{ @@ -172,7 +182,7 @@ func TestSimpleWrappers(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(42), nf) - client.bCount = 100500 + client.bCount.Store(100500) bc, err := a.GetBlockCount() require.NoError(t, err) require.Equal(t, uint32(100500), bc) diff --git a/pkg/rpcclient/actor/maker_test.go b/pkg/rpcclient/actor/maker_test.go index b7f66c7b4..1b17510e8 100644 --- a/pkg/rpcclient/actor/maker_test.go +++ b/pkg/rpcclient/actor/maker_test.go @@ -20,7 +20,7 @@ func TestCalculateValidUntilBlock(t *testing.T) { require.Error(t, err) client.err = nil - client.bCount = 42 + client.bCount.Store(42) vub, err := a.CalculateValidUntilBlock() require.NoError(t, err) require.Equal(t, uint32(42+7+1), vub) @@ -37,7 +37,7 @@ func TestCalculateValidUntilBlock(t *testing.T) { require.NoError(t, err) require.Equal(t, uint32(42+4+1), vub) - client.bCount = 101 + client.bCount.Store(101) vub, err = a.CalculateValidUntilBlock() require.NoError(t, err) require.Equal(t, uint32(101+10+1), vub) @@ -64,7 +64,7 @@ func TestMakeUnsigned(t *testing.T) { // Good unchecked. client.netFee = 42 - client.bCount = 100500 + client.bCount.Store(100500) client.err = nil tx, err := a.MakeUnsignedUncheckedRun(script, 1, nil) require.NoError(t, err) diff --git a/pkg/rpcclient/actor/waiter_test.go b/pkg/rpcclient/actor/waiter_test.go new file mode 100644 index 000000000..03a516ce4 --- /dev/null +++ b/pkg/rpcclient/actor/waiter_test.go @@ -0,0 +1,186 @@ +package actor + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/stretchr/testify/require" +) + +type AwaitableRPCClient struct { + RPCClient + + chLock sync.RWMutex + subBlockCh chan<- rpcclient.Notification + subTxCh chan<- rpcclient.Notification +} + +func (c *AwaitableRPCClient) SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) { + c.chLock.Lock() + defer c.chLock.Unlock() + c.subBlockCh = rcvrCh + return "1", nil +} +func (c *AwaitableRPCClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) { + c.chLock.Lock() + defer c.chLock.Unlock() + c.subTxCh = rcvrCh + return "2", nil +} +func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil } + +func TestNewWaiter(t *testing.T) { + w := newWaiter((RPCActor)(nil), nil) + _, ok := w.(NullWaiter) + require.True(t, ok) + + w = newWaiter(&RPCClient{}, &result.Version{}) + _, ok = w.(*PollingWaiter) + require.True(t, ok) + + w = newWaiter(&AwaitableRPCClient{RPCClient: RPCClient{}}, &result.Version{}) + _, ok = w.(*EventWaiter) + require.True(t, ok) +} + +func TestPollingWaiter_Wait(t *testing.T) { + h := util.Uint256{1, 2, 3} + bCount := uint32(5) + appLog := &result.ApplicationLog{Container: h, Executions: []state.Execution{{}}} + expected := &state.AppExecResult{Container: h, Execution: state.Execution{}} + c := &RPCClient{appLog: appLog} + c.bCount.Store(bCount) + w := newWaiter(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time. + _, ok := w.(*PollingWaiter) + require.True(t, ok) + + // Wait with error. + someErr := errors.New("some error") + _, err := w.Wait(h, bCount, someErr) + require.ErrorIs(t, err, someErr) + + // AER is in chain immediately. + aer, err := w.Wait(h, bCount-1, nil) + require.NoError(t, err) + require.Equal(t, expected, aer) + + // Missing AER after VUB. + c.appLog = nil + _, err = w.Wait(h, bCount-2, nil) + require.ErrorIs(t, ErrTxNotAccepted, err) + + checkErr := func(t *testing.T, trigger func(), target error) { + errCh := make(chan error) + go func() { + _, err = w.Wait(h, bCount, nil) + errCh <- err + }() + timer := time.NewTimer(time.Second) + var triggerFired bool + waitloop: + for { + select { + case err = <-errCh: + require.ErrorIs(t, err, target) + break waitloop + case <-timer.C: + if triggerFired { + t.Fatal("failed to await result") + } + trigger() + triggerFired = true + timer.Reset(time.Second * 2) + } + } + require.True(t, triggerFired) + } + + // Tx is accepted before VUB. + c.appLog = nil + c.bCount.Store(bCount) + checkErr(t, func() { c.bCount.Store(bCount + 1) }, ErrTxNotAccepted) + + // Context is cancelled. + c.appLog = nil + c.bCount.Store(bCount) + ctx, cancel := context.WithCancel(context.Background()) + c.context = ctx + checkErr(t, cancel, ErrContextDone) +} + +func TestWSWaiter_Wait(t *testing.T) { + h := util.Uint256{1, 2, 3} + bCount := uint32(5) + appLog := &result.ApplicationLog{Container: h, Executions: []state.Execution{{}}} + expected := &state.AppExecResult{Container: h, Execution: state.Execution{}} + c := &AwaitableRPCClient{RPCClient: RPCClient{appLog: appLog}} + c.bCount.Store(bCount) + w := newWaiter(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time. + _, ok := w.(*EventWaiter) + require.True(t, ok) + + // Wait with error. + someErr := errors.New("some error") + _, err := w.Wait(h, bCount, someErr) + require.ErrorIs(t, err, someErr) + + // AER is in chain immediately. + doneCh := make(chan struct{}) + go func() { + aer, err := w.Wait(h, bCount-1, nil) + require.NoError(t, err) + require.Equal(t, expected, aer) + doneCh <- struct{}{} + }() + check := func(t *testing.T, trigger func()) { + timer := time.NewTimer(time.Second) + var triggerFired bool + waitloop: + for { + select { + case <-doneCh: + break waitloop + case <-timer.C: + if triggerFired { + t.Fatal("failed to await result") + } + trigger() + triggerFired = true + timer.Reset(time.Second * 2) + } + } + require.True(t, triggerFired) + } + check(t, func() { + c.chLock.RLock() + defer c.chLock.RUnlock() + c.subBlockCh <- rpcclient.Notification{ + Type: neorpc.ExecutionEventID, + Value: expected, + } + }) + + // Missing AER after VUB. + go func() { + _, err = w.Wait(h, bCount-2, nil) + require.ErrorIs(t, err, ErrTxNotAccepted) + doneCh <- struct{}{} + }() + check(t, func() { + c.chLock.RLock() + defer c.chLock.RUnlock() + c.subBlockCh <- rpcclient.Notification{ + Type: neorpc.BlockEventID, + Value: &block.Block{}, + } + }) +}