Merge pull request #951 from nspcc-dev/notification-filters-2.x

Notification filters 2.x
This commit is contained in:
Roman Khimov 2020-05-14 20:57:43 +03:00 committed by GitHub
commit f0abbfd399
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 857 additions and 297 deletions

View file

@ -1,6 +1,7 @@
package block package block
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -17,10 +18,15 @@ type Block struct {
Base Base
// Transaction list. // Transaction list.
Transactions []*transaction.Transaction `json:"tx"` Transactions []*transaction.Transaction
// True if this block is created from trimmed data. // True if this block is created from trimmed data.
Trimmed bool `json:"-"` Trimmed bool
}
// auxTxes is used for JSON i/o.
type auxTxes struct {
Transactions []*transaction.Transaction `json:"tx"`
} }
// Header returns the Header of the Block. // Header returns the Header of the Block.
@ -149,3 +155,42 @@ func (b *Block) Compare(item queue.Item) int {
return -1 return -1
} }
} }
// MarshalJSON implements json.Marshaler interface.
func (b Block) MarshalJSON() ([]byte, error) {
txes, err := json.Marshal(auxTxes{b.Transactions})
if err != nil {
return nil, err
}
baseBytes, err := json.Marshal(b.Base)
if err != nil {
return nil, err
}
// Stitch them together.
if baseBytes[len(baseBytes)-1] != '}' || txes[0] != '{' {
return nil, errors.New("can't merge internal jsons")
}
baseBytes[len(baseBytes)-1] = ','
baseBytes = append(baseBytes, txes[1:]...)
return baseBytes, nil
}
// UnmarshalJSON implements json.Unmarshaler interface.
func (b *Block) UnmarshalJSON(data []byte) error {
// As Base and txes are at the same level in json,
// do unmarshalling separately for both structs.
txes := new(auxTxes)
err := json.Unmarshal(data, txes)
if err != nil {
return err
}
base := new(Base)
err = json.Unmarshal(data, base)
if err != nil {
return err
}
b.Base = *base
b.Transactions = txes.Transactions
return nil
}

View file

@ -1,10 +1,14 @@
package block package block
import ( import (
"encoding/json"
"errors"
"fmt" "fmt"
"strconv"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
) )
@ -12,33 +16,33 @@ import (
// Base holds the base info of a block // Base holds the base info of a block
type Base struct { type Base struct {
// Version of the block. // Version of the block.
Version uint32 `json:"version"` Version uint32
// hash of the previous block. // hash of the previous block.
PrevHash util.Uint256 `json:"previousblockhash"` PrevHash util.Uint256
// Root hash of a transaction list. // Root hash of a transaction list.
MerkleRoot util.Uint256 `json:"merkleroot"` MerkleRoot util.Uint256
// The time stamp of each block must be later than previous block's time stamp. // The time stamp of each block must be later than previous block's time stamp.
// Generally the difference of two block's time stamp is about 15 seconds and imprecision is allowed. // Generally the difference of two block's time stamp is about 15 seconds and imprecision is allowed.
// The height of the block must be exactly equal to the height of the previous block plus 1. // The height of the block must be exactly equal to the height of the previous block plus 1.
Timestamp uint32 `json:"time"` Timestamp uint32
// index/height of the block // index/height of the block
Index uint32 `json:"height"` Index uint32
// Random number also called nonce // Random number also called nonce
ConsensusData uint64 `json:"nonce"` ConsensusData uint64
// Contract address of the next miner // Contract address of the next miner
NextConsensus util.Uint160 `json:"next_consensus"` NextConsensus util.Uint160
// Padding that is fixed to 1 // Padding that is fixed to 1
_ uint8 _ uint8
// Script used to validate the block // Script used to validate the block
Script transaction.Witness `json:"script"` Script transaction.Witness
// Hash of this block, created when binary encoded (double SHA256). // Hash of this block, created when binary encoded (double SHA256).
hash util.Uint256 hash util.Uint256
@ -47,6 +51,21 @@ type Base struct {
verificationHash util.Uint256 verificationHash util.Uint256
} }
// baseAux is used to marshal/unmarshal to/from JSON, it's almost the same
// as original Base, but with Nonce and NextConsensus fields differing and
// Hash added.
type baseAux struct {
Hash util.Uint256 `json:"hash"`
Version uint32 `json:"version"`
PrevHash util.Uint256 `json:"previousblockhash"`
MerkleRoot util.Uint256 `json:"merkleroot"`
Timestamp uint32 `json:"time"`
Index uint32 `json:"index"`
Nonce string `json:"nonce"`
NextConsensus string `json:"nextconsensus"`
Script transaction.Witness `json:"script"`
}
// Verify verifies the integrity of the Base. // Verify verifies the integrity of the Base.
func (b *Base) Verify() bool { func (b *Base) Verify() bool {
// TODO: Need a persisted blockchain for this. // TODO: Need a persisted blockchain for this.
@ -140,3 +159,56 @@ func (b *Base) decodeHashableFields(br *io.BinReader) {
b.createHash() b.createHash()
} }
} }
// MarshalJSON implements json.Marshaler interface.
func (b Base) MarshalJSON() ([]byte, error) {
nonce := strconv.FormatUint(b.ConsensusData, 16)
for len(nonce) < 16 {
nonce = "0" + nonce
}
aux := baseAux{
Hash: b.Hash(),
Version: b.Version,
PrevHash: b.PrevHash,
MerkleRoot: b.MerkleRoot,
Timestamp: b.Timestamp,
Index: b.Index,
Nonce: nonce,
NextConsensus: address.Uint160ToString(b.NextConsensus),
Script: b.Script,
}
return json.Marshal(aux)
}
// UnmarshalJSON implements json.Unmarshaler interface.
func (b *Base) UnmarshalJSON(data []byte) error {
var aux = new(baseAux)
var nonce uint64
var nextC util.Uint160
err := json.Unmarshal(data, aux)
if err != nil {
return err
}
nonce, err = strconv.ParseUint(aux.Nonce, 16, 64)
if err != nil {
return err
}
nextC, err = address.StringToUint160(aux.NextConsensus)
if err != nil {
return err
}
b.Version = aux.Version
b.PrevHash = aux.PrevHash
b.MerkleRoot = aux.MerkleRoot
b.Timestamp = aux.Timestamp
b.Index = aux.Index
b.ConsensusData = nonce
b.NextConsensus = nextC
b.Script = aux.Script
if !aux.Hash.Equals(b.Hash()) {
return errors.New("json 'hash' doesn't match block hash")
}
return nil
}

View file

@ -188,6 +188,8 @@ func TestBinBlockDecodeEncode(t *testing.T) {
data, err := testserdes.EncodeBinary(&b) data, err := testserdes.EncodeBinary(&b)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, rawtx, hex.EncodeToString(data)) assert.Equal(t, rawtx, hex.EncodeToString(data))
testserdes.MarshalUnmarshalJSON(t, &b, new(Block))
} }
func TestBlockSizeCalculation(t *testing.T) { func TestBlockSizeCalculation(t *testing.T) {

View file

@ -3,6 +3,7 @@ package transaction
import ( import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
@ -10,8 +11,14 @@ import (
// Attribute represents a Transaction attribute. // Attribute represents a Transaction attribute.
type Attribute struct { type Attribute struct {
Usage AttrUsage `json:"usage"` Usage AttrUsage
Data []byte `json:"data"` Data []byte
}
// attrJSON is used for JSON I/O of Attribute.
type attrJSON struct {
Usage string `json:"usage"`
Data string `json:"data"`
} }
// DecodeBinary implements Serializable interface. // DecodeBinary implements Serializable interface.
@ -72,8 +79,106 @@ func (attr *Attribute) EncodeBinary(bw *io.BinWriter) {
// MarshalJSON implements the json Marshaller interface. // MarshalJSON implements the json Marshaller interface.
func (attr *Attribute) MarshalJSON() ([]byte, error) { func (attr *Attribute) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{ return json.Marshal(attrJSON{
"usage": attr.Usage.String(), Usage: attr.Usage.String(),
"data": hex.EncodeToString(attr.Data), Data: hex.EncodeToString(attr.Data),
}) })
} }
// UnmarshalJSON implements the json.Unmarshaller interface.
func (attr *Attribute) UnmarshalJSON(data []byte) error {
aj := new(attrJSON)
err := json.Unmarshal(data, aj)
if err != nil {
return err
}
binData, err := hex.DecodeString(aj.Data)
if err != nil {
return err
}
switch aj.Usage {
case "ContractHash":
attr.Usage = ContractHash
case "ECDH02":
attr.Usage = ECDH02
case "ECDH03":
attr.Usage = ECDH03
case "Script":
attr.Usage = Script
case "Vote":
attr.Usage = Vote
case "CertURL":
attr.Usage = CertURL
case "DescriptionURL":
attr.Usage = DescriptionURL
case "Description":
attr.Usage = Description
case "Hash1":
attr.Usage = Hash1
case "Hash2":
attr.Usage = Hash2
case "Hash3":
attr.Usage = Hash3
case "Hash4":
attr.Usage = Hash4
case "Hash5":
attr.Usage = Hash5
case "Hash6":
attr.Usage = Hash6
case "Hash7":
attr.Usage = Hash7
case "Hash8":
attr.Usage = Hash8
case "Hash9":
attr.Usage = Hash9
case "Hash10":
attr.Usage = Hash10
case "Hash11":
attr.Usage = Hash11
case "Hash12":
attr.Usage = Hash12
case "Hash13":
attr.Usage = Hash13
case "Hash14":
attr.Usage = Hash14
case "Hash15":
attr.Usage = Hash15
case "Remark":
attr.Usage = Remark
case "Remark1":
attr.Usage = Remark1
case "Remark2":
attr.Usage = Remark2
case "Remark3":
attr.Usage = Remark3
case "Remark4":
attr.Usage = Remark4
case "Remark5":
attr.Usage = Remark5
case "Remark6":
attr.Usage = Remark6
case "Remark7":
attr.Usage = Remark7
case "Remark8":
attr.Usage = Remark8
case "Remark9":
attr.Usage = Remark9
case "Remark10":
attr.Usage = Remark10
case "Remark11":
attr.Usage = Remark11
case "Remark12":
attr.Usage = Remark12
case "Remark13":
attr.Usage = Remark13
case "Remark14":
attr.Usage = Remark14
case "Remark15":
attr.Usage = Remark15
default:
return errors.New("wrong Usage")
}
attr.Data = binData
return nil
}

View file

@ -3,6 +3,7 @@ package client
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strings" "strings"
@ -14,6 +15,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/rpc/response/result" "github.com/nspcc-dev/neo-go/pkg/rpc/response/result"
"github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
@ -29,6 +31,83 @@ type rpcClientTestCase struct {
check func(t *testing.T, c *Client, result interface{}) check func(t *testing.T, c *Client, result interface{})
} }
// getResultBlock1 returns data for block number 1 which is used by several tests.
func getResultBlock1() *result.Block {
nextBlockHash, err := util.Uint256DecodeStringLE("cc37d5bc460e72c9423015cb8d579c13e7b03b93bfaa1a23cf4fa777988e035f")
if err != nil {
panic(err)
}
prevBlockHash, err := util.Uint256DecodeStringLE("996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099")
if err != nil {
panic(err)
}
merkleRoot, err := util.Uint256DecodeStringLE("cb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2")
if err != nil {
panic(err)
}
invScript, err := hex.DecodeString("40356a91d94e398170e47447d6a0f60aa5470e209782a5452403115a49166db3e1c4a3898122db19f779c30f8ccd0b7d401acdf71eda340655e4ae5237a64961bf4034dd47955e5a71627dafc39dd92999140e9eaeec6b11dbb2b313efa3f1093ed915b4455e199c69ec53778f94ffc236b92f8b97fff97a1f6bbb3770c0c0b3844a40fbe743bd5c90b2f5255e0b073281d7aeb2fb516572f36bec8446bcc37ac755cbf10d08b16c95644db1b2dddc2df5daa377880b20198fc7b967ac6e76474b22df")
if err != nil {
panic(err)
}
verifScript, err := hex.DecodeString("532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae")
if err != nil {
panic(err)
}
var nonce uint64
i, err := fmt.Sscanf("51b484a2fe49ed4d", "%016x", &nonce)
if i != 1 {
panic("can't decode nonce")
}
if err != nil {
panic(err)
}
nextCon, err := address.StringToUint160("AZ81H31DMWzbSnFDLFkzh9vHwaDLayV7fU")
if err != nil {
panic(err)
}
tx := &transaction.Transaction{
Type: transaction.MinerType,
Version: 0,
Data: &transaction.MinerTX{Nonce: 4266257741},
Attributes: []transaction.Attribute{},
Inputs: []transaction.Input{},
Outputs: []transaction.Output{},
Scripts: []transaction.Witness{},
Trimmed: false,
}
base := &block.Base{
Version: 0,
PrevHash: prevBlockHash,
MerkleRoot: merkleRoot,
Timestamp: 1541215200,
Index: 1,
ConsensusData: nonce,
NextConsensus: nextCon,
Script: transaction.Witness{
InvocationScript: invScript,
VerificationScript: verifScript,
},
}
// Update hashes for correct result comparison.
_ = tx.Hash()
_ = base.Hash()
return &result.Block{
Base: base,
BlockMetadataAndTx: result.BlockMetadataAndTx{
Size: 452,
Confirmations: 10534,
NextBlockHash: &nextBlockHash,
Tx: []result.Tx{{
Transaction: tx,
Fees: result.Fees{
SysFee: 0,
NetFee: 0,
},
}},
},
}
}
// rpcClientTestCases contains `serverResponse` json data fetched from examples // rpcClientTestCases contains `serverResponse` json data fetched from examples
// published in official C# JSON-RPC API v2.10.3 reference // published in official C# JSON-RPC API v2.10.3 reference
// (see https://docs.neo.org/docs/en-us/reference/rpc/latest-version/api.html) // (see https://docs.neo.org/docs/en-us/reference/rpc/latest-version/api.html)
@ -158,66 +237,7 @@ var rpcClientTestCases = map[string][]rpcClientTestCase{
}, },
serverResponse: `{"id":1,"jsonrpc":"2.0","result":{"hash":"0xe93d17a52967f9e69314385482bf86f85260e811b46bf4d4b261a7f4135a623c","size":452,"version":0,"nextblockhash":"0xcc37d5bc460e72c9423015cb8d579c13e7b03b93bfaa1a23cf4fa777988e035f","previousblockhash":"0x996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099","merkleroot":"0xcb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2","time":1541215200,"index":1,"nonce":"51b484a2fe49ed4d","nextconsensus":"AZ81H31DMWzbSnFDLFkzh9vHwaDLayV7fU","confirmations":10534,"script":{"invocation":"40356a91d94e398170e47447d6a0f60aa5470e209782a5452403115a49166db3e1c4a3898122db19f779c30f8ccd0b7d401acdf71eda340655e4ae5237a64961bf4034dd47955e5a71627dafc39dd92999140e9eaeec6b11dbb2b313efa3f1093ed915b4455e199c69ec53778f94ffc236b92f8b97fff97a1f6bbb3770c0c0b3844a40fbe743bd5c90b2f5255e0b073281d7aeb2fb516572f36bec8446bcc37ac755cbf10d08b16c95644db1b2dddc2df5daa377880b20198fc7b967ac6e76474b22df","verification":"532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae"},"tx":[{"txid":"0xcb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2","size":10,"type":"MinerTransaction","version":0,"attributes":[],"vin":[],"vout":[],"scripts":[],"sys_fee":"0","net_fee":"0","nonce":4266257741}]}}`, serverResponse: `{"id":1,"jsonrpc":"2.0","result":{"hash":"0xe93d17a52967f9e69314385482bf86f85260e811b46bf4d4b261a7f4135a623c","size":452,"version":0,"nextblockhash":"0xcc37d5bc460e72c9423015cb8d579c13e7b03b93bfaa1a23cf4fa777988e035f","previousblockhash":"0x996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099","merkleroot":"0xcb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2","time":1541215200,"index":1,"nonce":"51b484a2fe49ed4d","nextconsensus":"AZ81H31DMWzbSnFDLFkzh9vHwaDLayV7fU","confirmations":10534,"script":{"invocation":"40356a91d94e398170e47447d6a0f60aa5470e209782a5452403115a49166db3e1c4a3898122db19f779c30f8ccd0b7d401acdf71eda340655e4ae5237a64961bf4034dd47955e5a71627dafc39dd92999140e9eaeec6b11dbb2b313efa3f1093ed915b4455e199c69ec53778f94ffc236b92f8b97fff97a1f6bbb3770c0c0b3844a40fbe743bd5c90b2f5255e0b073281d7aeb2fb516572f36bec8446bcc37ac755cbf10d08b16c95644db1b2dddc2df5daa377880b20198fc7b967ac6e76474b22df","verification":"532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae"},"tx":[{"txid":"0xcb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2","size":10,"type":"MinerTransaction","version":0,"attributes":[],"vin":[],"vout":[],"scripts":[],"sys_fee":"0","net_fee":"0","nonce":4266257741}]}}`,
result: func(c *Client) interface{} { result: func(c *Client) interface{} {
hash, err := util.Uint256DecodeStringLE("e93d17a52967f9e69314385482bf86f85260e811b46bf4d4b261a7f4135a623c") return getResultBlock1()
if err != nil {
panic(err)
}
nextBlockHash, err := util.Uint256DecodeStringLE("cc37d5bc460e72c9423015cb8d579c13e7b03b93bfaa1a23cf4fa777988e035f")
if err != nil {
panic(err)
}
prevBlockHash, err := util.Uint256DecodeStringLE("996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099")
if err != nil {
panic(err)
}
merkleRoot, err := util.Uint256DecodeStringLE("cb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2")
if err != nil {
panic(err)
}
invScript, err := hex.DecodeString("40356a91d94e398170e47447d6a0f60aa5470e209782a5452403115a49166db3e1c4a3898122db19f779c30f8ccd0b7d401acdf71eda340655e4ae5237a64961bf4034dd47955e5a71627dafc39dd92999140e9eaeec6b11dbb2b313efa3f1093ed915b4455e199c69ec53778f94ffc236b92f8b97fff97a1f6bbb3770c0c0b3844a40fbe743bd5c90b2f5255e0b073281d7aeb2fb516572f36bec8446bcc37ac755cbf10d08b16c95644db1b2dddc2df5daa377880b20198fc7b967ac6e76474b22df")
if err != nil {
panic(err)
}
verifScript, err := hex.DecodeString("532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae")
if err != nil {
panic(err)
}
tx := &transaction.Transaction{
Type: transaction.MinerType,
Version: 0,
Data: &transaction.MinerTX{Nonce: 4266257741},
Attributes: []transaction.Attribute{},
Inputs: []transaction.Input{},
Outputs: []transaction.Output{},
Scripts: []transaction.Witness{},
Trimmed: false,
}
// Update hashes for correct result comparison.
_ = tx.Hash()
return &result.Block{
Hash: hash,
Size: 452,
Version: 0,
NextBlockHash: &nextBlockHash,
PreviousBlockHash: prevBlockHash,
MerkleRoot: merkleRoot,
Time: 1541215200,
Index: 1,
Nonce: "51b484a2fe49ed4d",
NextConsensus: "AZ81H31DMWzbSnFDLFkzh9vHwaDLayV7fU",
Confirmations: 10534,
Script: transaction.Witness{
InvocationScript: invScript,
VerificationScript: verifScript,
},
Tx: []result.Tx{{
Transaction: tx,
Fees: result.Fees{
SysFee: 0,
NetFee: 0,
},
}},
}
}, },
}, },
{ {
@ -253,66 +273,7 @@ var rpcClientTestCases = map[string][]rpcClientTestCase{
}, },
serverResponse: `{"id":1,"jsonrpc":"2.0","result":{"hash":"0xe93d17a52967f9e69314385482bf86f85260e811b46bf4d4b261a7f4135a623c","size":452,"version":0,"nextblockhash":"0xcc37d5bc460e72c9423015cb8d579c13e7b03b93bfaa1a23cf4fa777988e035f","previousblockhash":"0x996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099","merkleroot":"0xcb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2","time":1541215200,"index":1,"nonce":"51b484a2fe49ed4d","nextconsensus":"AZ81H31DMWzbSnFDLFkzh9vHwaDLayV7fU","confirmations":10534,"script":{"invocation":"40356a91d94e398170e47447d6a0f60aa5470e209782a5452403115a49166db3e1c4a3898122db19f779c30f8ccd0b7d401acdf71eda340655e4ae5237a64961bf4034dd47955e5a71627dafc39dd92999140e9eaeec6b11dbb2b313efa3f1093ed915b4455e199c69ec53778f94ffc236b92f8b97fff97a1f6bbb3770c0c0b3844a40fbe743bd5c90b2f5255e0b073281d7aeb2fb516572f36bec8446bcc37ac755cbf10d08b16c95644db1b2dddc2df5daa377880b20198fc7b967ac6e76474b22df","verification":"532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae"},"tx":[{"txid":"0xcb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2","size":10,"type":"MinerTransaction","version":0,"attributes":[],"vin":[],"vout":[],"scripts":[],"sys_fee":"0","net_fee":"0","nonce":4266257741}]}}`, serverResponse: `{"id":1,"jsonrpc":"2.0","result":{"hash":"0xe93d17a52967f9e69314385482bf86f85260e811b46bf4d4b261a7f4135a623c","size":452,"version":0,"nextblockhash":"0xcc37d5bc460e72c9423015cb8d579c13e7b03b93bfaa1a23cf4fa777988e035f","previousblockhash":"0x996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099","merkleroot":"0xcb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2","time":1541215200,"index":1,"nonce":"51b484a2fe49ed4d","nextconsensus":"AZ81H31DMWzbSnFDLFkzh9vHwaDLayV7fU","confirmations":10534,"script":{"invocation":"40356a91d94e398170e47447d6a0f60aa5470e209782a5452403115a49166db3e1c4a3898122db19f779c30f8ccd0b7d401acdf71eda340655e4ae5237a64961bf4034dd47955e5a71627dafc39dd92999140e9eaeec6b11dbb2b313efa3f1093ed915b4455e199c69ec53778f94ffc236b92f8b97fff97a1f6bbb3770c0c0b3844a40fbe743bd5c90b2f5255e0b073281d7aeb2fb516572f36bec8446bcc37ac755cbf10d08b16c95644db1b2dddc2df5daa377880b20198fc7b967ac6e76474b22df","verification":"532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae"},"tx":[{"txid":"0xcb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2","size":10,"type":"MinerTransaction","version":0,"attributes":[],"vin":[],"vout":[],"scripts":[],"sys_fee":"0","net_fee":"0","nonce":4266257741}]}}`,
result: func(c *Client) interface{} { result: func(c *Client) interface{} {
hash, err := util.Uint256DecodeStringLE("e93d17a52967f9e69314385482bf86f85260e811b46bf4d4b261a7f4135a623c") return getResultBlock1()
if err != nil {
panic(err)
}
nextBlockHash, err := util.Uint256DecodeStringLE("cc37d5bc460e72c9423015cb8d579c13e7b03b93bfaa1a23cf4fa777988e035f")
if err != nil {
panic(err)
}
prevBlockHash, err := util.Uint256DecodeStringLE("996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099")
if err != nil {
panic(err)
}
merkleRoot, err := util.Uint256DecodeStringLE("cb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2")
if err != nil {
panic(err)
}
invScript, err := hex.DecodeString("40356a91d94e398170e47447d6a0f60aa5470e209782a5452403115a49166db3e1c4a3898122db19f779c30f8ccd0b7d401acdf71eda340655e4ae5237a64961bf4034dd47955e5a71627dafc39dd92999140e9eaeec6b11dbb2b313efa3f1093ed915b4455e199c69ec53778f94ffc236b92f8b97fff97a1f6bbb3770c0c0b3844a40fbe743bd5c90b2f5255e0b073281d7aeb2fb516572f36bec8446bcc37ac755cbf10d08b16c95644db1b2dddc2df5daa377880b20198fc7b967ac6e76474b22df")
if err != nil {
panic(err)
}
verifScript, err := hex.DecodeString("532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae")
if err != nil {
panic(err)
}
tx := &transaction.Transaction{
Type: transaction.MinerType,
Version: 0,
Data: &transaction.MinerTX{Nonce: 4266257741},
Attributes: []transaction.Attribute{},
Inputs: []transaction.Input{},
Outputs: []transaction.Output{},
Scripts: []transaction.Witness{},
Trimmed: false,
}
// Update hashes for correct result comparison.
_ = tx.Hash()
return &result.Block{
Hash: hash,
Size: 452,
Version: 0,
NextBlockHash: &nextBlockHash,
PreviousBlockHash: prevBlockHash,
MerkleRoot: merkleRoot,
Time: 1541215200,
Index: 1,
Nonce: "51b484a2fe49ed4d",
NextConsensus: "AZ81H31DMWzbSnFDLFkzh9vHwaDLayV7fU",
Confirmations: 10534,
Script: transaction.Witness{
InvocationScript: invScript,
VerificationScript: verifScript,
},
Tx: []result.Tx{{
Transaction: tx,
Fees: result.Fees{
SysFee: 0,
NetFee: 0,
},
}},
}
}, },
}, },
}, },
@ -376,46 +337,20 @@ var rpcClientTestCases = map[string][]rpcClientTestCase{
}, },
serverResponse: `{"id":1,"jsonrpc":"2.0","result":{"hash":"0xe93d17a52967f9e69314385482bf86f85260e811b46bf4d4b261a7f4135a623c","size":442,"version":0,"previousblockhash":"0x996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099","merkleroot":"0xcb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2","time":1541215200,"index":1,"nonce":"51b484a2fe49ed4d","nextconsensus":"AZ81H31DMWzbSnFDLFkzh9vHwaDLayV7fU","script":{"invocation":"40356a91d94e398170e47447d6a0f60aa5470e209782a5452403115a49166db3e1c4a3898122db19f779c30f8ccd0b7d401acdf71eda340655e4ae5237a64961bf4034dd47955e5a71627dafc39dd92999140e9eaeec6b11dbb2b313efa3f1093ed915b4455e199c69ec53778f94ffc236b92f8b97fff97a1f6bbb3770c0c0b3844a40fbe743bd5c90b2f5255e0b073281d7aeb2fb516572f36bec8446bcc37ac755cbf10d08b16c95644db1b2dddc2df5daa377880b20198fc7b967ac6e76474b22df","verification":"532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae"},"confirmations":20061,"nextblockhash":"0xcc37d5bc460e72c9423015cb8d579c13e7b03b93bfaa1a23cf4fa777988e035f"}}`, serverResponse: `{"id":1,"jsonrpc":"2.0","result":{"hash":"0xe93d17a52967f9e69314385482bf86f85260e811b46bf4d4b261a7f4135a623c","size":442,"version":0,"previousblockhash":"0x996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099","merkleroot":"0xcb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2","time":1541215200,"index":1,"nonce":"51b484a2fe49ed4d","nextconsensus":"AZ81H31DMWzbSnFDLFkzh9vHwaDLayV7fU","script":{"invocation":"40356a91d94e398170e47447d6a0f60aa5470e209782a5452403115a49166db3e1c4a3898122db19f779c30f8ccd0b7d401acdf71eda340655e4ae5237a64961bf4034dd47955e5a71627dafc39dd92999140e9eaeec6b11dbb2b313efa3f1093ed915b4455e199c69ec53778f94ffc236b92f8b97fff97a1f6bbb3770c0c0b3844a40fbe743bd5c90b2f5255e0b073281d7aeb2fb516572f36bec8446bcc37ac755cbf10d08b16c95644db1b2dddc2df5daa377880b20198fc7b967ac6e76474b22df","verification":"532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae"},"confirmations":20061,"nextblockhash":"0xcc37d5bc460e72c9423015cb8d579c13e7b03b93bfaa1a23cf4fa777988e035f"}}`,
result: func(c *Client) interface{} { result: func(c *Client) interface{} {
hash, err := util.Uint256DecodeStringLE("e93d17a52967f9e69314385482bf86f85260e811b46bf4d4b261a7f4135a623c") b := getResultBlock1()
if err != nil {
panic(err)
}
nextBlockHash, err := util.Uint256DecodeStringLE("cc37d5bc460e72c9423015cb8d579c13e7b03b93bfaa1a23cf4fa777988e035f")
if err != nil {
panic(err)
}
prevBlockHash, err := util.Uint256DecodeStringLE("996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099")
if err != nil {
panic(err)
}
merkleRoot, err := util.Uint256DecodeStringLE("cb6ddb5f99d6af4c94a6c396d5294472f2eebc91a2c933e0f527422296fa9fb2")
if err != nil {
panic(err)
}
invScript, err := hex.DecodeString("40356a91d94e398170e47447d6a0f60aa5470e209782a5452403115a49166db3e1c4a3898122db19f779c30f8ccd0b7d401acdf71eda340655e4ae5237a64961bf4034dd47955e5a71627dafc39dd92999140e9eaeec6b11dbb2b313efa3f1093ed915b4455e199c69ec53778f94ffc236b92f8b97fff97a1f6bbb3770c0c0b3844a40fbe743bd5c90b2f5255e0b073281d7aeb2fb516572f36bec8446bcc37ac755cbf10d08b16c95644db1b2dddc2df5daa377880b20198fc7b967ac6e76474b22df")
if err != nil {
panic(err)
}
verifScript, err := hex.DecodeString("532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae")
if err != nil {
panic(err)
}
return &result.Header{ return &result.Header{
Hash: hash, Hash: b.Hash(),
Size: 442, Size: 442,
Version: 0, Version: b.Version,
NextBlockHash: &nextBlockHash, NextBlockHash: b.NextBlockHash,
PrevBlockHash: prevBlockHash, PrevBlockHash: b.PrevHash,
MerkleRoot: merkleRoot, MerkleRoot: b.MerkleRoot,
Timestamp: 1541215200, Timestamp: b.Timestamp,
Index: 1, Index: b.Index,
Nonce: "51b484a2fe49ed4d", Nonce: "51b484a2fe49ed4d",
NextConsensus: "AZ81H31DMWzbSnFDLFkzh9vHwaDLayV7fU", NextConsensus: address.Uint160ToString(b.NextConsensus),
Confirmations: 20061, Confirmations: 20061,
Script: transaction.Witness{ Script: b.Script,
InvocationScript: invScript,
VerificationScript: verifScript,
},
} }
}, },
}, },

View file

@ -12,6 +12,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/rpc/request" "github.com/nspcc-dev/neo-go/pkg/rpc/request"
"github.com/nspcc-dev/neo-go/pkg/rpc/response" "github.com/nspcc-dev/neo-go/pkg/rpc/response"
"github.com/nspcc-dev/neo-go/pkg/rpc/response/result" "github.com/nspcc-dev/neo-go/pkg/rpc/response/result"
"github.com/nspcc-dev/neo-go/pkg/util"
) )
// WSClient is a websocket-enabled RPC client that can be used with appropriate // WSClient is a websocket-enabled RPC client that can be used with appropriate
@ -125,7 +126,7 @@ readloop:
} }
var slice []json.RawMessage var slice []json.RawMessage
err = json.Unmarshal(rr.RawParams, &slice) err = json.Unmarshal(rr.RawParams, &slice)
if err != nil || len(slice) != 1 { if err != nil || (event != response.MissedEventID && len(slice) != 1) {
// Bad event received. // Bad event received.
break break
} }
@ -139,15 +140,19 @@ readloop:
val = new(result.NotificationEvent) val = new(result.NotificationEvent)
case response.ExecutionEventID: case response.ExecutionEventID:
val = new(result.ApplicationLog) val = new(result.ApplicationLog)
case response.MissedEventID:
// No value.
default: default:
// Bad event received. // Bad event received.
break readloop break readloop
} }
if event != response.MissedEventID {
err = json.Unmarshal(slice[0], val) err = json.Unmarshal(slice[0], val)
if err != nil || len(slice) != 1 { if err != nil || len(slice) != 1 {
// Bad event received. // Bad event received.
break break
} }
}
c.Notifications <- Notification{event, val} c.Notifications <- Notification{event, val}
} else if rr.RawID != nil && (rr.Error != nil || rr.Result != nil) { } else if rr.RawID != nil && (rr.Error != nil || rr.Result != nil) {
resp := new(response.Raw) resp := new(response.Raw)
@ -242,23 +247,40 @@ func (c *WSClient) SubscribeForNewBlocks() (string, error) {
} }
// SubscribeForNewTransactions adds subscription for new transaction events to // SubscribeForNewTransactions adds subscription for new transaction events to
// this instance of client. // this instance of client. It can be filtered by transaction type, nil value
func (c *WSClient) SubscribeForNewTransactions() (string, error) { // is treated as missing filter.
func (c *WSClient) SubscribeForNewTransactions(txType *transaction.TXType) (string, error) {
params := request.NewRawParams("transaction_added") params := request.NewRawParams("transaction_added")
if txType != nil {
params.Values = append(params.Values, request.TxFilter{Type: *txType})
}
return c.performSubscription(params) return c.performSubscription(params)
} }
// SubscribeForExecutionNotifications adds subscription for notifications // SubscribeForExecutionNotifications adds subscription for notifications
// generated during transaction execution to this instance of client. // generated during transaction execution to this instance of client. It can be
func (c *WSClient) SubscribeForExecutionNotifications() (string, error) { // filtered by contract's hash (that emits notifications), nil value puts no such
// restrictions.
func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160) (string, error) {
params := request.NewRawParams("notification_from_execution") params := request.NewRawParams("notification_from_execution")
if contract != nil {
params.Values = append(params.Values, request.NotificationFilter{Contract: *contract})
}
return c.performSubscription(params) return c.performSubscription(params)
} }
// SubscribeForTransactionExecutions adds subscription for application execution // SubscribeForTransactionExecutions adds subscription for application execution
// results generated during transaction execution to this instance of client. // results generated during transaction execution to this instance of client. Can
func (c *WSClient) SubscribeForTransactionExecutions() (string, error) { // be filtered by state (HALT/FAULT) to check for successful or failing
// transactions, nil value means no filtering.
func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) {
params := request.NewRawParams("transaction_executed") params := request.NewRawParams("transaction_executed")
if state != nil {
if *state != "HALT" && *state != "FAULT" {
return "", errors.New("bad state parameter")
}
params.Values = append(params.Values, request.ExecutionFilter{State: *state})
}
return c.performSubscription(params) return c.performSubscription(params)
} }

View file

@ -8,6 +8,9 @@ import (
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/rpc/request"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -22,9 +25,15 @@ func TestWSClientClose(t *testing.T) {
func TestWSClientSubscription(t *testing.T) { func TestWSClientSubscription(t *testing.T) {
var cases = map[string]func(*WSClient) (string, error){ var cases = map[string]func(*WSClient) (string, error){
"blocks": (*WSClient).SubscribeForNewBlocks, "blocks": (*WSClient).SubscribeForNewBlocks,
"transactions": (*WSClient).SubscribeForNewTransactions, "transactions": func(wsc *WSClient) (string, error) {
"notifications": (*WSClient).SubscribeForExecutionNotifications, return wsc.SubscribeForNewTransactions(nil)
"executions": (*WSClient).SubscribeForTransactionExecutions, },
"notifications": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForExecutionNotifications(nil)
},
"executions": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForTransactionExecutions(nil)
},
} }
t.Run("good", func(t *testing.T) { t.Run("good", func(t *testing.T) {
for name, f := range cases { for name, f := range cases {
@ -107,7 +116,8 @@ func TestWSClientEvents(t *testing.T) {
`{"jsonrpc":"2.0","method":"transaction_executed","params":[{"txid":"0x93670859cc8a42f6ea994869c944879678d33d7501d388f5a446a8c7de147df7","executions":[{"trigger":"Application","contract":"0x0000000000000000000000000000000000000000","vmstate":"HALT","gas_consumed":"1.048","stack":[{"type":"Integer","value":"1"}],"notifications":[{"contract":"0xc2789e5ab9bab828743833965b1df0d5fbcc206f","state":{"type":"Array","value":[{"type":"ByteArray","value":"636f6e74726163742063616c6c"},{"type":"ByteArray","value":"507574"},{"type":"Array","value":[{"type":"ByteArray","value":"746573746b6579"},{"type":"ByteArray","value":"7465737476616c7565"}]}]}}]}]}]}`, `{"jsonrpc":"2.0","method":"transaction_executed","params":[{"txid":"0x93670859cc8a42f6ea994869c944879678d33d7501d388f5a446a8c7de147df7","executions":[{"trigger":"Application","contract":"0x0000000000000000000000000000000000000000","vmstate":"HALT","gas_consumed":"1.048","stack":[{"type":"Integer","value":"1"}],"notifications":[{"contract":"0xc2789e5ab9bab828743833965b1df0d5fbcc206f","state":{"type":"Array","value":[{"type":"ByteArray","value":"636f6e74726163742063616c6c"},{"type":"ByteArray","value":"507574"},{"type":"Array","value":[{"type":"ByteArray","value":"746573746b6579"},{"type":"ByteArray","value":"7465737476616c7565"}]}]}}]}]}]}`,
`{"jsonrpc":"2.0","method":"notification_from_execution","params":[{"contract":"0xc2789e5ab9bab828743833965b1df0d5fbcc206f","state":{"type":"Array","value":[{"type":"ByteArray","value":"636f6e74726163742063616c6c"},{"type":"ByteArray","value":"507574"},{"type":"Array","value":[{"type":"ByteArray","value":"746573746b6579"},{"type":"ByteArray","value":"7465737476616c7565"}]}]}}]}`, `{"jsonrpc":"2.0","method":"notification_from_execution","params":[{"contract":"0xc2789e5ab9bab828743833965b1df0d5fbcc206f","state":{"type":"Array","value":[{"type":"ByteArray","value":"636f6e74726163742063616c6c"},{"type":"ByteArray","value":"507574"},{"type":"Array","value":[{"type":"ByteArray","value":"746573746b6579"},{"type":"ByteArray","value":"7465737476616c7565"}]}]}}]}`,
`{"jsonrpc":"2.0","method":"transaction_added","params":[{"txid":"0x93670859cc8a42f6ea994869c944879678d33d7501d388f5a446a8c7de147df7","size":60,"type":"InvocationTransaction","version":1,"attributes":[],"vin":[],"vout":[],"scripts":[],"script":"097465737476616c756507746573746b657952c103507574676f20ccfbd5f01d5b9633387428b8bab95a9e78c2"}]}`, `{"jsonrpc":"2.0","method":"transaction_added","params":[{"txid":"0x93670859cc8a42f6ea994869c944879678d33d7501d388f5a446a8c7de147df7","size":60,"type":"InvocationTransaction","version":1,"attributes":[],"vin":[],"vout":[],"scripts":[],"script":"097465737476616c756507746573746b657952c103507574676f20ccfbd5f01d5b9633387428b8bab95a9e78c2"}]}`,
`{"jsonrpc":"2.0","method":"block_added","params":[{"version":0,"previousblockhash":"0x33f3e0e24542b2ec3b6420e6881c31f6460a39a4e733d88f7557cbcc3b5ed560","merkleroot":"0x9d922c5cfd4c8cd1da7a6b2265061998dc438bd0dea7145192e2858155e6c57a","time":1586154525,"height":205,"nonce":1111,"next_consensus":"0xa21e4f7178607089e4fe9fab1300d1f5a3d348be","script":{"invocation":"4047a444a51218ac856f1cbc629f251c7c88187910534d6ba87847c86a9a73ed4951d203fd0a87f3e65657a7259269473896841f65c0a0c8efc79d270d917f4ff640435ee2f073c94a02f0276dfe4465037475e44e1c34c0decb87ec9c2f43edf688059fc4366a41c673d72ba772b4782c39e79f01cb981247353216d52d2df1651140527eb0dfd80a800fdd7ac8fbe68fc9366db2d71655d8ba235525a97a69a7181b1e069b82091be711c25e504a17c3c55eee6e76e6af13cb488fbe35d5c5d025c34041f39a02ebe9bb08be0e4aaa890f447dc9453209bbfb4705d8f2d869c2b55ee2d41dbec2ee476a059d77fb7c26400284328d05aece5f3168b48f1db1c6f7be0b","verification":"532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae"},"tx":[{"txid":"0xf9adfde059810f37b3d0686d67f6b29034e0c669537df7e59b40c14a0508b9ed","size":10,"type":"MinerTransaction","version":0,"attributes":[],"vin":[],"vout":[],"scripts":[]},{"txid":"0x93670859cc8a42f6ea994869c944879678d33d7501d388f5a446a8c7de147df7","size":60,"type":"InvocationTransaction","version":1,"attributes":[],"vin":[],"vout":[],"scripts":[],"script":"097465737476616c756507746573746b657952c103507574676f20ccfbd5f01d5b9633387428b8bab95a9e78c2"}]}]}`, `{"jsonrpc":"2.0","method":"block_added","params":[{"hash":"0x48fba8aebf88278818a3dc0caecb230873d1d4ce1ea8bf473634317f94a609e5","version":0,"previousblockhash":"0x33f3e0e24542b2ec3b6420e6881c31f6460a39a4e733d88f7557cbcc3b5ed560","merkleroot":"0x9d922c5cfd4c8cd1da7a6b2265061998dc438bd0dea7145192e2858155e6c57a","time":1586154525,"index":205,"nonce":"0000000000000457","nextconsensus":"AZ81H31DMWzbSnFDLFkzh9vHwaDLayV7fU","script":{"invocation":"4047a444a51218ac856f1cbc629f251c7c88187910534d6ba87847c86a9a73ed4951d203fd0a87f3e65657a7259269473896841f65c0a0c8efc79d270d917f4ff640435ee2f073c94a02f0276dfe4465037475e44e1c34c0decb87ec9c2f43edf688059fc4366a41c673d72ba772b4782c39e79f01cb981247353216d52d2df1651140527eb0dfd80a800fdd7ac8fbe68fc9366db2d71655d8ba235525a97a69a7181b1e069b82091be711c25e504a17c3c55eee6e76e6af13cb488fbe35d5c5d025c34041f39a02ebe9bb08be0e4aaa890f447dc9453209bbfb4705d8f2d869c2b55ee2d41dbec2ee476a059d77fb7c26400284328d05aece5f3168b48f1db1c6f7be0b","verification":"532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae"},"tx":[{"txid":"0xf9adfde059810f37b3d0686d67f6b29034e0c669537df7e59b40c14a0508b9ed","size":10,"type":"MinerTransaction","version":0,"attributes":[],"vin":[],"vout":[],"scripts":[]},{"txid":"0x93670859cc8a42f6ea994869c944879678d33d7501d388f5a446a8c7de147df7","size":60,"type":"InvocationTransaction","version":1,"attributes":[],"vin":[],"vout":[],"scripts":[],"script":"097465737476616c756507746573746b657952c103507574676f20ccfbd5f01d5b9633387428b8bab95a9e78c2"}]}]}`,
`{"jsonrpc":"2.0","method":"event_missed","params":[]}`,
} }
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/ws" && req.Method == "GET" { if req.URL.Path == "/ws" && req.Method == "GET" {
@ -144,3 +154,96 @@ func TestWSClientEvents(t *testing.T) {
// Connection closed by server. // Connection closed by server.
require.Equal(t, false, ok) require.Equal(t, false, ok)
} }
func TestWSExecutionVMStateCheck(t *testing.T) {
// Will answer successfully if request slips through.
srv := initTestServer(t, `{"jsonrpc": "2.0", "id": 1, "result": "55aaff00"}`)
defer srv.Close()
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
filter := "NONE"
_, err = wsc.SubscribeForTransactionExecutions(&filter)
require.Error(t, err)
wsc.Close()
}
func TestWSFilteredSubscriptions(t *testing.T) {
var cases = []struct {
name string
clientCode func(*testing.T, *WSClient)
serverCode func(*testing.T, *request.Params)
}{
{"transactions",
func(t *testing.T, wsc *WSClient) {
tt := transaction.InvocationType
_, err := wsc.SubscribeForNewTransactions(&tt)
require.NoError(t, err)
},
func(t *testing.T, p *request.Params) {
param, ok := p.Value(1)
require.Equal(t, true, ok)
require.Equal(t, request.TxFilterT, param.Type)
filt, ok := param.Value.(request.TxFilter)
require.Equal(t, true, ok)
require.Equal(t, transaction.InvocationType, filt.Type)
},
},
{"notifications",
func(t *testing.T, wsc *WSClient) {
contract := util.Uint160{1, 2, 3, 4, 5}
_, err := wsc.SubscribeForExecutionNotifications(&contract)
require.NoError(t, err)
},
func(t *testing.T, p *request.Params) {
param, ok := p.Value(1)
require.Equal(t, true, ok)
require.Equal(t, request.NotificationFilterT, param.Type)
filt, ok := param.Value.(request.NotificationFilter)
require.Equal(t, true, ok)
require.Equal(t, util.Uint160{1, 2, 3, 4, 5}, filt.Contract)
},
},
{"executions",
func(t *testing.T, wsc *WSClient) {
state := "FAULT"
_, err := wsc.SubscribeForTransactionExecutions(&state)
require.NoError(t, err)
},
func(t *testing.T, p *request.Params) {
param, ok := p.Value(1)
require.Equal(t, true, ok)
require.Equal(t, request.ExecutionFilterT, param.Type)
filt, ok := param.Value.(request.ExecutionFilter)
require.Equal(t, true, ok)
require.Equal(t, "FAULT", filt.State)
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/ws" && req.Method == "GET" {
var upgrader = websocket.Upgrader{}
ws, err := upgrader.Upgrade(w, req, nil)
require.NoError(t, err)
ws.SetReadDeadline(time.Now().Add(2 * time.Second))
req := request.In{}
err = ws.ReadJSON(&req)
require.NoError(t, err)
params, err := req.Params()
require.NoError(t, err)
c.serverCode(t, params)
ws.SetWriteDeadline(time.Now().Add(2 * time.Second))
err = ws.WriteMessage(1, []byte(`{"jsonrpc": "2.0", "id": 1, "result": "0"}`))
require.NoError(t, err)
ws.Close()
}
}))
defer srv.Close()
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
c.clientCode(t, wsc)
wsc.Close()
})
}
}

View file

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
@ -29,6 +30,23 @@ type (
Type smartcontract.ParamType `json:"type"` Type smartcontract.ParamType `json:"type"`
Value Param `json:"value"` Value Param `json:"value"`
} }
// TxFilter is a wrapper structure for transaction event filter. The only
// allowed filter is a transaction type for now.
TxFilter struct {
Type transaction.TXType `json:"type"`
}
// NotificationFilter is a wrapper structure representing filter used for
// notifications generated during transaction execution. Notifications can
// only be filtered by contract hash.
NotificationFilter struct {
Contract util.Uint160 `json:"contract"`
}
// ExecutionFilter is a wrapper structure used for transaction execution
// events. It allows to choose failing or successful transactions based
// on their VM state.
ExecutionFilter struct {
State string `json:"state"`
}
) )
// These are parameter types accepted by RPC server. // These are parameter types accepted by RPC server.
@ -38,6 +56,9 @@ const (
NumberT NumberT
ArrayT ArrayT
FuncParamT FuncParamT
TxFilterT
NotificationFilterT
ExecutionFilterT
) )
func (p Param) String() string { func (p Param) String() string {
@ -130,38 +151,47 @@ func (p Param) GetBytesHex() ([]byte, error) {
// UnmarshalJSON implements json.Unmarshaler interface. // UnmarshalJSON implements json.Unmarshaler interface.
func (p *Param) UnmarshalJSON(data []byte) error { func (p *Param) UnmarshalJSON(data []byte) error {
var s string var s string
if err := json.Unmarshal(data, &s); err == nil {
p.Type = StringT
p.Value = s
return nil
}
var num float64 var num float64
if err := json.Unmarshal(data, &num); err == nil { // To unmarshal correctly we need to pass pointers into the decoder.
p.Type = NumberT var attempts = [...]Param{
p.Value = int(num) {NumberT, &num},
{StringT, &s},
return nil {FuncParamT, &FuncParam{}},
{TxFilterT, &TxFilter{}},
{NotificationFilterT, &NotificationFilter{}},
{ExecutionFilterT, &ExecutionFilter{}},
{ArrayT, &[]Param{}},
} }
for _, cur := range attempts {
r := bytes.NewReader(data) r := bytes.NewReader(data)
jd := json.NewDecoder(r) jd := json.NewDecoder(r)
jd.DisallowUnknownFields() jd.DisallowUnknownFields()
var fp FuncParam if err := jd.Decode(cur.Value); err == nil {
if err := jd.Decode(&fp); err == nil { p.Type = cur.Type
p.Type = FuncParamT // But we need to store actual values, not pointers.
p.Value = fp switch val := cur.Value.(type) {
case *float64:
p.Value = int(*val)
case *string:
p.Value = *val
case *FuncParam:
p.Value = *val
case *TxFilter:
p.Value = *val
case *NotificationFilter:
p.Value = *val
case *ExecutionFilter:
if (*val).State == "HALT" || (*val).State == "FAULT" {
p.Value = *val
} else {
continue
}
case *[]Param:
p.Value = *val
}
return nil return nil
} }
var ps []Param
if err := json.Unmarshal(data, &ps); err == nil {
p.Type = ArrayT
p.Value = ps
return nil
} }
return errors.New("unknown type") return errors.New("unknown type")

View file

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"testing" "testing"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
@ -13,7 +14,12 @@ import (
) )
func TestParam_UnmarshalJSON(t *testing.T) { func TestParam_UnmarshalJSON(t *testing.T) {
msg := `["str1", 123, ["str2", 3], [{"type": "String", "value": "jajaja"}]]` msg := `["str1", 123, ["str2", 3], [{"type": "String", "value": "jajaja"}],
{"type": "MinerTransaction"},
{"contract": "f84d6a337fbc3d3a201d41da99e86b479e7a2554"},
{"state": "HALT"}]`
contr, err := util.Uint160DecodeStringLE("f84d6a337fbc3d3a201d41da99e86b479e7a2554")
require.NoError(t, err)
expected := Params{ expected := Params{
{ {
Type: StringT, Type: StringT,
@ -51,6 +57,18 @@ func TestParam_UnmarshalJSON(t *testing.T) {
}, },
}, },
}, },
{
Type: TxFilterT,
Value: TxFilter{Type: transaction.MinerType},
},
{
Type: NotificationFilterT,
Value: NotificationFilter{Contract: contr},
},
{
Type: ExecutionFilterT,
Value: ExecutionFilter{State: "HALT"},
},
} }
var ps Params var ps Params

View file

@ -23,6 +23,8 @@ const (
NotificationEventID NotificationEventID
// ExecutionEventID is used for `transaction_executed` events. // ExecutionEventID is used for `transaction_executed` events.
ExecutionEventID ExecutionEventID
// MissedEventID notifies user of missed events.
MissedEventID EventID = 255
) )
// String is a good old Stringer implementation. // String is a good old Stringer implementation.
@ -36,6 +38,8 @@ func (e EventID) String() string {
return "notification_from_execution" return "notification_from_execution"
case ExecutionEventID: case ExecutionEventID:
return "transaction_executed" return "transaction_executed"
case MissedEventID:
return "event_missed"
default: default:
return "unknown" return "unknown"
} }
@ -52,6 +56,8 @@ func GetEventIDFromString(s string) (EventID, error) {
return NotificationEventID, nil return NotificationEventID, nil
case "transaction_executed": case "transaction_executed":
return ExecutionEventID, nil return ExecutionEventID, nil
case "event_missed":
return MissedEventID, nil
default: default:
return 255, errors.New("invalid stream name") return 255, errors.New("invalid stream name")
} }

View file

@ -3,12 +3,10 @@ package result
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
) )
@ -30,21 +28,16 @@ type (
// Block wrapper used for the representation of // Block wrapper used for the representation of
// block.Block / block.Base on the RPC Server. // block.Block / block.Base on the RPC Server.
Block struct { Block struct {
Hash util.Uint256 `json:"hash"` *block.Base
BlockMetadataAndTx
}
// BlockMetadataAndTx is an additional metadata added to standard
// block.Base plus specially encoded transactions.
BlockMetadataAndTx struct {
Size int `json:"size"` Size int `json:"size"`
Version uint32 `json:"version"`
NextBlockHash *util.Uint256 `json:"nextblockhash,omitempty"` NextBlockHash *util.Uint256 `json:"nextblockhash,omitempty"`
PreviousBlockHash util.Uint256 `json:"previousblockhash"`
MerkleRoot util.Uint256 `json:"merkleroot"`
Time uint32 `json:"time"`
Index uint32 `json:"index"`
Nonce string `json:"nonce"`
NextConsensus string `json:"nextconsensus"`
Confirmations uint32 `json:"confirmations"` Confirmations uint32 `json:"confirmations"`
Script transaction.Witness `json:"script"`
Tx []Tx `json:"tx"` Tx []Tx `json:"tx"`
} }
) )
@ -52,20 +45,12 @@ type (
// NewBlock creates a new Block wrapper. // NewBlock creates a new Block wrapper.
func NewBlock(b *block.Block, chain core.Blockchainer) Block { func NewBlock(b *block.Block, chain core.Blockchainer) Block {
res := Block{ res := Block{
Version: b.Version, Base: &b.Base,
Hash: b.Hash(), BlockMetadataAndTx: BlockMetadataAndTx{
Size: io.GetVarSize(b), Size: io.GetVarSize(b),
PreviousBlockHash: b.PrevHash,
MerkleRoot: b.MerkleRoot,
Time: b.Timestamp,
Index: b.Index,
Nonce: fmt.Sprintf("%016x", b.ConsensusData),
NextConsensus: address.Uint160ToString(b.NextConsensus),
Confirmations: chain.BlockHeight() - b.Index - 1, Confirmations: chain.BlockHeight() - b.Index - 1,
Script: b.Script,
Tx: make([]Tx, 0, len(b.Transactions)), Tx: make([]Tx, 0, len(b.Transactions)),
},
} }
hash := chain.GetHeaderHash(int(b.Index) + 1) hash := chain.GetHeaderHash(int(b.Index) + 1)
@ -130,3 +115,44 @@ func (t *Tx) UnmarshalJSON(data []byte) error {
t.Transaction = transaction t.Transaction = transaction
return nil return nil
} }
// MarshalJSON implements json.Marshaler interface.
func (b Block) MarshalJSON() ([]byte, error) {
output, err := json.Marshal(b.BlockMetadataAndTx)
if err != nil {
return nil, err
}
baseBytes, err := json.Marshal(b.Base)
if err != nil {
return nil, err
}
// We have to keep both "fields" at the same level in json in order to
// match C# API, so there's no way to marshall Block correctly with
// standard json.Marshaller tool.
if output[len(output)-1] != '}' || baseBytes[0] != '{' {
return nil, errors.New("can't merge internal jsons")
}
output[len(output)-1] = ','
output = append(output, baseBytes[1:]...)
return output, nil
}
// UnmarshalJSON implements json.Unmarshaler interface.
func (b *Block) UnmarshalJSON(data []byte) error {
// As block.Base and BlockMetadataAndTx are at the same level in json,
// do unmarshalling separately for both structs.
metaTx := new(BlockMetadataAndTx)
base := new(block.Base)
err := json.Unmarshal(data, metaTx)
if err != nil {
return err
}
err = json.Unmarshal(data, base)
if err != nil {
return err
}
b.Base = base
b.BlockMetadataAndTx = *metaTx
return nil
}

View file

@ -295,37 +295,49 @@ func (s *Server) handleRequest(req *request.In, sub *subscriber) response.Raw {
func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan response.Raw, subChan <-chan *websocket.PreparedMessage) { func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan response.Raw, subChan <-chan *websocket.PreparedMessage) {
pingTicker := time.NewTicker(wsPingPeriod) pingTicker := time.NewTicker(wsPingPeriod)
defer ws.Close() eventloop:
defer pingTicker.Stop()
for { for {
select { select {
case <-s.shutdown: case <-s.shutdown:
// Signal to the reader routine. break eventloop
ws.Close()
return
case event, ok := <-subChan: case event, ok := <-subChan:
if !ok { if !ok {
return break eventloop
} }
ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)) ws.SetWriteDeadline(time.Now().Add(wsWriteLimit))
if err := ws.WritePreparedMessage(event); err != nil { if err := ws.WritePreparedMessage(event); err != nil {
return break eventloop
} }
case res, ok := <-resChan: case res, ok := <-resChan:
if !ok { if !ok {
return break eventloop
} }
ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)) ws.SetWriteDeadline(time.Now().Add(wsWriteLimit))
if err := ws.WriteJSON(res); err != nil { if err := ws.WriteJSON(res); err != nil {
return break eventloop
} }
case <-pingTicker.C: case <-pingTicker.C:
ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)) ws.SetWriteDeadline(time.Now().Add(wsWriteLimit))
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
return break eventloop
} }
} }
} }
ws.Close()
pingTicker.Stop()
// Drain notification channel as there might be some goroutines blocked
// on it.
drainloop:
for {
select {
case _, ok := <-subChan:
if !ok {
break drainloop
}
default:
break drainloop
}
}
} }
func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- response.Raw, subscr *subscriber) { func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- response.Raw, subscr *subscriber) {
@ -353,8 +365,8 @@ requestloop:
s.subsLock.Lock() s.subsLock.Lock()
delete(s.subscribers, subscr) delete(s.subscribers, subscr)
for _, e := range subscr.feeds { for _, e := range subscr.feeds {
if e != response.InvalidEventID { if e.event != response.InvalidEventID {
s.unsubscribeFromChannel(e) s.unsubscribeFromChannel(e.event)
} }
} }
s.subsLock.Unlock() s.subsLock.Unlock()
@ -1131,9 +1143,35 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface
return nil, response.ErrInvalidParams return nil, response.ErrInvalidParams
} }
event, err := response.GetEventIDFromString(streamName) event, err := response.GetEventIDFromString(streamName)
if err != nil { if err != nil || event == response.MissedEventID {
return nil, response.ErrInvalidParams return nil, response.ErrInvalidParams
} }
// Optional filter.
var filter interface{}
p, ok = reqParams.Value(1)
if ok {
// It doesn't accept filters.
if event == response.BlockEventID {
return nil, response.ErrInvalidParams
}
switch event {
case response.TransactionEventID:
if p.Type != request.TxFilterT {
return nil, response.ErrInvalidParams
}
case response.NotificationEventID:
if p.Type != request.NotificationFilterT {
return nil, response.ErrInvalidParams
}
case response.ExecutionEventID:
if p.Type != request.ExecutionFilterT {
return nil, response.ErrInvalidParams
}
}
filter = p.Value
}
s.subsLock.Lock() s.subsLock.Lock()
defer s.subsLock.Unlock() defer s.subsLock.Unlock()
select { select {
@ -1143,14 +1181,15 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface
} }
var id int var id int
for ; id < len(sub.feeds); id++ { for ; id < len(sub.feeds); id++ {
if sub.feeds[id] == response.InvalidEventID { if sub.feeds[id].event == response.InvalidEventID {
break break
} }
} }
if id == len(sub.feeds) { if id == len(sub.feeds) {
return nil, response.NewInternalServerError("maximum number of subscriptions is reached", nil) return nil, response.NewInternalServerError("maximum number of subscriptions is reached", nil)
} }
sub.feeds[id] = event sub.feeds[id].event = event
sub.feeds[id].filter = filter
s.subscribeToChannel(event) s.subscribeToChannel(event)
return strconv.FormatInt(int64(id), 10), nil return strconv.FormatInt(int64(id), 10), nil
} }
@ -1195,11 +1234,12 @@ func (s *Server) unsubscribe(reqParams request.Params, sub *subscriber) (interfa
} }
s.subsLock.Lock() s.subsLock.Lock()
defer s.subsLock.Unlock() defer s.subsLock.Unlock()
if len(sub.feeds) <= id || sub.feeds[id] == response.InvalidEventID { if len(sub.feeds) <= id || sub.feeds[id].event == response.InvalidEventID {
return nil, response.ErrInvalidParams return nil, response.ErrInvalidParams
} }
event := sub.feeds[id] event := sub.feeds[id].event
sub.feeds[id] = response.InvalidEventID sub.feeds[id].event = response.InvalidEventID
sub.feeds[id].filter = nil
s.unsubscribeFromChannel(event) s.unsubscribeFromChannel(event)
return true, nil return true, nil
} }
@ -1233,6 +1273,20 @@ func (s *Server) unsubscribeFromChannel(event response.EventID) {
} }
func (s *Server) handleSubEvents() { func (s *Server) handleSubEvents() {
b, err := json.Marshal(response.Notification{
JSONRPC: request.JSONRPCVersion,
Event: response.MissedEventID,
Payload: make([]interface{}, 0),
})
if err != nil {
s.log.Error("fatal: failed to marshal overflow event", zap.Error(err))
return
}
overflowMsg, err := websocket.NewPreparedMessage(websocket.TextMessage, b)
if err != nil {
s.log.Error("fatal: failed to prepare overflow message", zap.Error(err))
return
}
chloop: chloop:
for { for {
var resp = response.Notification{ var resp = response.Notification{
@ -1259,10 +1313,13 @@ chloop:
s.subsLock.RLock() s.subsLock.RLock()
subloop: subloop:
for sub := range s.subscribers { for sub := range s.subscribers {
for _, subID := range sub.feeds { if sub.overflown.Load() {
if subID == resp.Event { continue
}
for i := range sub.feeds {
if sub.feeds[i].Matches(&resp) {
if msg == nil { if msg == nil {
b, err := json.Marshal(resp) b, err = json.Marshal(resp)
if err != nil { if err != nil {
s.log.Error("failed to marshal notification", s.log.Error("failed to marshal notification",
zap.Error(err), zap.Error(err),
@ -1277,7 +1334,16 @@ chloop:
break subloop break subloop
} }
} }
sub.writer <- msg select {
case sub.writer <- msg:
default:
sub.overflown.Store(true)
// MissedEvent is to be delivered eventually.
go func(sub *subscriber) {
sub.writer <- overflowMsg
sub.overflown.Store(false)
}(sub)
}
// The message is sent only once per subscriber. // The message is sent only once per subscriber.
break break
} }

View file

@ -336,7 +336,7 @@ var rpcTestCases = map[string][]rpcTestCase{
block, err := e.chain.GetBlock(e.chain.GetHeaderHash(2)) block, err := e.chain.GetBlock(e.chain.GetHeaderHash(2))
require.NoErrorf(t, err, "could not get block") require.NoErrorf(t, err, "could not get block")
assert.Equal(t, block.Hash(), res.Hash) assert.Equal(t, block.Hash(), res.Hash())
for i := range res.Tx { for i := range res.Tx {
tx := res.Tx[i] tx := res.Tx[i]
require.Equal(t, transaction.MinerType, tx.Transaction.Type) require.Equal(t, transaction.MinerType, tx.Transaction.Type)
@ -1035,6 +1035,7 @@ func checkErrGetResult(t *testing.T, body []byte, expectingFail bool) json.RawMe
err := json.Unmarshal(body, &resp) err := json.Unmarshal(body, &resp)
require.Nil(t, err) require.Nil(t, err)
if expectingFail { if expectingFail {
require.NotNil(t, resp.Error)
assert.NotEqual(t, 0, resp.Error.Code) assert.NotEqual(t, 0, resp.Error.Code)
assert.NotEqual(t, "", resp.Error.Message) assert.NotEqual(t, "", resp.Error.Message)
} else { } else {

View file

@ -2,7 +2,11 @@ package server
import ( import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/rpc/request"
"github.com/nspcc-dev/neo-go/pkg/rpc/response" "github.com/nspcc-dev/neo-go/pkg/rpc/response"
"github.com/nspcc-dev/neo-go/pkg/rpc/response/result"
"go.uber.org/atomic"
) )
type ( type (
@ -10,12 +14,16 @@ type (
subscriber struct { subscriber struct {
writer chan<- *websocket.PreparedMessage writer chan<- *websocket.PreparedMessage
ws *websocket.Conn ws *websocket.Conn
overflown atomic.Bool
// These work like slots as there is not a lot of them (it's // These work like slots as there is not a lot of them (it's
// cheaper doing it this way rather than creating a map), // cheaper doing it this way rather than creating a map),
// pointing to EventID is an obvious overkill at the moment, but // pointing to EventID is an obvious overkill at the moment, but
// that's not for long. // that's not for long.
feeds [maxFeeds]response.EventID feeds [maxFeeds]feed
}
feed struct {
event response.EventID
filter interface{}
} }
) )
@ -33,3 +41,27 @@ const (
// a lot in terms of memory used. // a lot in terms of memory used.
notificationBufSize = 1024 notificationBufSize = 1024
) )
func (f *feed) Matches(r *response.Notification) bool {
if r.Event != f.event {
return false
}
if f.filter == nil {
return true
}
switch f.event {
case response.TransactionEventID:
filt := f.filter.(request.TxFilter)
tx := r.Payload[0].(*transaction.Transaction)
return tx.Type == filt.Type
case response.NotificationEventID:
filt := f.filter.(request.NotificationFilter)
notification := r.Payload[0].(result.NotificationEvent)
return notification.Contract.Equals(filt.Contract)
case response.ExecutionEventID:
filt := f.filter.(request.ExecutionFilter)
applog := r.Payload[0].(result.ApplicationLog)
return len(applog.Executions) != 0 && applog.Executions[0].VMState == filt.State
}
return false
}

View file

@ -62,6 +62,24 @@ func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *webso
return chain, rpcSrv, ws, respMsgs, finishedFlag return chain, rpcSrv, ws, respMsgs, finishedFlag
} }
func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string {
var s string
resp := callWSGetRaw(t, ws, fmt.Sprintf(`{"jsonrpc": "2.0","method": "subscribe","params": %s,"id": 1}`, params), msgs)
require.Nil(t, resp.Error)
require.NotNil(t, resp.Result)
require.NoError(t, json.Unmarshal(resp.Result, &s))
return s
}
func callUnsubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, id string) {
var b bool
resp := callWSGetRaw(t, ws, fmt.Sprintf(`{"jsonrpc": "2.0","method": "unsubscribe","params": ["%s"],"id": 1}`, id), msgs)
require.Nil(t, resp.Error)
require.NotNil(t, resp.Result)
require.NoError(t, json.Unmarshal(resp.Result, &b))
require.Equal(t, true, b)
}
func TestSubscriptions(t *testing.T) { func TestSubscriptions(t *testing.T) {
var subIDs = make([]string, 0) var subIDs = make([]string, 0)
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed"} var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed"}
@ -72,16 +90,7 @@ func TestSubscriptions(t *testing.T) {
defer rpcSrv.Shutdown() defer rpcSrv.Shutdown()
for _, feed := range subFeeds { for _, feed := range subFeeds {
var s string s := callSubscribe(t, c, respMsgs, fmt.Sprintf(`["%s"]`, feed))
resp := callWSGetRaw(t, c, fmt.Sprintf(`{
"jsonrpc": "2.0",
"method": "subscribe",
"params": ["%s"],
"id": 1
}`, feed), respMsgs)
require.Nil(t, resp.Error)
require.NotNil(t, resp.Result)
require.NoError(t, json.Unmarshal(resp.Result, &s))
subIDs = append(subIDs, s) subIDs = append(subIDs, s)
} }
@ -109,23 +118,104 @@ func TestSubscriptions(t *testing.T) {
} }
for _, id := range subIDs { for _, id := range subIDs {
var b bool callUnsubscribe(t, c, respMsgs, id)
resp := callWSGetRaw(t, c, fmt.Sprintf(`{
"jsonrpc": "2.0",
"method": "unsubscribe",
"params": ["%s"],
"id": 1
}`, id), respMsgs)
require.Nil(t, resp.Error)
require.NotNil(t, resp.Result)
require.NoError(t, json.Unmarshal(resp.Result, &b))
require.Equal(t, true, b)
} }
finishedFlag.CAS(false, true) finishedFlag.CAS(false, true)
c.Close() c.Close()
} }
func TestFilteredSubscriptions(t *testing.T) {
var cases = map[string]struct {
params string
check func(*testing.T, *response.Notification)
}{
"tx matching": {
params: `["transaction_added", {"type":"InvocationTransaction"}]`,
check: func(t *testing.T, resp *response.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
require.Equal(t, response.TransactionEventID, resp.Event)
typ := rmap["type"].(string)
require.Equal(t, "InvocationTransaction", typ)
},
},
"notification matching": {
params: `["notification_from_execution", {"contract":"` + testContractHash + `"}]`,
check: func(t *testing.T, resp *response.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
require.Equal(t, response.NotificationEventID, resp.Event)
c := rmap["contract"].(string)
require.Equal(t, "0x"+testContractHash, c)
},
},
"execution matching": {
params: `["transaction_executed", {"state":"HALT"}]`,
check: func(t *testing.T, resp *response.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
require.Equal(t, response.ExecutionEventID, resp.Event)
execs := rmap["executions"].([]interface{})
exec0 := execs[0].(map[string]interface{})
st := exec0["vmstate"].(string)
require.Equal(t, "HALT", st)
},
},
"tx non-matching": {
params: `["transaction_added", {"type":"EnrollmentTransaction"}]`,
check: func(t *testing.T, _ *response.Notification) {
t.Fatal("unexpected match for EnrollmentTransaction")
},
},
"notification non-matching": {
params: `["notification_from_execution", {"contract":"00112233445566778899aabbccddeeff00112233"}]`,
check: func(t *testing.T, _ *response.Notification) {
t.Fatal("unexpected match for contract 00112233445566778899aabbccddeeff00112233")
},
},
"execution non-matching": {
params: `["transaction_executed", {"state":"FAULT"}]`,
check: func(t *testing.T, _ *response.Notification) {
t.Fatal("unexpected match for faulted execution")
},
},
}
for name, this := range cases {
t.Run(name, func(t *testing.T) {
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close()
defer rpcSrv.Shutdown()
// It's used as an end-of-event-stream, so it's always present.
blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`)
subID := callSubscribe(t, c, respMsgs, this.params)
var lastBlock uint32
for _, b := range getTestBlocks(t) {
require.NoError(t, chain.AddBlock(b))
lastBlock = b.Index
}
for {
resp := getNotification(t, respMsgs)
rmap := resp.Payload[0].(map[string]interface{})
if resp.Event == response.BlockEventID {
index := rmap["index"].(float64)
if uint32(index) == lastBlock {
break
}
continue
}
this.check(t, resp)
}
callUnsubscribe(t, c, respMsgs, subID)
callUnsubscribe(t, c, respMsgs, blockSubID)
finishedFlag.CAS(false, true)
c.Close()
})
}
}
func TestMaxSubscriptions(t *testing.T) { func TestMaxSubscriptions(t *testing.T) {
var subIDs = make([]string, 0) var subIDs = make([]string, 0)
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
@ -160,6 +250,13 @@ func TestBadSubUnsub(t *testing.T) {
"no params": `{"jsonrpc": "2.0", "method": "subscribe", "params": [], "id": 1}`, "no params": `{"jsonrpc": "2.0", "method": "subscribe", "params": [], "id": 1}`,
"bad (non-string) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": [1], "id": 1}`, "bad (non-string) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": [1], "id": 1}`,
"bad (wrong) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_removed"], "id": 1}`, "bad (wrong) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_removed"], "id": 1}`,
"missed event": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["event_missed"], "id": 1}`,
"block invalid filter": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_added", 1], "id": 1}`,
"tx filter 1": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_added", 1], "id": 1}`,
"tx filter 2": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_added", {"state": "HALT"}], "id": 1}`,
"notification filter": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["notification_from_execution", "contract"], "id": 1}`,
"execution filter 1": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_executed", "FAULT"], "id": 1}`,
"execution filter 2": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_executed", {"state": "STOP"}], "id": 1}`,
} }
var unsubCases = map[string]string{ var unsubCases = map[string]string{
"no params": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": [], "id": 1}`, "no params": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": [], "id": 1}`,