RCP server (#50)

* Adds basic RPC supporting files

* Adds interrupt handling and error chan

* Add getblock RPC method

* Update request structure

* Update names of nodes

* Allow bad addresses to be registered in discovery externally

* Small tidy up

* Few tweaks

* Check if error is close error in tcp transport

* Fix tests

* Fix priv port

* Small tweak to param name

* Comment fix

* Remove version from server

* Moves submitblock to TODO block

* Remove old field

* Bumps version and fix hex issues
This commit is contained in:
Steven Jack 2018-03-23 20:36:59 +00:00 committed by Anthony De Meulemeester
parent 52fa41a12a
commit 19a430b262
21 changed files with 734 additions and 55 deletions

36
Gopkg.lock generated
View file

@ -5,7 +5,7 @@
branch = "master" branch = "master"
name = "github.com/anthdm/neo-go" name = "github.com/anthdm/neo-go"
packages = ["pkg/util"] packages = ["pkg/util"]
revision = "da01cdae5c15dcf7d6a046b27d2710c95e2cc66a" revision = "9a605513fe8c5250c0ec71b30f9ecad49bd56c0a"
[[projects]] [[projects]]
branch = "master" branch = "master"
@ -19,6 +19,24 @@
revision = "346938d642f2ec3594ed81d874461961cd0faa76" revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0" version = "v1.1.0"
[[projects]]
name = "github.com/go-kit/kit"
packages = ["log"]
revision = "4dc7be5d2d12881735283bcab7352178e190fc71"
version = "v0.6.0"
[[projects]]
name = "github.com/go-logfmt/logfmt"
packages = ["."]
revision = "390ab7935ee28ec6b286364bba9b4dd6410cb3d5"
version = "v0.3.0"
[[projects]]
name = "github.com/go-stack/stack"
packages = ["."]
revision = "259ab82a6cad3992b4e21ff5cac294ccb06474bc"
version = "v1.7.0"
[[projects]] [[projects]]
name = "github.com/go-yaml/yaml" name = "github.com/go-yaml/yaml"
packages = ["."] packages = ["."]
@ -31,6 +49,12 @@
packages = ["."] packages = ["."]
revision = "553a641470496b2327abcac10b36396bd98e45c9" revision = "553a641470496b2327abcac10b36396bd98e45c9"
[[projects]]
branch = "master"
name = "github.com/kr/logfmt"
packages = ["."]
revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0"
[[projects]] [[projects]]
name = "github.com/pkg/errors" name = "github.com/pkg/errors"
packages = ["."] packages = ["."]
@ -72,7 +96,7 @@
"leveldb/table", "leveldb/table",
"leveldb/util" "leveldb/util"
] ]
revision = "211f780988068502fe874c44dae530528ebd840f" revision = "169b1b37be738edb2813dab48c97a549bcf99bb5"
[[projects]] [[projects]]
name = "github.com/urfave/cli" name = "github.com/urfave/cli"
@ -89,7 +113,7 @@
"scrypt", "scrypt",
"ssh/terminal" "ssh/terminal"
] ]
revision = "8c653846df49742c4c85ec37e5d9f8d3ba657895" revision = "374053ea96cb300f8671b8d3b07edeeb06e203b4"
[[projects]] [[projects]]
branch = "master" branch = "master"
@ -98,7 +122,7 @@
"unix", "unix",
"windows" "windows"
] ]
revision = "c28acc882ebcbfbe8ce9f0f14b9ac26ee138dd51" revision = "2f1e207ee39ff70f3433e49c6eb52677a515e3b5"
[[projects]] [[projects]]
name = "golang.org/x/text" name = "golang.org/x/text"
@ -121,11 +145,11 @@
"go/buildutil", "go/buildutil",
"go/loader" "go/loader"
] ]
revision = "73e16cff9e0d4a802937444bebb562458548241d" revision = "96caea41033df6f8c3974c845ab094f8ec3bd345"
[solve-meta] [solve-meta]
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
inputs-digest = "83630d732c34b1ddf24cd34025fd9fdd982a5e5075ec93a450e7edada658c6c9" inputs-digest = "d4338e14e8103a6626ecf662f3d0c08e972a39e667a6c76f31cc8938f59f2cba"
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View file

@ -1 +1 @@
0.32.0 0.33.0

View file

@ -2,11 +2,15 @@ package server
import ( import (
"fmt" "fmt"
"os"
"os/signal"
"github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/core/storage" "github.com/CityOfZion/neo-go/pkg/core/storage"
"github.com/CityOfZion/neo-go/pkg/network" "github.com/CityOfZion/neo-go/pkg/network"
"github.com/CityOfZion/neo-go/pkg/rpc"
"github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/util"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/urfave/cli" "github.com/urfave/cli"
) )
@ -43,6 +47,9 @@ func startServer(ctx *cli.Context) error {
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
} }
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt)
serverConfig := network.NewServerConfig(config) serverConfig := network.NewServerConfig(config)
chain, err := newBlockchain(net, config.ApplicationConfiguration.DataDirectoryPath) chain, err := newBlockchain(net, config.ApplicationConfiguration.DataDirectoryPath)
if err != nil { if err != nil {
@ -55,7 +62,34 @@ func startServer(ctx *cli.Context) error {
} }
fmt.Println(logo()) fmt.Println(logo())
network.NewServer(serverConfig, chain).Start() server := network.NewServer(serverConfig, chain)
rpcServer := rpc.NewServer(chain, config.ApplicationConfiguration.RPCPort, server)
errChan := make(chan error)
go server.Start(errChan)
go rpcServer.Start(errChan)
var shutdownErr error
Main:
for {
select {
case err := <-errChan:
shutdownErr = errors.Wrap(err, "Error encountered by server")
interruptChan <- os.Kill
case <-interruptChan:
server.Shutdown()
if serverErr := rpcServer.Shutdown(); serverErr != nil {
shutdownErr = errors.Wrap(serverErr, "Error encountered whilst shutting down server")
}
break Main
}
}
if shutdownErr != nil {
return cli.NewExitError(shutdownErr, 1)
}
return nil return nil
} }

View file

@ -16,10 +16,10 @@ type Block struct {
BlockBase BlockBase
// Transaction list. // Transaction list.
Transactions []*transaction.Transaction Transactions []*transaction.Transaction `json:"tx"`
// True if this block is created from trimmed data. // True if this block is created from trimmed data.
Trimmed bool Trimmed bool `json:"-"`
} }
// Header returns the Header of the Block. // Header returns the Header of the Block.

View file

@ -14,33 +14,33 @@ import (
// BlockBase holds the base info of a block // BlockBase holds the base info of a block
type BlockBase struct { type BlockBase struct {
// Version of the block. // Version of the block.
Version uint32 Version uint32 `json:"version"`
// hash of the previous block. // hash of the previous block.
PrevHash util.Uint256 PrevHash util.Uint256 `json:"previousblockhash"`
// Root hash of a transaction list. // Root hash of a transaction list.
MerkleRoot util.Uint256 MerkleRoot util.Uint256 `json:"merkleroot"`
// The time stamp of each block must be later than previous block's time stamp. // The time stamp of each block must be later than previous block's time stamp.
// Generally the difference of two block's time stamp is about 15 seconds and imprecision is allowed. // Generally the difference of two block's time stamp is about 15 seconds and imprecision is allowed.
// The height of the block must be exactly equal to the height of the previous block plus 1. // The height of the block must be exactly equal to the height of the previous block plus 1.
Timestamp uint32 Timestamp uint32 `json:"time"`
// index/height of the block // index/height of the block
Index uint32 Index uint32 `json:"height"`
// Random number also called nonce // Random number also called nonce
ConsensusData uint64 ConsensusData uint64 `json:"nonce"`
// Contract addresss of the next miner // Contract addresss of the next miner
NextConsensus util.Uint160 NextConsensus util.Uint160 `json:"nextminer"`
// Padding that is fixed to 1 // Padding that is fixed to 1
_ uint8 _ uint8
// Script used to validate the block // Script used to validate the block
Script *transaction.Witness Script *transaction.Witness `json:"script"`
// hash of this block, created when binary encoded. // hash of this block, created when binary encoded.
hash util.Uint256 hash util.Uint256

View file

@ -9,6 +9,7 @@ type Blockchainer interface {
AddBlock(*Block) error AddBlock(*Block) error
BlockHeight() uint32 BlockHeight() uint32
HeaderHeight() uint32 HeaderHeight() uint32
GetBlock(hash util.Uint256) (*Block, error)
GetHeaderHash(int) util.Uint256 GetHeaderHash(int) util.Uint256
CurrentHeaderHash() util.Uint256 CurrentHeaderHash() util.Uint256
CurrentBlockHash() util.Uint256 CurrentBlockHash() util.Uint256

View file

@ -13,35 +13,35 @@ import (
// Transaction is a process recorded in the NEO blockchain. // Transaction is a process recorded in the NEO blockchain.
type Transaction struct { type Transaction struct {
// The type of the transaction. // The type of the transaction.
Type TXType Type TXType `json:"type"`
// The trading version which is currently 0. // The trading version which is currently 0.
Version uint8 Version uint8 `json:"-"`
// Data specific to the type of the transaction. // Data specific to the type of the transaction.
// This is always a pointer to a <Type>Transaction. // This is always a pointer to a <Type>Transaction.
Data TXer Data TXer `json:"-"`
// Transaction attributes. // Transaction attributes.
Attributes []*Attribute Attributes []*Attribute `json:"attributes"`
// The inputs of the transaction. // The inputs of the transaction.
Inputs []*Input Inputs []*Input `json:"vin"`
// The outputs of the transaction. // The outputs of the transaction.
Outputs []*Output Outputs []*Output `json:"vout"`
// The scripts that comes with this transaction. // The scripts that comes with this transaction.
// Scripts exist out of the verification script // Scripts exist out of the verification script
// and invocation script. // and invocation script.
Scripts []*Witness Scripts []*Witness `json:"scripts"`
// hash of the transaction // hash of the transaction
hash util.Uint256 hash util.Uint256
// Trimmed indicates this is a transaction from trimmed // Trimmed indicates this is a transaction from trimmed
// data. // data.
Trimmed bool Trimmed bool `json:"-"`
} }
// NewTrimmedTX returns a trimmed transaction with only its hash // NewTrimmedTX returns a trimmed transaction with only its hash

View file

@ -9,8 +9,8 @@ import (
// Witness contains 2 scripts. // Witness contains 2 scripts.
type Witness struct { type Witness struct {
InvocationScript []byte InvocationScript []byte `json:"stack"`
VerificationScript []byte VerificationScript []byte `json:"redeem"`
} }
// DecodeBinary implements the payload interface. // DecodeBinary implements the payload interface.

View file

@ -14,16 +14,22 @@ type Discoverer interface {
BackFill(...string) BackFill(...string)
PoolCount() int PoolCount() int
RequestRemote(int) RequestRemote(int)
RegisterBadAddr(string)
UnconnectedPeers() []string
BadPeers() []string
} }
// DefaultDiscovery // DefaultDiscovery default implementation of the Discoverer interface.
type DefaultDiscovery struct { type DefaultDiscovery struct {
transport Transporter transport Transporter
dialTimeout time.Duration dialTimeout time.Duration
addrs map[string]bool addrs map[string]bool
badAddrs map[string]bool badAddrs map[string]bool
unconnectedAddrs map[string]bool
requestCh chan int requestCh chan int
connectedCh chan string
backFill chan string backFill chan string
badAddrCh chan string
pool chan string pool chan string
} }
@ -34,8 +40,11 @@ func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery {
dialTimeout: dt, dialTimeout: dt,
addrs: make(map[string]bool), addrs: make(map[string]bool),
badAddrs: make(map[string]bool), badAddrs: make(map[string]bool),
unconnectedAddrs: make(map[string]bool),
requestCh: make(chan int), requestCh: make(chan int),
connectedCh: make(chan string),
backFill: make(chan string), backFill: make(chan string),
badAddrCh: make(chan string),
pool: make(chan string, maxPoolSize), pool: make(chan string, maxPoolSize),
} }
go d.run() go d.run()
@ -58,16 +67,42 @@ func (d *DefaultDiscovery) PoolCount() int {
return len(d.pool) return len(d.pool)
} }
// Request will try to establish a connection with n nodes. // RequestRemote will try to establish a connection with n nodes.
func (d *DefaultDiscovery) RequestRemote(n int) { func (d *DefaultDiscovery) RequestRemote(n int) {
d.requestCh <- n d.requestCh <- n
} }
func (d *DefaultDiscovery) work(addrCh, badAddrCh chan string) { // RegisterBadAddr registers the given address as a bad address.
func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
d.badAddrCh <- addr
d.RequestRemote(1)
}
// UnconnectedPeers returns all addresses of unconnected addrs.
func (d *DefaultDiscovery) UnconnectedPeers() []string {
var addrs []string
for addr := range d.unconnectedAddrs {
addrs = append(addrs, addr)
}
return addrs
}
// BadPeers returns all addresses of bad addrs.
func (d *DefaultDiscovery) BadPeers() []string {
var addrs []string
for addr := range d.badAddrs {
addrs = append(addrs, addr)
}
return addrs
}
func (d *DefaultDiscovery) work(addrCh chan string) {
for { for {
addr := <-addrCh addr := <-addrCh
if err := d.transport.Dial(addr, d.dialTimeout); err != nil { if err := d.transport.Dial(addr, d.dialTimeout); err != nil {
badAddrCh <- addr d.badAddrCh <- addr
} else {
d.connectedCh <- addr
} }
} }
} }
@ -79,12 +114,11 @@ func (d *DefaultDiscovery) next() string {
func (d *DefaultDiscovery) run() { func (d *DefaultDiscovery) run() {
var ( var (
maxWorkers = 5 maxWorkers = 5
badAddrCh = make(chan string)
workCh = make(chan string) workCh = make(chan string)
) )
for i := 0; i < maxWorkers; i++ { for i := 0; i < maxWorkers; i++ {
go d.work(workCh, badAddrCh) go d.work(workCh)
} }
for { for {
@ -95,6 +129,7 @@ func (d *DefaultDiscovery) run() {
} }
if _, ok := d.addrs[addr]; !ok { if _, ok := d.addrs[addr]; !ok {
d.addrs[addr] = true d.addrs[addr] = true
d.unconnectedAddrs[addr] = true
d.pool <- addr d.pool <- addr
} }
case n := <-d.requestCh: case n := <-d.requestCh:
@ -103,11 +138,15 @@ func (d *DefaultDiscovery) run() {
workCh <- d.next() workCh <- d.next()
} }
}() }()
case addr := <-badAddrCh: case addr := <-d.badAddrCh:
d.badAddrs[addr] = true d.badAddrs[addr] = true
delete(d.unconnectedAddrs, addr)
go func() { go func() {
workCh <- d.next() workCh <- d.next()
}() }()
case addr := <-d.connectedCh:
delete(d.unconnectedAddrs, addr)
} }
} }
} }

View file

@ -23,6 +23,9 @@ func (chain testChain) BlockHeight() uint32 {
func (chain testChain) HeaderHeight() uint32 { func (chain testChain) HeaderHeight() uint32 {
return 0 return 0
} }
func (chain testChain) GetBlock(hash util.Uint256) (*core.Block, error) {
return nil, nil
}
func (chain testChain) GetHeaderHash(int) util.Uint256 { func (chain testChain) GetHeaderHash(int) util.Uint256 {
return util.Uint256{} return util.Uint256{}
} }
@ -43,7 +46,10 @@ type testDiscovery struct{}
func (d testDiscovery) BackFill(addrs ...string) {} func (d testDiscovery) BackFill(addrs ...string) {}
func (d testDiscovery) PoolCount() int { return 0 } func (d testDiscovery) PoolCount() int { return 0 }
func (d testDiscovery) RegisterBadAddr(string) {}
func (d testDiscovery) UnconnectedPeers() []string { return []string{} }
func (d testDiscovery) RequestRemote(n int) {} func (d testDiscovery) RequestRemote(n int) {}
func (d testDiscovery) BadPeers() []string { return []string{} }
type localTransport struct{} type localTransport struct{}

View file

@ -34,7 +34,7 @@ type (
// ServerConfig holds the Server configuration. // ServerConfig holds the Server configuration.
ServerConfig ServerConfig
// id also known as the nonce of te server. // id also known as the nonce of the server.
id uint32 id uint32
transport Transporter transport Transporter
@ -84,8 +84,13 @@ func NewServer(config ServerConfig, chain *core.Blockchain) *Server {
return s return s
} }
// ID returns the servers ID.
func (s *Server) ID() uint32 {
return s.id
}
// Start will start the server and its underlying transport. // Start will start the server and its underlying transport.
func (s *Server) Start() { func (s *Server) Start(errChan chan error) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"blockHeight": s.chain.BlockHeight(), "blockHeight": s.chain.BlockHeight(),
"headerHeight": s.chain.HeaderHeight(), "headerHeight": s.chain.HeaderHeight(),
@ -96,6 +101,22 @@ func (s *Server) Start() {
s.run() s.run()
} }
// Shutdown disconnects all peers and stops listening.
func (s *Server) Shutdown() {
log.WithFields(log.Fields{
"peers": s.PeerCount(),
}).Info("shutting down server")
close(s.quit)
}
func (s *Server) UnconnectedPeers() []string {
return s.discovery.UnconnectedPeers()
}
func (s *Server) BadPeers() []string {
return s.discovery.BadPeers()
}
func (s *Server) run() { func (s *Server) run() {
// Ask discovery to connect with remote nodes to fill up // Ask discovery to connect with remote nodes to fill up
// the server minimum peer slots. // the server minimum peer slots.
@ -130,7 +151,7 @@ func (s *Server) run() {
"endpoint": p.Endpoint(), "endpoint": p.Endpoint(),
}).Info("new peer connected") }).Info("new peer connected")
case drop := <-s.unregister: case drop := <-s.unregister:
s.discovery.RequestRemote(1) s.discovery.RegisterBadAddr(drop.peer.Endpoint().String())
delete(s.peers, drop.peer) delete(s.peers, drop.peer)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"endpoint": drop.peer.Endpoint(), "endpoint": drop.peer.Endpoint(),
@ -141,6 +162,12 @@ func (s *Server) run() {
} }
} }
// Peers returns the current list of peers connected to
// the server.
func (s *Server) Peers() map[Peer]bool {
return s.peers
}
// PeerCount returns the number of current connected peers. // PeerCount returns the number of current connected peers.
func (s *Server) PeerCount() int { func (s *Server) PeerCount() int {
s.lock.RLock() s.lock.RLock()

View file

@ -2,6 +2,7 @@ package network
import ( import (
"net" "net"
"regexp"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -54,12 +55,31 @@ func (t *TCPTransport) Accept() {
conn, err := l.Accept() conn, err := l.Accept()
if err != nil { if err != nil {
log.Warnf("TCP accept error: %s", err) log.Warnf("TCP accept error: %s", err)
if t.isCloseError(err) {
break
}
continue continue
} }
go t.handleConn(conn) go t.handleConn(conn)
} }
} }
func (t *TCPTransport) isCloseError(err error) bool {
regex, err := regexp.Compile(".* use of closed network connection")
if err != nil {
return false
}
if opErr, ok := err.(*net.OpError); ok {
if regex.Match([]byte(opErr.Error())) {
return true
}
}
return false
}
func (t *TCPTransport) handleConn(conn net.Conn) { func (t *TCPTransport) handleConn(conn net.Conn) {
p := NewTCPPeer(conn, t.proto) p := NewTCPPeer(conn, t.proto)
t.server.register <- p t.server.register <- p

64
pkg/rpc/errors.go Normal file
View file

@ -0,0 +1,64 @@
package rpc
import (
"fmt"
"net/http"
)
type (
// Error object for outputting JSON-RPC 2.0
// errors.
Error struct {
Code int64 `json:"code"`
HTTPCode int `json:"-"`
Cause error `json:"-"`
Message string `json:"message"`
Data string `json:"data,omitempty"`
}
)
func newError(code int64, httpCode int, message string, data string, cause error) *Error {
return &Error{
Code: code,
HTTPCode: httpCode,
Cause: cause,
Message: message,
Data: data,
}
}
// NewParseError creates a new error with code
// -32700.:%s
func NewParseError(data string, cause error) *Error {
return newError(-32700, http.StatusBadRequest, "Parse Error", data, cause)
}
// NewInvalidRequestError creates a new error with
// code -32600.
func NewInvalidRequestError(data string, cause error) *Error {
return newError(-32600, http.StatusUnprocessableEntity, "Invalid Request", data, cause)
}
// NewMethodNotFoundError creates a new error with
// code -32601.
func NewMethodNotFoundError(data string, cause error) *Error {
return newError(-32601, http.StatusMethodNotAllowed, "Method not found", data, cause)
}
// NewInvalidParamsError creates a new error with
// code -32602.
func NewInvalidParamsError(data string, cause error) *Error {
return newError(-32602, http.StatusUnprocessableEntity, "Invalid Params", data, cause)
}
// NewInternalServerError creates a new error with
// code -32603.
func NewInternalServerError(data string, cause error) *Error {
return newError(-32603, http.StatusInternalServerError, "Internal error", data, cause)
}
// Error implements the error interface.
func (e Error) Error() string {
return fmt.Sprintf("%s (%d) - %s - %s", e.Message, e.Code, e.Data, e.Cause)
}

21
pkg/rpc/param.go Normal file
View file

@ -0,0 +1,21 @@
package rpc
import (
"fmt"
)
type (
// Param represent a param either passed to
// the server or to send to a server using
// the client.
Param struct {
StringVal string
IntVal int
Type string
RawValue interface{}
}
)
func (p Param) String() string {
return fmt.Sprintf("%v", p.RawValue)
}

62
pkg/rpc/params.go Normal file
View file

@ -0,0 +1,62 @@
package rpc
import (
"encoding/json"
)
type (
// Params represent the JSON-RPC params.
Params []Param
)
// UnmarshalJSON implements the Unmarshaller
// interface.
func (p *Params) UnmarshalJSON(data []byte) error {
var params []interface{}
err := json.Unmarshal(data, &params)
if err != nil {
return err
}
for i := 0; i < len(params); i++ {
param := Param{
RawValue: params[i],
}
switch val := params[i].(type) {
case string:
param.StringVal = val
param.Type = "string"
case float64:
newVal, _ := params[i].(float64)
param.IntVal = int(newVal)
param.Type = "number"
}
*p = append(*p, param)
}
return nil
}
// ValueAt returns the param struct for the given
// index if it exists.
func (p Params) ValueAt(index int) (*Param, bool) {
if len(p) > index {
return &p[index], true
}
return nil, false
}
// ValueAtAndType returns the param struct at the given index if it
// exists and matches the given type.
func (p Params) ValueAtAndType(index int, valueType string) (*Param, bool) {
if len(p) > index && valueType == p[index].Type {
return &p[index], true
}
return nil, false
}

119
pkg/rpc/request.go Normal file
View file

@ -0,0 +1,119 @@
package rpc
import (
"encoding/json"
"fmt"
"io"
"net/http"
log "github.com/sirupsen/logrus"
)
const (
jsonRPCVersion = "2.0"
)
type (
// Request represents a standard JSON-RPC 2.0
// request: http://www.jsonrpc.org/specification#request_object.
Request struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
RawParams json.RawMessage `json:"params,omitempty"`
RawID json.RawMessage `json:"id,omitempty"`
}
// Response represents a standard JSON-RPC 2.0
// response: http://www.jsonrpc.org/specification#response_object.
Response struct {
JSONRPC string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"`
Error *Error `json:"error,omitempty"`
ID json.RawMessage `json:"id,omitempty"`
}
)
// NewRequest creates a new Request struct.
func NewRequest() *Request {
return &Request{
JSONRPC: jsonRPCVersion,
}
}
// DecodeData decodes the given reader into the the request
// struct.
func (r *Request) DecodeData(data io.ReadCloser) error {
defer data.Close()
err := json.NewDecoder(data).Decode(r)
if err != nil {
return fmt.Errorf("Error parsing JSON payload: %s", err)
}
if r.JSONRPC != jsonRPCVersion {
return fmt.Errorf("Invalid version, expected 2.0 got: '%s'", r.JSONRPC)
}
return nil
}
// Params takes a slice of any type and attempts to bind
// the params to it.
func (r *Request) Params() (*Params, error) {
params := Params{}
err := json.Unmarshal(r.RawParams, &params)
if err != nil {
return nil, fmt.Errorf("Error parsing params field in payload: %s", err)
}
return &params, nil
}
// WriteErrorResponse writes an error response to the ResponseWriter.
func (r Request) WriteErrorResponse(w http.ResponseWriter, err error) {
jsonErr, ok := err.(*Error)
if !ok {
jsonErr = NewInternalServerError("Internal server error", err)
}
response := Response{
JSONRPC: r.JSONRPC,
Error: jsonErr,
ID: r.RawID,
}
logFields := log.Fields{
"err": jsonErr.Cause,
"method": r.Method,
}
params, err := r.Params()
if err == nil {
logFields["params"] = *params
}
log.WithFields(logFields).Error("Error encountered with rpc request")
w.WriteHeader(jsonErr.HTTPCode)
r.writeServerResponse(w, response)
}
// WriteResponse encodes the response and writes it to the ResponseWriter.
func (r Request) WriteResponse(w http.ResponseWriter, result interface{}) {
response := Response{
JSONRPC: r.JSONRPC,
Result: result,
ID: r.RawID,
}
r.writeServerResponse(w, response)
}
func (r Request) writeServerResponse(w http.ResponseWriter, response Response) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
encoder := json.NewEncoder(w)
err := encoder.Encode(response)
if err != nil {
fmt.Println(err)
}
}

58
pkg/rpc/result/peers.go Normal file
View file

@ -0,0 +1,58 @@
package result
import (
"strings"
)
type (
// Peers payload for outputting peers in `getpeers` RPC call.
Peers struct {
Unconnected []Peer `json:"unconnected"`
Connected []Peer `json:"connected"`
Bad []Peer `json:"bad"`
}
// Peer represents the peer.
Peer struct {
Address string `json:"address"`
Port string `json:"port"`
}
)
// NewPeers creates a new Peers struct.
func NewPeers() Peers {
return Peers{
Unconnected: []Peer{},
Connected: []Peer{},
Bad: []Peer{},
}
}
// AddPeer adds a peer to the given peer type slice.
func (p *Peers) AddPeer(peerType string, addr string) {
addressParts := strings.Split(addr, ":")
peer := Peer{
Address: addressParts[0],
Port: addressParts[1],
}
switch peerType {
case "unconnected":
p.Unconnected = append(
p.Unconnected,
peer,
)
case "connected":
p.Connected = append(
p.Connected,
peer,
)
case "bad":
p.Bad = append(
p.Bad,
peer,
)
}
}

11
pkg/rpc/result/version.go Normal file
View file

@ -0,0 +1,11 @@
package result
type (
// Version model used for reporting server version
// info.
Version struct {
Port uint16 `json:"port"`
Nonce uint32 `json:"nonce"`
UserAgent string `json:"useragent"`
}
)

181
pkg/rpc/server.go Normal file
View file

@ -0,0 +1,181 @@
package rpc
import (
"context"
"errors"
"fmt"
"net/http"
"github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/network"
"github.com/CityOfZion/neo-go/pkg/rpc/result"
"github.com/CityOfZion/neo-go/pkg/util"
log "github.com/sirupsen/logrus"
)
type (
// Server represents the JSON-RPC 2.0 server.
Server struct {
*http.Server
chain core.Blockchainer
coreServer *network.Server
}
)
// NewServer creates a new Server struct.
func NewServer(chain core.Blockchainer, port uint16, coreServer *network.Server) Server {
return Server{
Server: &http.Server{
Addr: fmt.Sprintf(":%d", port),
},
chain: chain,
coreServer: coreServer,
}
}
// Start creates a new JSON-RPC server
// listening on the configured port.
func (s *Server) Start(errChan chan error) {
s.Handler = http.HandlerFunc(s.requestHandler)
log.WithFields(log.Fields{
"endpoint": s.Addr,
}).Info("starting rpc-server")
errChan <- s.ListenAndServe()
}
// Shutdown overrride the http.Server Shutdown
// method.
func (s *Server) Shutdown() error {
log.WithFields(log.Fields{
"endpoint": s.Addr,
}).Info("shutting down rpc-server")
return s.Server.Shutdown(context.Background())
}
func (s *Server) requestHandler(w http.ResponseWriter, httpRequest *http.Request) {
req := NewRequest()
if httpRequest.Method != "POST" {
req.WriteErrorResponse(
w,
NewInvalidParamsError(
fmt.Sprintf("Invalid method '%s', please retry with 'POST'", httpRequest.Method), nil,
),
)
return
}
err := req.DecodeData(httpRequest.Body)
if err != nil {
req.WriteErrorResponse(w, NewParseError("Problem parsing JSON-RPC request body", err))
return
}
reqParams, err := req.Params()
if err != nil {
req.WriteErrorResponse(w, NewInvalidParamsError("Problem parsing request parameters", err))
return
}
s.methodHandler(w, req, *reqParams)
}
func (s *Server) methodHandler(w http.ResponseWriter, req *Request, reqParams Params) {
log.WithFields(log.Fields{
"method": req.Method,
"params": fmt.Sprintf("%v", reqParams),
}).Info("processing rpc request")
var results interface{}
var resultsErr *Error
switch req.Method {
case "getbestblockhash":
results = s.chain.CurrentBlockHash().String()
case "getblock":
var hash util.Uint256
var err error
param, exists := reqParams.ValueAt(0)
if !exists {
err = errors.New("Param at index at 0 doesn't exist")
resultsErr = NewInvalidParamsError(err.Error(), err)
break
}
switch param.Type {
case "string":
hash, err = util.Uint256DecodeString(param.StringVal)
if err != nil {
resultsErr = NewInvalidParamsError("Problem decoding block hash", err)
break
}
case "number":
hash = s.chain.GetHeaderHash(param.IntVal)
case "default":
err = errors.New("Expected param at index 0 to be either string or number")
resultsErr = NewInvalidParamsError(err.Error(), err)
break
}
results, err = s.chain.GetBlock(hash)
if err != nil {
resultsErr = NewInternalServerError(fmt.Sprintf("Problem locating block with hash: %s", hash), err)
break
}
case "getblockcount":
results = s.chain.BlockHeight()
case "getblockhash":
if param, exists := reqParams.ValueAtAndType(0, "number"); exists {
results = s.chain.GetHeaderHash(param.IntVal)
} else {
err := errors.New("Unable to parse parameter in position 0, expected a number")
resultsErr = NewInvalidParamsError(err.Error(), err)
break
}
case "getconnectioncount":
results = s.coreServer.PeerCount()
case "getversion":
results = result.Version{
Port: s.coreServer.ListenTCP,
Nonce: s.coreServer.ID(),
UserAgent: s.coreServer.UserAgent,
}
case "getpeers":
peers := result.NewPeers()
for _, addr := range s.coreServer.UnconnectedPeers() {
peers.AddPeer("unconnected", addr)
}
for _, addr := range s.coreServer.BadPeers() {
peers.AddPeer("bad", addr)
}
for addr := range s.coreServer.Peers() {
peers.AddPeer("connected", addr.Endpoint().String())
}
results = peers
case "validateaddress", "getblocksysfee", "getcontractstate", "getrawmempool", "getrawtransaction", "getstorage", "submitblock", "gettxout", "invoke", "invokefunction", "invokescript", "sendrawtransaction", "getaccountstate", "getassetstate":
results = "TODO"
default:
resultsErr = NewMethodNotFoundError(fmt.Sprintf("Method '%s' not supported", req.Method), nil)
}
if resultsErr != nil {
req.WriteErrorResponse(w, resultsErr)
}
if results != nil {
req.WriteResponse(w, results)
}
}

View file

@ -2,6 +2,7 @@ package util
import ( import (
"encoding/hex" "encoding/hex"
"encoding/json"
"fmt" "fmt"
) )
@ -56,3 +57,8 @@ func (u Uint160) Equals(other Uint160) bool {
} }
return true return true
} }
// MarshalJSON implements the json marshaller interface.
func (u Uint160) MarshalJSON() ([]byte, error) {
return json.Marshal(u.String())
}

View file

@ -2,6 +2,7 @@ package util
import ( import (
"encoding/hex" "encoding/hex"
"encoding/json"
"fmt" "fmt"
) )
@ -57,3 +58,8 @@ func (u Uint256) Equals(other Uint256) bool {
func (u Uint256) String() string { func (u Uint256) String() string {
return hex.EncodeToString(ArrayReverse(u.Bytes())) return hex.EncodeToString(ArrayReverse(u.Bytes()))
} }
// MarshalJSON implements the json marshaller interface.
func (u Uint256) MarshalJSON() ([]byte, error) {
return json.Marshal(u.String())
}