diff --git a/pkg/neorpc/result/invoke.go b/pkg/neorpc/result/invoke.go index ef579f600..2fa32f4e4 100644 --- a/pkg/neorpc/result/invoke.go +++ b/pkg/neorpc/result/invoke.go @@ -232,3 +232,20 @@ func (r *Invoke) UnmarshalJSON(data []byte) error { r.Diagnostics = aux.Diagnostics return nil } + +// AppExecToInvocation converts state.AppExecResult to result.Invoke and can be used +// as a wrapper for actor.Wait. The result of AppExecToInvocation doesn't have all fields +// properly filled, it's limited by State, GasConsumed, Stack, FaultException and Notifications. +// The result of AppExecToInvocation can be passed to unwrap package helpers. +func AppExecToInvocation(aer *state.AppExecResult, err error) (*Invoke, error) { + if err != nil { + return nil, err + } + return &Invoke{ + State: aer.VMState.String(), + GasConsumed: aer.GasConsumed, + Stack: aer.Stack, + FaultException: aer.FaultException, + Notifications: aer.Events, + }, nil +} diff --git a/pkg/neorpc/result/invoke_test.go b/pkg/neorpc/result/invoke_test.go index 09dc588dc..64a145740 100644 --- a/pkg/neorpc/result/invoke_test.go +++ b/pkg/neorpc/result/invoke_test.go @@ -3,13 +3,17 @@ package result import ( "encoding/base64" "encoding/json" + "errors" "math/big" "testing" + "github.com/google/uuid" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" + "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" "github.com/stretchr/testify/require" ) @@ -49,3 +53,39 @@ func TestInvoke_MarshalJSON(t *testing.T) { require.NoError(t, json.Unmarshal(data, actual)) require.Equal(t, result, actual) } + +func TestAppExecToInvocation(t *testing.T) { + // With error. + someErr := errors.New("some err") + _, err := AppExecToInvocation(nil, someErr) + require.ErrorIs(t, err, someErr) + + // Good. + h := util.Uint256{1, 2, 3} + ex := state.Execution{ + Trigger: trigger.Application, + VMState: vmstate.Fault, + GasConsumed: 123, + Stack: []stackitem.Item{stackitem.NewBigInteger(big.NewInt(123))}, + Events: []state.NotificationEvent{{ + ScriptHash: util.Uint160{3, 2, 1}, + Name: "Notification", + Item: stackitem.NewArray([]stackitem.Item{stackitem.Null{}}), + }}, + FaultException: "some fault exception", + } + inv, err := AppExecToInvocation(&state.AppExecResult{ + Container: h, + Execution: ex, + }, nil) + require.NoError(t, err) + require.Equal(t, ex.VMState.String(), inv.State) + require.Equal(t, ex.GasConsumed, inv.GasConsumed) + require.Nil(t, inv.Script) + require.Equal(t, ex.Stack, inv.Stack) + require.Equal(t, ex.FaultException, inv.FaultException) + require.Equal(t, ex.Events, inv.Notifications) + require.Nil(t, inv.Transaction) + require.Nil(t, inv.Diagnostics) + require.Equal(t, uuid.UUID{}, inv.Session) +} diff --git a/pkg/rpcclient/actor/actor.go b/pkg/rpcclient/actor/actor.go index 59b0ad3a7..cab05e218 100644 --- a/pkg/rpcclient/actor/actor.go +++ b/pkg/rpcclient/actor/actor.go @@ -25,6 +25,7 @@ import ( // create and send transactions. type RPCActor interface { invoker.RPCInvoke + RPCPollingWaiter CalculateNetworkFee(tx *transaction.Transaction) (int64, error) GetBlockCount() (uint32, error) diff --git a/pkg/rpcclient/actor/actor_test.go b/pkg/rpcclient/actor/actor_test.go index d621d2770..895d54743 100644 --- a/pkg/rpcclient/actor/actor_test.go +++ b/pkg/rpcclient/actor/actor_test.go @@ -1,6 +1,7 @@ package actor import ( + "context" "errors" "testing" @@ -9,6 +10,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/smartcontract" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/wallet" @@ -52,6 +54,12 @@ func (r *RPCClient) TerminateSession(sessionID uuid.UUID) (bool, error) { func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) { return nil, nil // Just a stub, unused by actor. } +func (r *RPCClient) Context() context.Context { + panic("TODO") +} +func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) { + panic("TODO") +} func testRPCAndAccount(t *testing.T) (*RPCClient, *wallet.Account) { client := &RPCClient{ version: &result.Version{ diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go new file mode 100644 index 000000000..f42707631 --- /dev/null +++ b/pkg/rpcclient/actor/waiter.go @@ -0,0 +1,203 @@ +package actor + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" + "github.com/nspcc-dev/neo-go/pkg/util" +) + +// PollingWaiterRetryCount is a threshold for a number of subsequent failed +// attempts to get block count from the RPC server for PollingWaiter. If it fails +// to retrieve block count PollingWaiterRetryCount times in a raw then transaction +// awaiting attempt considered to be failed and an error is returned. +const PollingWaiterRetryCount = 3 + +var ( + // ErrTxNotAccepted is returned when transaction wasn't accepted to the chain + // even after ValidUntilBlock block persist. + ErrTxNotAccepted = errors.New("transaction was not accepted to chain") + // ErrContextDone is returned when Waiter context has been done in the middle + // of transaction awaiting process and no result was received yet. + ErrContextDone = errors.New("waiter context done") +) + +type ( + // RPCPollingWaiter is an interface that enables transaction awaiting functionality + // for Actor instance based on periodical BlockCount and ApplicationLog polls. + RPCPollingWaiter interface { + // Context should return the RPC client context to be able to gracefully + // shut down all running processes (if so). + Context() context.Context + GetBlockCount() (uint32, error) + GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) + } + // RPCEventWaiter is an interface that enables improved transaction awaiting functionality + // for Actor instance based on web-socket Block and ApplicationLog notifications. + RPCEventWaiter interface { + RPCPollingWaiter + + SubscribeForNewBlocksWithChan(primary *int, rcvrCh chan<- rpcclient.Notification) (string, error) + SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- rpcclient.Notification) (string, error) + Unsubscribe(id string) error + } +) + +// Wait allows to wait until transaction will be accepted to the chain. It can be +// used as a wrapper for Send or SignAndSend and accepts transaction hash, +// ValidUntilBlock value and an error. It returns transaction execution result +// or an error if transaction wasn't accepted to the chain. +func (a *Actor) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { + if err != nil { + return nil, err + } + if wsW, ok := a.client.(RPCEventWaiter); ok { + return a.waitWithWSWaiter(wsW, h, vub) + } + return a.waitWithSimpleWaiter(a.client, h, vub) +} + +// waitWithSimpleWaiter waits until transaction is accepted to the chain and +// returns its execution result or an error if it's missing from chain after +// VUB block. +func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uint32) (*state.AppExecResult, error) { + var ( + currentHeight uint32 + failedAttempt int + pollTime = time.Millisecond * time.Duration(a.GetVersion().Protocol.MillisecondsPerBlock) / 2 + ) + if pollTime == 0 { + pollTime = time.Second + } + timer := time.NewTicker(pollTime) + defer timer.Stop() + for { + select { + case <-timer.C: + blockCount, err := c.GetBlockCount() + if err != nil { + failedAttempt++ + if failedAttempt > PollingWaiterRetryCount { + return nil, fmt.Errorf("failed to retrieve block count: %w", err) + } + continue + } + failedAttempt = 0 + if blockCount-1 > currentHeight { + currentHeight = blockCount - 1 + } + t := trigger.Application + res, err := c.GetApplicationLog(h, &t) + if err == nil { + return &state.AppExecResult{ + Container: h, + Execution: res.Executions[0], + }, nil + } + if currentHeight >= vub { + return nil, ErrTxNotAccepted + } + + case <-c.Context().Done(): + return nil, fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err()) + } + } +} + +// waitWithWSWaiter waits until transaction is accepted to the chain and returns +// its execution result or an error if it's missing from chain after VUB block. +// It uses optimized web-socket waiter if possible. +func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (res *state.AppExecResult, waitErr error) { + var wsWaitErr error + defer func() { + if wsWaitErr != nil { + res, waitErr = a.waitWithSimpleWaiter(c, h, vub) + if waitErr != nil { + waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr) + } + } + }() + rcvr := make(chan rpcclient.Notification) + defer func() { + drainLoop: + // Drain rcvr to avoid other notification receivers blocking. + for { + select { + case <-rcvr: + default: + break drainLoop + } + } + close(rcvr) + }() + blocksID, err := c.SubscribeForNewBlocksWithChan(nil, rcvr) + if err != nil { + wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) + return + } + defer func() { + err = c.Unsubscribe(blocksID) + if err != nil { + errFmt := "failed to unsubscribe from blocks (id: %s): %v" + errArgs := []interface{}{blocksID, err} + if waitErr != nil { + errFmt += "; wait error: %w" + errArgs = append(errArgs, waitErr) + } + waitErr = fmt.Errorf(errFmt, errArgs...) + } + }() + txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, rcvr) + if err != nil { + wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) + return + } + defer func() { + err = c.Unsubscribe(txsID) + if err != nil { + errFmt := "failed to unsubscribe from transactions (id: %s): %v" + errArgs := []interface{}{txsID, err} + if waitErr != nil { + errFmt += "; wait error: %w" + errArgs = append(errArgs, waitErr) + } + waitErr = fmt.Errorf(errFmt, errArgs...) + } + }() + + for { + select { + case ntf := <-rcvr: + switch ntf.Type { + case neorpc.BlockEventID: + block := ntf.Value.(*block.Block) + // Execution event follows the block event, thus wait until the block next to the VUB to be sure. + if block.Index > vub { + waitErr = ErrTxNotAccepted + return + } + case neorpc.ExecutionEventID: + aer := ntf.Value.(*state.AppExecResult) + if aer.Container.Equals(h) { + res = aer + return + } + case neorpc.MissedEventID: + // We're toast, retry with non-ws client. + wsWaitErr = errors.New("some event was missed") + return + } + case <-c.Context().Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err()) + return + } + } +} diff --git a/pkg/rpcclient/client.go b/pkg/rpcclient/client.go index ba956b0db..e061eab6e 100644 --- a/pkg/rpcclient/client.go +++ b/pkg/rpcclient/client.go @@ -33,8 +33,11 @@ type Client struct { cli *http.Client endpoint *url.URL ctx context.Context - opts Options - requestF func(*neorpc.Request) (*neorpc.Response, error) + // ctxCancel is a cancel function aimed to send closing signal to the users of + // ctx. + ctxCancel func() + opts Options + requestF func(*neorpc.Request) (*neorpc.Response, error) // reader is an Invoker that has no signers and uses current state, // it's used to implement various getters. It'll be removed eventually, @@ -125,7 +128,9 @@ func initClient(ctx context.Context, cl *Client, endpoint string, opts Options) // if opts.Cert != "" && opts.Key != "" { // } - cl.ctx = ctx + cancelCtx, cancel := context.WithCancel(ctx) + cl.ctx = cancelCtx + cl.ctxCancel = cancel cl.cli = httpClient cl.endpoint = url cl.cache = cache{ @@ -176,6 +181,7 @@ func (c *Client) Init() error { // Close closes unused underlying networks connections. func (c *Client) Close() { + c.ctxCancel() c.cli.CloseIdleConnections() } @@ -248,3 +254,8 @@ func (c *Client) Ping() error { _ = conn.Close() return nil } + +// Context returns client instance context. +func (c *Client) Context() context.Context { + return c.ctx +} diff --git a/pkg/rpcclient/notary/actor_test.go b/pkg/rpcclient/notary/actor_test.go index c3482c75e..84aed3e17 100644 --- a/pkg/rpcclient/notary/actor_test.go +++ b/pkg/rpcclient/notary/actor_test.go @@ -1,6 +1,7 @@ package notary import ( + "context" "errors" "testing" @@ -14,6 +15,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker" "github.com/nspcc-dev/neo-go/pkg/smartcontract" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" @@ -66,6 +68,12 @@ func (r *RPCClient) TerminateSession(sessionID uuid.UUID) (bool, error) { func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) { return nil, nil // Just a stub, unused by actor. } +func (r *RPCClient) Context() context.Context { + panic("TODO") +} +func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) { + panic("TODO") +} func TestNewActor(t *testing.T) { rc := &RPCClient{ diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 1711a1f26..ad68c7d39 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -162,6 +162,8 @@ func (c *WSClient) Close() { // which in turn makes wsReader receive an err from ws.ReadJSON() and also // break out of the loop closing c.done channel in its shutdown sequence. close(c.shutdown) + // Call to cancel will send signal to all users of Context(). + c.Client.ctxCancel() } <-c.done } @@ -274,6 +276,7 @@ readloop: c.respChannels = nil c.respLock.Unlock() close(c.Notifications) + c.Client.ctxCancel() } func (c *WSClient) wsWriter() { @@ -569,3 +572,8 @@ func (c *WSClient) GetError() error { } return c.closeErr } + +// Context returns WSClient Cancel context that will be terminated on Client shutdown. +func (c *WSClient) Context() context.Context { + return c.Client.ctx +} diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index 6ddb6885e..9c24a4846 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -1947,3 +1947,57 @@ func TestClient_Iterator_SessionConfigVariations(t *testing.T) { } }) } + +func TestClient_Wait(t *testing.T) { + chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) + defer chain.Close() + defer rpcSrv.Shutdown() + + c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{}) + require.NoError(t, err) + acc, err := wallet.NewAccount() + require.NoError(t, err) + act, err := actor.New(c, []actor.SignerAccount{ + { + Signer: transaction.Signer{ + Account: acc.ScriptHash(), + }, + Account: acc, + }, + }) + require.NoError(t, err) + + b, err := chain.GetBlock(chain.GetHeaderHash(1)) + require.NoError(t, err) + require.True(t, len(b.Transactions) > 0) + + check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) { + rcvr := make(chan struct{}) + go func() { + aer, err := act.Wait(h, vub, nil) + if errExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, h, aer.Container) + } + rcvr <- struct{}{} + }() + waitloop: + for { + select { + case <-rcvr: + break waitloop + case <-time.NewTimer(time.Duration(chain.GetConfig().SecondsPerBlock) * time.Second).C: + t.Fatal("transaction failed to be awaited") + } + } + } + + // Wait for transaction that has been persisted and VUB block has been persisted. + check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false) + // Wait for transaction that has been persisted and VUB block hasn't yet been persisted. + check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false) + // Wait for transaction that hasn't been persisted and VUB block has been persisted. + check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true) +}