[Server] Implements Orchestration server (#252)

* [pubsub]

- remove pubsub package

* [chain]

- Add height to chain

* [peer]

- remove unnecesary println

* [server]

- Implement server package

* Add main.go to run node
This commit is contained in:
decentralisedkev 2019-03-28 22:49:34 +00:00 committed by GitHub
parent cb21c66316
commit 1a6bdd4099
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 366 additions and 51 deletions

20
main.go Normal file
View file

@ -0,0 +1,20 @@
package main
import (
"fmt"
"github.com/CityOfZion/neo-go/pkg/server"
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
)
func main() {
s, err := server.New(protocol.MainNet, 10332)
if err != nil {
fmt.Println(err)
return
}
err = s.Run()
if err != nil {
fmt.Println("Server has stopped from the following error: ", err.Error())
}
}

View file

@ -24,6 +24,7 @@ var (
// Chain represents a blockchain instance
type Chain struct {
Db *Chaindb
height uint32
}
// New returns a new chain instance
@ -128,3 +129,9 @@ func (c *Chain) ProcessHeaders(hdrs []*payload.BlockBase) error {
func (c *Chain) verifyHeaders(hdrs []*payload.BlockBase) error {
return nil
}
// CurrentHeight returns the index of the block
// at the tip of the chain
func (c Chain) CurrentHeight() uint32 {
return c.height
}

View file

@ -176,7 +176,6 @@ func (p *Peer) IsVerackReceived() bool {
//NotifyDisconnect returns once the peer has disconnected
// Blocking
func (p *Peer) NotifyDisconnect() {
fmt.Println("Peer has not disconnected yet")
<-p.quitch
fmt.Println("Peer has just disconnected")
}

View file

@ -1,20 +0,0 @@
package pubsub
// EventType is an enum
// representing the types of messages we can subscribe to
type EventType int
const (
// NewBlock is called When blockchain connects a new block, it will emit an NewBlock Event
NewBlock EventType = iota
// BadBlock is called When blockchain declines a block, it will emit a new block event
BadBlock
// BadHeader is called When blockchain rejects a Header, it will emit this event
BadHeader
)
// Event represents a new Event that a subscriber can listen to
type Event struct {
Type EventType // E.g. event.NewBlock
data []byte // Raw information
}

View file

@ -1,21 +0,0 @@
package pubsub
// Publisher sends events to subscribers
type Publisher struct {
subs []Subscriber
}
// Send iterates over each subscriber and checks
// if they are interested in the Event
// By looking at their topics, if they are then
// the event is emitted to them
func (p *Publisher) Send(e Event) error {
for _, sub := range p.subs {
for _, topic := range sub.Topics() {
if e.Type == topic {
sub.Emit(e)
}
}
}
return nil
}

View file

@ -1,7 +0,0 @@
package pubsub
// Subscriber will listen for Events from publishers
type Subscriber interface {
Topics() []EventType
Emit(Event)
}

7
pkg/server/addrmgr.go Normal file
View file

@ -0,0 +1,7 @@
package server
// etAddress will return a viable address to connect to
// Currently it is hardcoded to be one neo node until address manager is implemented
func (s *Server) getAddress() (string, error) {
return "seed1.ngd.network:10333", nil
}

15
pkg/server/chain.go Normal file
View file

@ -0,0 +1,15 @@
package server
import (
"github.com/CityOfZion/neo-go/pkg/chain"
"github.com/CityOfZion/neo-go/pkg/database"
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
)
func setupChain(db database.Database, net protocol.Magic) (*chain.Chain, error) {
chain, err := chain.New(db, net)
if err != nil {
return nil, err
}
return chain, nil
}

62
pkg/server/connmgr.go Normal file
View file

@ -0,0 +1,62 @@
package server
import (
"encoding/hex"
"fmt"
"net"
"strconv"
"github.com/CityOfZion/neo-go/pkg/connmgr"
"github.com/CityOfZion/neo-go/pkg/peer"
"github.com/CityOfZion/neo-go/pkg/wire/util"
iputils "github.com/CityOfZion/neo-go/pkg/wire/util/ip"
)
func setupConnManager(s *Server, port uint16) *connmgr.Connmgr {
cfg := connmgr.Config{
GetAddress: s.getAddress,
OnAccept: s.onAccept,
OnConnection: s.onConnection,
AddressPort: iputils.GetLocalIP().String() + ":" + strconv.FormatUint(uint64(port), 10),
}
return connmgr.New(cfg)
}
func (s *Server) onConnection(conn net.Conn, addr string) {
fmt.Println("We have connected successfully to: ", addr)
p := peer.NewPeer(conn, false, *s.peerCfg)
err := p.Run()
if err != nil {
fmt.Println("Error running peer" + err.Error())
return
}
s.pmg.AddPeer(p)
byt, err := hex.DecodeString("d42561e3d30e15be6400b6df2f328e02d2bf6354c41dce433bc57687c82144bf")
if err != nil {
fmt.Println("Error getting hash " + err.Error())
}
lh, err := util.Uint256DecodeBytes(byt)
if err != nil {
fmt.Println("Error getting hash " + err.Error())
}
err = p.RequestHeaders(lh.Reverse())
if err != nil {
fmt.Println(err)
}
}
func (s *Server) onAccept(conn net.Conn) {
fmt.Println("A peer with address: ", conn.RemoteAddr().String(), "has connect to us")
p := peer.NewPeer(conn, true, *s.peerCfg)
err := p.Run()
if err != nil {
fmt.Println("Error running peer" + err.Error())
return
}
s.pmg.AddPeer(p)
}

14
pkg/server/database.go Normal file
View file

@ -0,0 +1,14 @@
package server
import (
"github.com/CityOfZion/neo-go/pkg/database"
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
)
func setupDatabase(net protocol.Magic) (database.Database, error) {
db, err := database.New(net.String())
if err != nil {
return nil, err
}
return db, nil
}

23
pkg/server/peerconfig.go Normal file
View file

@ -0,0 +1,23 @@
package server
import (
"math/rand"
"github.com/CityOfZion/neo-go/pkg/peer"
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
)
func setupPeerConfig(s *Server, port uint16, net protocol.Magic) *peer.LocalConfig {
return &peer.LocalConfig{
Net: net,
UserAgent: "NEO-GO",
Services: protocol.NodePeerService,
Nonce: rand.Uint32(),
ProtocolVer: 0,
Relay: false,
Port: port,
StartHeight: s.chain.CurrentHeight,
OnHeader: s.onHeader,
OnBlock: s.onBlock,
}
}

9
pkg/server/peermgr.go Normal file
View file

@ -0,0 +1,9 @@
package server
import (
"github.com/CityOfZion/neo-go/pkg/peermgr"
)
func setupPeerManager() *peermgr.PeerMgr {
return peermgr.New()
}

113
pkg/server/server.go Normal file
View file

@ -0,0 +1,113 @@
package server
import (
"fmt"
"github.com/CityOfZion/neo-go/pkg/peermgr"
"github.com/CityOfZion/neo-go/pkg/chain"
"github.com/CityOfZion/neo-go/pkg/connmgr"
"github.com/CityOfZion/neo-go/pkg/peer"
"github.com/CityOfZion/neo-go/pkg/syncmgr"
"github.com/CityOfZion/neo-go/pkg/database"
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
)
// Server orchestrates all of the modules
type Server struct {
net protocol.Magic
stopCh chan error
// Modules
db database.Database
smg *syncmgr.Syncmgr
cmg *connmgr.Connmgr
pmg *peermgr.PeerMgr
chain *chain.Chain
peerCfg *peer.LocalConfig
}
//New creates a new server object for a particular network and sets up each module
func New(net protocol.Magic, port uint16) (*Server, error) {
s := &Server{
net: net,
stopCh: make(chan error, 0),
}
// Setup database
db, err := setupDatabase(net)
if err != nil {
return nil, err
}
s.db = db
// setup peermgr
peermgr := setupPeerManager()
s.pmg = peermgr
// Setup chain
chain, err := setupChain(db, net)
if err != nil {
return nil, err
}
s.chain = chain
// Setup sync manager
syncmgr := setupSyncManager(s)
s.smg = syncmgr
// Setup connection manager
connmgr := setupConnManager(s, port)
s.cmg = connmgr
// Setup peer config
peerCfg := setupPeerConfig(s, port, net)
s.peerCfg = peerCfg
return s, nil
}
// Run starts the daemon by connecting to previously nodes or connectng to seed nodes.
// This should be called once all modules have been setup
func (s *Server) Run() error {
fmt.Println("Server is starting up")
// start the connmgr
err := s.cmg.Run()
if err != nil {
return err
}
// Attempt to connect to a peer
err = s.cmg.NewRequest()
if err != nil {
return err
}
// Request header to start synchronisation
bestHeader, err := s.chain.Db.GetLastHeader()
if err != nil {
return err
}
err = s.pmg.RequestHeaders(bestHeader.Hash.Reverse())
if err != nil {
return err
}
fmt.Println("Server Successfully started")
return s.wait()
}
func (s *Server) wait() error {
err := <-s.stopCh
return err
}
// Stop stops the server
func (s *Server) Stop(err error) error {
fmt.Println("Server is shutting down")
s.stopCh <- err
return nil
}

94
pkg/server/syncmgr.go Normal file
View file

@ -0,0 +1,94 @@
package server
import (
"encoding/binary"
"github.com/CityOfZion/neo-go/pkg/peer"
"github.com/CityOfZion/neo-go/pkg/syncmgr"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
func setupSyncManager(s *Server) *syncmgr.Syncmgr {
cfg := &syncmgr.Config{
ProcessBlock: s.processBlock,
ProcessHeaders: s.processHeaders,
RequestBlock: s.requestBlock,
RequestHeaders: s.requestHeaders,
GetNextBlockHash: s.getNextBlockHash,
AskForNewBlocks: s.askForNewBlocks,
FetchHeadersAgain: s.fetchHeadersAgain,
FetchBlockAgain: s.fetchBlockAgain,
}
return syncmgr.New(cfg)
}
func (s *Server) onHeader(peer *peer.Peer, hdrsMessage *payload.HeadersMessage) {
s.pmg.MsgReceived(peer, hdrsMessage.Command())
s.smg.OnHeader(peer, hdrsMessage)
}
func (s *Server) onBlock(peer *peer.Peer, blockMsg *payload.BlockMessage) {
s.pmg.MsgReceived(peer, blockMsg.Command())
s.smg.OnBlock(peer, blockMsg)
}
func (s *Server) processBlock(block payload.Block) error {
return s.chain.ProcessBlock(block)
}
func (s *Server) processHeaders(hdrs []*payload.BlockBase) error {
return s.chain.ProcessHeaders(hdrs)
}
func (s *Server) requestHeaders(hash util.Uint256) error {
return s.pmg.RequestHeaders(hash)
}
func (s *Server) requestBlock(hash util.Uint256) error {
return s.pmg.RequestBlock(hash)
}
// getNextBlockHash searches the database for the blockHash
// that is the height above our best block. The hash will be taken from a header.
func (s *Server) getNextBlockHash() (util.Uint256, error) {
bestBlock, err := s.chain.Db.GetLastBlock()
if err != nil {
// Panic!
// XXX: One alternative, is to get the network, erase the database and then start again from scratch.
// This should never happen. The latest block will always be atleast the genesis block
panic("could not get best block from database" + err.Error())
}
index := make([]byte, 4)
binary.BigEndian.PutUint32(index, bestBlock.Index+1)
hdr, err := s.chain.Db.GetHeaderFromHeight(index)
if err != nil {
return util.Uint256{}, err
}
return hdr.Hash, nil
}
func (s *Server) getBestBlockHash() (util.Uint256, error) {
return util.Uint256{}, nil
}
func (s *Server) askForNewBlocks() {
// send a getblocks message with the latest block saved
// when we receive something then send get data
}
func (s *Server) fetchHeadersAgain(util.Uint256) error {
return nil
}
func (s *Server) fetchBlockAgain(util.Uint256) error {
return nil
}

View file

@ -9,7 +9,7 @@ import (
type Config struct {
// Chain functions
ProcessBlock func(msg payload.Block) error
ProcessBlock func(block payload.Block) error
ProcessHeaders func(hdrs []*payload.BlockBase) error
// RequestHeaders will send a getHeaders request