rpc: implement transaction awaiting functionality

Close #2704.
This commit is contained in:
Anna Shaleva 2022-10-12 15:23:32 +03:00
parent 6d38e75149
commit 10a0716217
9 changed files with 353 additions and 3 deletions

View file

@ -232,3 +232,20 @@ func (r *Invoke) UnmarshalJSON(data []byte) error {
r.Diagnostics = aux.Diagnostics r.Diagnostics = aux.Diagnostics
return nil return nil
} }
// AppExecToInvocation converts state.AppExecResult to result.Invoke and can be used
// as a wrapper for actor.Wait. The result of AppExecToInvocation doesn't have all fields
// properly filled, it's limited by State, GasConsumed, Stack, FaultException and Notifications.
// The result of AppExecToInvocation can be passed to unwrap package helpers.
func AppExecToInvocation(aer *state.AppExecResult, err error) (*Invoke, error) {
if err != nil {
return nil, err
}
return &Invoke{
State: aer.VMState.String(),
GasConsumed: aer.GasConsumed,
Stack: aer.Stack,
FaultException: aer.FaultException,
Notifications: aer.Events,
}, nil
}

View file

@ -3,13 +3,17 @@ package result
import ( import (
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"errors"
"math/big" "math/big"
"testing" "testing"
"github.com/google/uuid"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -49,3 +53,39 @@ func TestInvoke_MarshalJSON(t *testing.T) {
require.NoError(t, json.Unmarshal(data, actual)) require.NoError(t, json.Unmarshal(data, actual))
require.Equal(t, result, actual) require.Equal(t, result, actual)
} }
func TestAppExecToInvocation(t *testing.T) {
// With error.
someErr := errors.New("some err")
_, err := AppExecToInvocation(nil, someErr)
require.ErrorIs(t, err, someErr)
// Good.
h := util.Uint256{1, 2, 3}
ex := state.Execution{
Trigger: trigger.Application,
VMState: vmstate.Fault,
GasConsumed: 123,
Stack: []stackitem.Item{stackitem.NewBigInteger(big.NewInt(123))},
Events: []state.NotificationEvent{{
ScriptHash: util.Uint160{3, 2, 1},
Name: "Notification",
Item: stackitem.NewArray([]stackitem.Item{stackitem.Null{}}),
}},
FaultException: "some fault exception",
}
inv, err := AppExecToInvocation(&state.AppExecResult{
Container: h,
Execution: ex,
}, nil)
require.NoError(t, err)
require.Equal(t, ex.VMState.String(), inv.State)
require.Equal(t, ex.GasConsumed, inv.GasConsumed)
require.Nil(t, inv.Script)
require.Equal(t, ex.Stack, inv.Stack)
require.Equal(t, ex.FaultException, inv.FaultException)
require.Equal(t, ex.Events, inv.Notifications)
require.Nil(t, inv.Transaction)
require.Nil(t, inv.Diagnostics)
require.Equal(t, uuid.UUID{}, inv.Session)
}

View file

@ -25,6 +25,7 @@ import (
// create and send transactions. // create and send transactions.
type RPCActor interface { type RPCActor interface {
invoker.RPCInvoke invoker.RPCInvoke
RPCPollingWaiter
CalculateNetworkFee(tx *transaction.Transaction) (int64, error) CalculateNetworkFee(tx *transaction.Transaction) (int64, error)
GetBlockCount() (uint32, error) GetBlockCount() (uint32, error)

View file

@ -1,6 +1,7 @@
package actor package actor
import ( import (
"context"
"errors" "errors"
"testing" "testing"
@ -9,6 +10,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
@ -52,6 +54,12 @@ func (r *RPCClient) TerminateSession(sessionID uuid.UUID) (bool, error) {
func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) { func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) {
return nil, nil // Just a stub, unused by actor. return nil, nil // Just a stub, unused by actor.
} }
func (r *RPCClient) Context() context.Context {
panic("TODO")
}
func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) {
panic("TODO")
}
func testRPCAndAccount(t *testing.T) (*RPCClient, *wallet.Account) { func testRPCAndAccount(t *testing.T) (*RPCClient, *wallet.Account) {
client := &RPCClient{ client := &RPCClient{
version: &result.Version{ version: &result.Version{

View file

@ -0,0 +1,203 @@
package actor
import (
"context"
"errors"
"fmt"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
)
// PollingWaiterRetryCount is a threshold for a number of subsequent failed
// attempts to get block count from the RPC server for PollingWaiter. If it fails
// to retrieve block count PollingWaiterRetryCount times in a raw then transaction
// awaiting attempt considered to be failed and an error is returned.
const PollingWaiterRetryCount = 3
var (
// ErrTxNotAccepted is returned when transaction wasn't accepted to the chain
// even after ValidUntilBlock block persist.
ErrTxNotAccepted = errors.New("transaction was not accepted to chain")
// ErrContextDone is returned when Waiter context has been done in the middle
// of transaction awaiting process and no result was received yet.
ErrContextDone = errors.New("waiter context done")
)
type (
// RPCPollingWaiter is an interface that enables transaction awaiting functionality
// for Actor instance based on periodical BlockCount and ApplicationLog polls.
RPCPollingWaiter interface {
// Context should return the RPC client context to be able to gracefully
// shut down all running processes (if so).
Context() context.Context
GetBlockCount() (uint32, error)
GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error)
}
// RPCEventWaiter is an interface that enables improved transaction awaiting functionality
// for Actor instance based on web-socket Block and ApplicationLog notifications.
RPCEventWaiter interface {
RPCPollingWaiter
SubscribeForNewBlocksWithChan(primary *int, rcvrCh chan<- rpcclient.Notification) (string, error)
SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- rpcclient.Notification) (string, error)
Unsubscribe(id string) error
}
)
// Wait allows to wait until transaction will be accepted to the chain. It can be
// used as a wrapper for Send or SignAndSend and accepts transaction hash,
// ValidUntilBlock value and an error. It returns transaction execution result
// or an error if transaction wasn't accepted to the chain.
func (a *Actor) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
if err != nil {
return nil, err
}
if wsW, ok := a.client.(RPCEventWaiter); ok {
return a.waitWithWSWaiter(wsW, h, vub)
}
return a.waitWithSimpleWaiter(a.client, h, vub)
}
// waitWithSimpleWaiter waits until transaction is accepted to the chain and
// returns its execution result or an error if it's missing from chain after
// VUB block.
func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uint32) (*state.AppExecResult, error) {
var (
currentHeight uint32
failedAttempt int
pollTime = time.Millisecond * time.Duration(a.GetVersion().Protocol.MillisecondsPerBlock) / 2
)
if pollTime == 0 {
pollTime = time.Second
}
timer := time.NewTicker(pollTime)
defer timer.Stop()
for {
select {
case <-timer.C:
blockCount, err := c.GetBlockCount()
if err != nil {
failedAttempt++
if failedAttempt > PollingWaiterRetryCount {
return nil, fmt.Errorf("failed to retrieve block count: %w", err)
}
continue
}
failedAttempt = 0
if blockCount-1 > currentHeight {
currentHeight = blockCount - 1
}
t := trigger.Application
res, err := c.GetApplicationLog(h, &t)
if err == nil {
return &state.AppExecResult{
Container: h,
Execution: res.Executions[0],
}, nil
}
if currentHeight >= vub {
return nil, ErrTxNotAccepted
}
case <-c.Context().Done():
return nil, fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err())
}
}
}
// waitWithWSWaiter waits until transaction is accepted to the chain and returns
// its execution result or an error if it's missing from chain after VUB block.
// It uses optimized web-socket waiter if possible.
func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (res *state.AppExecResult, waitErr error) {
var wsWaitErr error
defer func() {
if wsWaitErr != nil {
res, waitErr = a.waitWithSimpleWaiter(c, h, vub)
if waitErr != nil {
waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr)
}
}
}()
rcvr := make(chan rpcclient.Notification)
defer func() {
drainLoop:
// Drain rcvr to avoid other notification receivers blocking.
for {
select {
case <-rcvr:
default:
break drainLoop
}
}
close(rcvr)
}()
blocksID, err := c.SubscribeForNewBlocksWithChan(nil, rcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
return
}
defer func() {
err = c.Unsubscribe(blocksID)
if err != nil {
errFmt := "failed to unsubscribe from blocks (id: %s): %v"
errArgs := []interface{}{blocksID, err}
if waitErr != nil {
errFmt += "; wait error: %w"
errArgs = append(errArgs, waitErr)
}
waitErr = fmt.Errorf(errFmt, errArgs...)
}
}()
txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, rcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
return
}
defer func() {
err = c.Unsubscribe(txsID)
if err != nil {
errFmt := "failed to unsubscribe from transactions (id: %s): %v"
errArgs := []interface{}{txsID, err}
if waitErr != nil {
errFmt += "; wait error: %w"
errArgs = append(errArgs, waitErr)
}
waitErr = fmt.Errorf(errFmt, errArgs...)
}
}()
for {
select {
case ntf := <-rcvr:
switch ntf.Type {
case neorpc.BlockEventID:
block := ntf.Value.(*block.Block)
// Execution event follows the block event, thus wait until the block next to the VUB to be sure.
if block.Index > vub {
waitErr = ErrTxNotAccepted
return
}
case neorpc.ExecutionEventID:
aer := ntf.Value.(*state.AppExecResult)
if aer.Container.Equals(h) {
res = aer
return
}
case neorpc.MissedEventID:
// We're toast, retry with non-ws client.
wsWaitErr = errors.New("some event was missed")
return
}
case <-c.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err())
return
}
}
}

View file

@ -33,8 +33,11 @@ type Client struct {
cli *http.Client cli *http.Client
endpoint *url.URL endpoint *url.URL
ctx context.Context ctx context.Context
opts Options // ctxCancel is a cancel function aimed to send closing signal to the users of
requestF func(*neorpc.Request) (*neorpc.Response, error) // ctx.
ctxCancel func()
opts Options
requestF func(*neorpc.Request) (*neorpc.Response, error)
// reader is an Invoker that has no signers and uses current state, // reader is an Invoker that has no signers and uses current state,
// it's used to implement various getters. It'll be removed eventually, // it's used to implement various getters. It'll be removed eventually,
@ -125,7 +128,9 @@ func initClient(ctx context.Context, cl *Client, endpoint string, opts Options)
// if opts.Cert != "" && opts.Key != "" { // if opts.Cert != "" && opts.Key != "" {
// } // }
cl.ctx = ctx cancelCtx, cancel := context.WithCancel(ctx)
cl.ctx = cancelCtx
cl.ctxCancel = cancel
cl.cli = httpClient cl.cli = httpClient
cl.endpoint = url cl.endpoint = url
cl.cache = cache{ cl.cache = cache{
@ -176,6 +181,7 @@ func (c *Client) Init() error {
// Close closes unused underlying networks connections. // Close closes unused underlying networks connections.
func (c *Client) Close() { func (c *Client) Close() {
c.ctxCancel()
c.cli.CloseIdleConnections() c.cli.CloseIdleConnections()
} }
@ -248,3 +254,8 @@ func (c *Client) Ping() error {
_ = conn.Close() _ = conn.Close()
return nil return nil
} }
// Context returns client instance context.
func (c *Client) Context() context.Context {
return c.ctx
}

View file

@ -1,6 +1,7 @@
package notary package notary
import ( import (
"context"
"errors" "errors"
"testing" "testing"
@ -14,6 +15,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker" "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
"github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
@ -66,6 +68,12 @@ func (r *RPCClient) TerminateSession(sessionID uuid.UUID) (bool, error) {
func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) { func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) {
return nil, nil // Just a stub, unused by actor. return nil, nil // Just a stub, unused by actor.
} }
func (r *RPCClient) Context() context.Context {
panic("TODO")
}
func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) {
panic("TODO")
}
func TestNewActor(t *testing.T) { func TestNewActor(t *testing.T) {
rc := &RPCClient{ rc := &RPCClient{

View file

@ -162,6 +162,8 @@ func (c *WSClient) Close() {
// which in turn makes wsReader receive an err from ws.ReadJSON() and also // which in turn makes wsReader receive an err from ws.ReadJSON() and also
// break out of the loop closing c.done channel in its shutdown sequence. // break out of the loop closing c.done channel in its shutdown sequence.
close(c.shutdown) close(c.shutdown)
// Call to cancel will send signal to all users of Context().
c.Client.ctxCancel()
} }
<-c.done <-c.done
} }
@ -274,6 +276,7 @@ readloop:
c.respChannels = nil c.respChannels = nil
c.respLock.Unlock() c.respLock.Unlock()
close(c.Notifications) close(c.Notifications)
c.Client.ctxCancel()
} }
func (c *WSClient) wsWriter() { func (c *WSClient) wsWriter() {
@ -569,3 +572,8 @@ func (c *WSClient) GetError() error {
} }
return c.closeErr return c.closeErr
} }
// Context returns WSClient Cancel context that will be terminated on Client shutdown.
func (c *WSClient) Context() context.Context {
return c.Client.ctx
}

View file

@ -1947,3 +1947,57 @@ func TestClient_Iterator_SessionConfigVariations(t *testing.T) {
} }
}) })
} }
func TestClient_Wait(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close()
defer rpcSrv.Shutdown()
c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{})
require.NoError(t, err)
acc, err := wallet.NewAccount()
require.NoError(t, err)
act, err := actor.New(c, []actor.SignerAccount{
{
Signer: transaction.Signer{
Account: acc.ScriptHash(),
},
Account: acc,
},
})
require.NoError(t, err)
b, err := chain.GetBlock(chain.GetHeaderHash(1))
require.NoError(t, err)
require.True(t, len(b.Transactions) > 0)
check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) {
rcvr := make(chan struct{})
go func() {
aer, err := act.Wait(h, vub, nil)
if errExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, h, aer.Container)
}
rcvr <- struct{}{}
}()
waitloop:
for {
select {
case <-rcvr:
break waitloop
case <-time.NewTimer(time.Duration(chain.GetConfig().SecondsPerBlock) * time.Second).C:
t.Fatal("transaction failed to be awaited")
}
}
}
// Wait for transaction that has been persisted and VUB block has been persisted.
check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false)
// Wait for transaction that has been persisted and VUB block hasn't yet been persisted.
check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false)
// Wait for transaction that hasn't been persisted and VUB block has been persisted.
check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true)
}