diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 7d602aaaa..ceb722a9a 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "github.com/nspcc-dev/neo-go/pkg/core/block" @@ -41,7 +42,11 @@ type ( // Wait allows to wait until transaction will be accepted to the chain. It can be // used as a wrapper for Send or SignAndSend and accepts transaction hash, // ValidUntilBlock value and an error. It returns transaction execution result - // or an error if transaction wasn't accepted to the chain. + // or an error if transaction wasn't accepted to the chain. Notice that "already + // exists" err value is not treated as an error by this routine because it + // means that the transactions given might be already accepted or soon going + // to be accepted. Such transaction can be waited for in a usual way, potentially + // with positive result, so that's what will happen. Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) // WaitAny waits until at least one of the specified transactions will be accepted // to the chain until vub (including). It returns execution result of this @@ -89,6 +94,12 @@ type EventWaiter struct { polling Waiter } +// errIsAlreadyExists is a temporary helper until we have #2248 solved. Both C# +// and Go nodes return this string (possibly among other data). +func errIsAlreadyExists(err error) bool { + return strings.Contains(strings.ToLower(err.Error()), "already exists") +} + // newWaiter creates Waiter instance. It can be either websocket-based or // polling-base, otherwise Waiter stub is returned. func newWaiter(ra RPCActor, v *result.Version) Waiter { @@ -139,7 +150,7 @@ func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) { // Wait implements Waiter interface. func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { - if err != nil { + if err != nil && !errIsAlreadyExists(err) { return nil, err } return w.WaitAny(context.TODO(), vub, h) @@ -209,7 +220,7 @@ func NewEventWaiter(waiter RPCEventWaiter) (*EventWaiter, error) { // Wait implements Waiter interface. func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.AppExecResult, waitErr error) { - if err != nil { + if err != nil && !errIsAlreadyExists(err) { return nil, err } return w.WaitAny(context.TODO(), vub, h) @@ -244,6 +255,7 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui }() } if wsWaitErr == nil { + trig := trigger.Application for _, h := range hashes { txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr) if err != nil { @@ -260,22 +272,28 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui } unsubErrs <- nil }() + // There is a potential race between subscription and acceptance, so + // do a polling check once _after_ the subscription. + appLog, err := w.ws.GetApplicationLog(h, &trig) + if err == nil { + res = &state.AppExecResult{ + Container: appLog.Container, + Execution: appLog.Executions[0], + } + break // We have the result, no need for other subscriptions. + } } } - if wsWaitErr == nil { + if wsWaitErr == nil && res == nil { select { - case b, ok := <-bRcvr: + case _, ok := <-bRcvr: if !ok { // We're toast, retry with non-ws client. wsWaitErr = ErrMissedEvent break } - // We can easily end up in a situation when subscription was performed too late and - // the desired transaction and VUB-th block have already got accepted before the - // subscription happened. Thus, always retry with non-ws client, it will perform - // AER requests and make sure. - wsWaitErr = fmt.Errorf("block #%d was received by EventWaiter", b.Index) + waitErr = ErrTxNotAccepted case aer, ok := <-aerRcvr: if !ok { // We're toast, retry with non-ws client. diff --git a/pkg/rpcclient/actor/waiter_test.go b/pkg/rpcclient/actor/waiter_test.go index a3fcac31f..f6b06fb00 100644 --- a/pkg/rpcclient/actor/waiter_test.go +++ b/pkg/rpcclient/actor/waiter_test.go @@ -133,13 +133,12 @@ func TestWSWaiter_Wait(t *testing.T) { require.ErrorIs(t, err, someErr) // AER is in chain immediately. + aer, err := w.Wait(h, bCount-1, nil) + require.NoError(t, err) + require.Equal(t, expected, aer) + + // Auxiliary things for asynchronous tests. doneCh := make(chan struct{}) - go func() { - aer, err := w.Wait(h, bCount-1, nil) - require.NoError(t, err) - require.Equal(t, expected, aer) - doneCh <- struct{}{} - }() check := func(t *testing.T, trigger func()) { timer := time.NewTimer(time.Second) var triggerFired bool @@ -159,6 +158,15 @@ func TestWSWaiter_Wait(t *testing.T) { } require.True(t, triggerFired) } + + // AER received after the subscription. + c.RPCClient.appLog = nil + go func() { + aer, err = w.Wait(h, bCount-1, nil) + require.NoError(t, err) + require.Equal(t, expected, aer) + doneCh <- struct{}{} + }() check(t, func() { c.chLock.RLock() defer c.chLock.RUnlock() @@ -166,7 +174,6 @@ func TestWSWaiter_Wait(t *testing.T) { }) // Missing AER after VUB. - c.RPCClient.appLog = nil go func() { _, err = w.Wait(h, bCount-2, nil) require.ErrorIs(t, err, ErrTxNotAccepted) diff --git a/pkg/rpcclient/notary/actor.go b/pkg/rpcclient/notary/actor.go index 2a71f5750..925ef9417 100644 --- a/pkg/rpcclient/notary/actor.go +++ b/pkg/rpcclient/notary/actor.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -320,9 +321,13 @@ func (a *Actor) SendRequestExactly(mainTx *transaction.Transaction, fbTx *transa // the resulting application execution result or actor.ErrTxNotAccepted if both transactions // failed to persist. Wait can be used if underlying Actor supports transaction awaiting, // see actor.Actor and actor.Waiter documentation for details. Wait may be used as a wrapper -// for Notarize, SendRequest or SendRequestExactly. +// for Notarize, SendRequest or SendRequestExactly. Notice that "already exists" or "already +// on chain" answers are not treated as errors by this routine because they mean that some +// of the transactions given might be already accepted or soon going to be accepted. These +// transactions can be waited for in a usual way potentially with positive result. func (a *Actor) Wait(mainHash, fbHash util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { - if err != nil { + // #2248 will eventually remove this garbage from the code. + if err != nil && !(strings.Contains(strings.ToLower(err.Error()), "already exists") || strings.Contains(strings.ToLower(err.Error()), "already on chain")) { return nil, err } return a.WaitAny(context.TODO(), vub, mainHash, fbHash) diff --git a/pkg/rpcclient/rpc.go b/pkg/rpcclient/rpc.go index 112689bea..0e7147baa 100644 --- a/pkg/rpcclient/rpc.go +++ b/pkg/rpcclient/rpc.go @@ -692,17 +692,16 @@ func (c *Client) invokeSomething(method string, p []interface{}, signers []trans return resp, nil } -// SendRawTransaction broadcasts a transaction over the NEO network. -// The given hex string needs to be signed with a keypair. -// When the result of the response object is true, the TX has successfully -// been broadcasted to the network. +// SendRawTransaction broadcasts the given transaction to the Neo network. +// It always returns transaction hash, when successful (no error) this is the +// hash returned from server, when not it's a locally calculated rawTX hash. func (c *Client) SendRawTransaction(rawTX *transaction.Transaction) (util.Uint256, error) { var ( params = []interface{}{rawTX.Bytes()} resp = new(result.RelayResult) ) if err := c.performRequest("sendrawtransaction", params, resp); err != nil { - return util.Uint256{}, err + return rawTX.Hash(), err } return resp.Hash, nil } diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index 023af25d0..093b752b8 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/base64" "encoding/hex" + "encoding/json" "fmt" "math/big" "net/http" @@ -16,6 +17,7 @@ import ( "time" "github.com/google/uuid" + "github.com/gorilla/websocket" "github.com/nspcc-dev/neo-go/internal/testchain" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core" @@ -2132,11 +2134,42 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) { // Firstly, accept the block. blocks := getTestBlocks(t) b1 := blocks[0] - b2 := blocks[1] tx := b1.Transactions[0] require.NoError(t, chain.AddBlock(b1)) - // After that, subscribe for AERs/blocks. + // After that, wait and get the result immediately. + aer, err := act.Wait(tx.Hash(), tx.ValidUntilBlock, nil) + require.NoError(t, err) + require.Equal(t, tx.Hash(), aer.Container) + require.Equal(t, trigger.Application, aer.Trigger) + require.Equal(t, vmstate.Halt, aer.VMState) +} + +func TestWSClient_WaitWithMissedEvent(t *testing.T) { + chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true) + defer chain.Close() + defer rpcSrv.Shutdown() + + url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" + c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{}) + require.NoError(t, err) + require.NoError(t, c.Init()) + acc, err := wallet.NewAccount() + require.NoError(t, err) + act, err := actor.New(c, []actor.SignerAccount{ + { + Signer: transaction.Signer{ + Account: acc.ScriptHash(), + }, + Account: acc, + }, + }) + require.NoError(t, err) + + blocks := getTestBlocks(t) + b1 := blocks[0] + tx := b1.Transactions[0] + rcvr := make(chan *state.AppExecResult) go func() { aer, err := act.Wait(tx.Hash(), tx.ValidUntilBlock, nil) @@ -2150,23 +2183,33 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) { require.Eventually(t, func() bool { rpcSrv.subsLock.Lock() defer rpcSrv.subsLock.Unlock() - if len(rpcSrv.subscribers) == 1 { // single client - for s := range rpcSrv.subscribers { - var count int - for _, f := range s.feeds { - if f.event != neorpc.InvalidEventID { - count++ - } - } - return count == 2 // subscription for blocks + AERs - } - } - return false + return len(rpcSrv.subscribers) == 1 }, time.Second, 100*time.Millisecond) - // Accept the next block to trigger event-based waiter loop exit and rollback to - // a poll-based waiter. - require.NoError(t, chain.AddBlock(b2)) + rpcSrv.subsLock.Lock() + // Suppress normal event delivery. + for s := range rpcSrv.subscribers { + s.overflown.Store(true) + } + rpcSrv.subsLock.Unlock() + + // Accept the next block, but subscriber will get no events because it's overflown. + require.NoError(t, chain.AddBlock(b1)) + + overEvent, err := json.Marshal(neorpc.Notification{ + JSONRPC: neorpc.JSONRPCVersion, + Event: neorpc.MissedEventID, + Payload: make([]interface{}, 0), + }) + require.NoError(t, err) + overflowMsg, err := websocket.NewPreparedMessage(websocket.TextMessage, overEvent) + require.NoError(t, err) + rpcSrv.subsLock.Lock() + // Deliver overflow message -> triggers subscriber to retry with polling waiter. + for s := range rpcSrv.subscribers { + s.writer <- overflowMsg + } + rpcSrv.subsLock.Unlock() // Wait for the result. waitloop: