forked from TrueCloudLab/neoneo-go
Merge pull request #2817 from nspcc-dev/always-return-hash-vub-from-sender
Some actor/waiter interaction fixes
This commit is contained in:
commit
ab0b23625b
5 changed files with 113 additions and 41 deletions
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
@ -41,7 +42,11 @@ type (
|
||||||
// Wait allows to wait until transaction will be accepted to the chain. It can be
|
// Wait allows to wait until transaction will be accepted to the chain. It can be
|
||||||
// used as a wrapper for Send or SignAndSend and accepts transaction hash,
|
// used as a wrapper for Send or SignAndSend and accepts transaction hash,
|
||||||
// ValidUntilBlock value and an error. It returns transaction execution result
|
// ValidUntilBlock value and an error. It returns transaction execution result
|
||||||
// or an error if transaction wasn't accepted to the chain.
|
// or an error if transaction wasn't accepted to the chain. Notice that "already
|
||||||
|
// exists" err value is not treated as an error by this routine because it
|
||||||
|
// means that the transactions given might be already accepted or soon going
|
||||||
|
// to be accepted. Such transaction can be waited for in a usual way, potentially
|
||||||
|
// with positive result, so that's what will happen.
|
||||||
Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error)
|
Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error)
|
||||||
// WaitAny waits until at least one of the specified transactions will be accepted
|
// WaitAny waits until at least one of the specified transactions will be accepted
|
||||||
// to the chain until vub (including). It returns execution result of this
|
// to the chain until vub (including). It returns execution result of this
|
||||||
|
@ -89,6 +94,12 @@ type EventWaiter struct {
|
||||||
polling Waiter
|
polling Waiter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// errIsAlreadyExists is a temporary helper until we have #2248 solved. Both C#
|
||||||
|
// and Go nodes return this string (possibly among other data).
|
||||||
|
func errIsAlreadyExists(err error) bool {
|
||||||
|
return strings.Contains(strings.ToLower(err.Error()), "already exists")
|
||||||
|
}
|
||||||
|
|
||||||
// newWaiter creates Waiter instance. It can be either websocket-based or
|
// newWaiter creates Waiter instance. It can be either websocket-based or
|
||||||
// polling-base, otherwise Waiter stub is returned.
|
// polling-base, otherwise Waiter stub is returned.
|
||||||
func newWaiter(ra RPCActor, v *result.Version) Waiter {
|
func newWaiter(ra RPCActor, v *result.Version) Waiter {
|
||||||
|
@ -139,7 +150,7 @@ func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) {
|
||||||
|
|
||||||
// Wait implements Waiter interface.
|
// Wait implements Waiter interface.
|
||||||
func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
||||||
if err != nil {
|
if err != nil && !errIsAlreadyExists(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return w.WaitAny(context.TODO(), vub, h)
|
return w.WaitAny(context.TODO(), vub, h)
|
||||||
|
@ -209,7 +220,7 @@ func NewEventWaiter(waiter RPCEventWaiter) (*EventWaiter, error) {
|
||||||
|
|
||||||
// Wait implements Waiter interface.
|
// Wait implements Waiter interface.
|
||||||
func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.AppExecResult, waitErr error) {
|
func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.AppExecResult, waitErr error) {
|
||||||
if err != nil {
|
if err != nil && !errIsAlreadyExists(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return w.WaitAny(context.TODO(), vub, h)
|
return w.WaitAny(context.TODO(), vub, h)
|
||||||
|
@ -244,6 +255,7 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
if wsWaitErr == nil {
|
if wsWaitErr == nil {
|
||||||
|
trig := trigger.Application
|
||||||
for _, h := range hashes {
|
for _, h := range hashes {
|
||||||
txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr)
|
txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -260,22 +272,28 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
|
||||||
}
|
}
|
||||||
unsubErrs <- nil
|
unsubErrs <- nil
|
||||||
}()
|
}()
|
||||||
|
// There is a potential race between subscription and acceptance, so
|
||||||
|
// do a polling check once _after_ the subscription.
|
||||||
|
appLog, err := w.ws.GetApplicationLog(h, &trig)
|
||||||
|
if err == nil {
|
||||||
|
res = &state.AppExecResult{
|
||||||
|
Container: appLog.Container,
|
||||||
|
Execution: appLog.Executions[0],
|
||||||
|
}
|
||||||
|
break // We have the result, no need for other subscriptions.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if wsWaitErr == nil {
|
if wsWaitErr == nil && res == nil {
|
||||||
select {
|
select {
|
||||||
case b, ok := <-bRcvr:
|
case _, ok := <-bRcvr:
|
||||||
if !ok {
|
if !ok {
|
||||||
// We're toast, retry with non-ws client.
|
// We're toast, retry with non-ws client.
|
||||||
wsWaitErr = ErrMissedEvent
|
wsWaitErr = ErrMissedEvent
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// We can easily end up in a situation when subscription was performed too late and
|
waitErr = ErrTxNotAccepted
|
||||||
// the desired transaction and VUB-th block have already got accepted before the
|
|
||||||
// subscription happened. Thus, always retry with non-ws client, it will perform
|
|
||||||
// AER requests and make sure.
|
|
||||||
wsWaitErr = fmt.Errorf("block #%d was received by EventWaiter", b.Index)
|
|
||||||
case aer, ok := <-aerRcvr:
|
case aer, ok := <-aerRcvr:
|
||||||
if !ok {
|
if !ok {
|
||||||
// We're toast, retry with non-ws client.
|
// We're toast, retry with non-ws client.
|
||||||
|
|
|
@ -133,13 +133,12 @@ func TestWSWaiter_Wait(t *testing.T) {
|
||||||
require.ErrorIs(t, err, someErr)
|
require.ErrorIs(t, err, someErr)
|
||||||
|
|
||||||
// AER is in chain immediately.
|
// AER is in chain immediately.
|
||||||
doneCh := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
aer, err := w.Wait(h, bCount-1, nil)
|
aer, err := w.Wait(h, bCount-1, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expected, aer)
|
require.Equal(t, expected, aer)
|
||||||
doneCh <- struct{}{}
|
|
||||||
}()
|
// Auxiliary things for asynchronous tests.
|
||||||
|
doneCh := make(chan struct{})
|
||||||
check := func(t *testing.T, trigger func()) {
|
check := func(t *testing.T, trigger func()) {
|
||||||
timer := time.NewTimer(time.Second)
|
timer := time.NewTimer(time.Second)
|
||||||
var triggerFired bool
|
var triggerFired bool
|
||||||
|
@ -159,6 +158,15 @@ func TestWSWaiter_Wait(t *testing.T) {
|
||||||
}
|
}
|
||||||
require.True(t, triggerFired)
|
require.True(t, triggerFired)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AER received after the subscription.
|
||||||
|
c.RPCClient.appLog = nil
|
||||||
|
go func() {
|
||||||
|
aer, err = w.Wait(h, bCount-1, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expected, aer)
|
||||||
|
doneCh <- struct{}{}
|
||||||
|
}()
|
||||||
check(t, func() {
|
check(t, func() {
|
||||||
c.chLock.RLock()
|
c.chLock.RLock()
|
||||||
defer c.chLock.RUnlock()
|
defer c.chLock.RUnlock()
|
||||||
|
@ -166,7 +174,6 @@ func TestWSWaiter_Wait(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
// Missing AER after VUB.
|
// Missing AER after VUB.
|
||||||
c.RPCClient.appLog = nil
|
|
||||||
go func() {
|
go func() {
|
||||||
_, err = w.Wait(h, bCount-2, nil)
|
_, err = w.Wait(h, bCount-2, nil)
|
||||||
require.ErrorIs(t, err, ErrTxNotAccepted)
|
require.ErrorIs(t, err, ErrTxNotAccepted)
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
|
@ -320,9 +321,13 @@ func (a *Actor) SendRequestExactly(mainTx *transaction.Transaction, fbTx *transa
|
||||||
// the resulting application execution result or actor.ErrTxNotAccepted if both transactions
|
// the resulting application execution result or actor.ErrTxNotAccepted if both transactions
|
||||||
// failed to persist. Wait can be used if underlying Actor supports transaction awaiting,
|
// failed to persist. Wait can be used if underlying Actor supports transaction awaiting,
|
||||||
// see actor.Actor and actor.Waiter documentation for details. Wait may be used as a wrapper
|
// see actor.Actor and actor.Waiter documentation for details. Wait may be used as a wrapper
|
||||||
// for Notarize, SendRequest or SendRequestExactly.
|
// for Notarize, SendRequest or SendRequestExactly. Notice that "already exists" or "already
|
||||||
|
// on chain" answers are not treated as errors by this routine because they mean that some
|
||||||
|
// of the transactions given might be already accepted or soon going to be accepted. These
|
||||||
|
// transactions can be waited for in a usual way potentially with positive result.
|
||||||
func (a *Actor) Wait(mainHash, fbHash util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
func (a *Actor) Wait(mainHash, fbHash util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
||||||
if err != nil {
|
// #2248 will eventually remove this garbage from the code.
|
||||||
|
if err != nil && !(strings.Contains(strings.ToLower(err.Error()), "already exists") || strings.Contains(strings.ToLower(err.Error()), "already on chain")) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return a.WaitAny(context.TODO(), vub, mainHash, fbHash)
|
return a.WaitAny(context.TODO(), vub, mainHash, fbHash)
|
||||||
|
|
|
@ -692,17 +692,16 @@ func (c *Client) invokeSomething(method string, p []interface{}, signers []trans
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendRawTransaction broadcasts a transaction over the NEO network.
|
// SendRawTransaction broadcasts the given transaction to the Neo network.
|
||||||
// The given hex string needs to be signed with a keypair.
|
// It always returns transaction hash, when successful (no error) this is the
|
||||||
// When the result of the response object is true, the TX has successfully
|
// hash returned from server, when not it's a locally calculated rawTX hash.
|
||||||
// been broadcasted to the network.
|
|
||||||
func (c *Client) SendRawTransaction(rawTX *transaction.Transaction) (util.Uint256, error) {
|
func (c *Client) SendRawTransaction(rawTX *transaction.Transaction) (util.Uint256, error) {
|
||||||
var (
|
var (
|
||||||
params = []interface{}{rawTX.Bytes()}
|
params = []interface{}{rawTX.Bytes()}
|
||||||
resp = new(result.RelayResult)
|
resp = new(result.RelayResult)
|
||||||
)
|
)
|
||||||
if err := c.performRequest("sendrawtransaction", params, resp); err != nil {
|
if err := c.performRequest("sendrawtransaction", params, resp); err != nil {
|
||||||
return util.Uint256{}, err
|
return rawTX.Hash(), err
|
||||||
}
|
}
|
||||||
return resp.Hash, nil
|
return resp.Hash, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -16,6 +17,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
"github.com/nspcc-dev/neo-go/internal/testchain"
|
"github.com/nspcc-dev/neo-go/internal/testchain"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/config"
|
"github.com/nspcc-dev/neo-go/pkg/config"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core"
|
"github.com/nspcc-dev/neo-go/pkg/core"
|
||||||
|
@ -2132,11 +2134,42 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) {
|
||||||
// Firstly, accept the block.
|
// Firstly, accept the block.
|
||||||
blocks := getTestBlocks(t)
|
blocks := getTestBlocks(t)
|
||||||
b1 := blocks[0]
|
b1 := blocks[0]
|
||||||
b2 := blocks[1]
|
|
||||||
tx := b1.Transactions[0]
|
tx := b1.Transactions[0]
|
||||||
require.NoError(t, chain.AddBlock(b1))
|
require.NoError(t, chain.AddBlock(b1))
|
||||||
|
|
||||||
// After that, subscribe for AERs/blocks.
|
// After that, wait and get the result immediately.
|
||||||
|
aer, err := act.Wait(tx.Hash(), tx.ValidUntilBlock, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, tx.Hash(), aer.Container)
|
||||||
|
require.Equal(t, trigger.Application, aer.Trigger)
|
||||||
|
require.Equal(t, vmstate.Halt, aer.VMState)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWSClient_WaitWithMissedEvent(t *testing.T) {
|
||||||
|
chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true)
|
||||||
|
defer chain.Close()
|
||||||
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
|
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
|
||||||
|
c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, c.Init())
|
||||||
|
acc, err := wallet.NewAccount()
|
||||||
|
require.NoError(t, err)
|
||||||
|
act, err := actor.New(c, []actor.SignerAccount{
|
||||||
|
{
|
||||||
|
Signer: transaction.Signer{
|
||||||
|
Account: acc.ScriptHash(),
|
||||||
|
},
|
||||||
|
Account: acc,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
blocks := getTestBlocks(t)
|
||||||
|
b1 := blocks[0]
|
||||||
|
tx := b1.Transactions[0]
|
||||||
|
|
||||||
rcvr := make(chan *state.AppExecResult)
|
rcvr := make(chan *state.AppExecResult)
|
||||||
go func() {
|
go func() {
|
||||||
aer, err := act.Wait(tx.Hash(), tx.ValidUntilBlock, nil)
|
aer, err := act.Wait(tx.Hash(), tx.ValidUntilBlock, nil)
|
||||||
|
@ -2150,23 +2183,33 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) {
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
rpcSrv.subsLock.Lock()
|
rpcSrv.subsLock.Lock()
|
||||||
defer rpcSrv.subsLock.Unlock()
|
defer rpcSrv.subsLock.Unlock()
|
||||||
if len(rpcSrv.subscribers) == 1 { // single client
|
return len(rpcSrv.subscribers) == 1
|
||||||
for s := range rpcSrv.subscribers {
|
|
||||||
var count int
|
|
||||||
for _, f := range s.feeds {
|
|
||||||
if f.event != neorpc.InvalidEventID {
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return count == 2 // subscription for blocks + AERs
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}, time.Second, 100*time.Millisecond)
|
}, time.Second, 100*time.Millisecond)
|
||||||
|
|
||||||
// Accept the next block to trigger event-based waiter loop exit and rollback to
|
rpcSrv.subsLock.Lock()
|
||||||
// a poll-based waiter.
|
// Suppress normal event delivery.
|
||||||
require.NoError(t, chain.AddBlock(b2))
|
for s := range rpcSrv.subscribers {
|
||||||
|
s.overflown.Store(true)
|
||||||
|
}
|
||||||
|
rpcSrv.subsLock.Unlock()
|
||||||
|
|
||||||
|
// Accept the next block, but subscriber will get no events because it's overflown.
|
||||||
|
require.NoError(t, chain.AddBlock(b1))
|
||||||
|
|
||||||
|
overEvent, err := json.Marshal(neorpc.Notification{
|
||||||
|
JSONRPC: neorpc.JSONRPCVersion,
|
||||||
|
Event: neorpc.MissedEventID,
|
||||||
|
Payload: make([]interface{}, 0),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
overflowMsg, err := websocket.NewPreparedMessage(websocket.TextMessage, overEvent)
|
||||||
|
require.NoError(t, err)
|
||||||
|
rpcSrv.subsLock.Lock()
|
||||||
|
// Deliver overflow message -> triggers subscriber to retry with polling waiter.
|
||||||
|
for s := range rpcSrv.subscribers {
|
||||||
|
s.writer <- overflowMsg
|
||||||
|
}
|
||||||
|
rpcSrv.subsLock.Unlock()
|
||||||
|
|
||||||
// Wait for the result.
|
// Wait for the result.
|
||||||
waitloop:
|
waitloop:
|
||||||
|
|
Loading…
Reference in a new issue