From 1a6bdd40993ab19e735f9fc00772baf246d1ade9 Mon Sep 17 00:00:00 2001 From: decentralisedkev <37423678+decentralisedkev@users.noreply.github.com> Date: Thu, 28 Mar 2019 22:49:34 +0000 Subject: [PATCH] [Server] Implements Orchestration server (#252) * [pubsub] - remove pubsub package * [chain] - Add height to chain * [peer] - remove unnecesary println * [server] - Implement server package * Add main.go to run node --- main.go | 20 +++++++ pkg/chain/chain.go | 9 +++- pkg/peer/peer.go | 1 - pkg/pubsub/event.go | 20 ------- pkg/pubsub/pub.go | 21 -------- pkg/pubsub/sub.go | 7 --- pkg/server/addrmgr.go | 7 +++ pkg/server/chain.go | 15 ++++++ pkg/server/connmgr.go | 62 +++++++++++++++++++++ pkg/server/database.go | 14 +++++ pkg/server/peerconfig.go | 23 ++++++++ pkg/server/peermgr.go | 9 ++++ pkg/server/server.go | 113 +++++++++++++++++++++++++++++++++++++++ pkg/server/syncmgr.go | 94 ++++++++++++++++++++++++++++++++ pkg/syncmgr/config.go | 2 +- 15 files changed, 366 insertions(+), 51 deletions(-) create mode 100644 main.go delete mode 100644 pkg/pubsub/event.go delete mode 100644 pkg/pubsub/pub.go delete mode 100644 pkg/pubsub/sub.go create mode 100644 pkg/server/addrmgr.go create mode 100644 pkg/server/chain.go create mode 100644 pkg/server/connmgr.go create mode 100644 pkg/server/database.go create mode 100644 pkg/server/peerconfig.go create mode 100644 pkg/server/peermgr.go create mode 100644 pkg/server/server.go create mode 100644 pkg/server/syncmgr.go diff --git a/main.go b/main.go new file mode 100644 index 000000000..b7b54ab7b --- /dev/null +++ b/main.go @@ -0,0 +1,20 @@ +package main + +import ( + "fmt" + + "github.com/CityOfZion/neo-go/pkg/server" + "github.com/CityOfZion/neo-go/pkg/wire/protocol" +) + +func main() { + s, err := server.New(protocol.MainNet, 10332) + if err != nil { + fmt.Println(err) + return + } + err = s.Run() + if err != nil { + fmt.Println("Server has stopped from the following error: ", err.Error()) + } +} diff --git a/pkg/chain/chain.go b/pkg/chain/chain.go index c44764388..f49aedbac 100644 --- a/pkg/chain/chain.go +++ b/pkg/chain/chain.go @@ -23,7 +23,8 @@ var ( // Chain represents a blockchain instance type Chain struct { - Db *Chaindb + Db *Chaindb + height uint32 } // New returns a new chain instance @@ -128,3 +129,9 @@ func (c *Chain) ProcessHeaders(hdrs []*payload.BlockBase) error { func (c *Chain) verifyHeaders(hdrs []*payload.BlockBase) error { return nil } + +// CurrentHeight returns the index of the block +// at the tip of the chain +func (c Chain) CurrentHeight() uint32 { + return c.height +} diff --git a/pkg/peer/peer.go b/pkg/peer/peer.go index d1594d6de..8fe2f28d6 100644 --- a/pkg/peer/peer.go +++ b/pkg/peer/peer.go @@ -176,7 +176,6 @@ func (p *Peer) IsVerackReceived() bool { //NotifyDisconnect returns once the peer has disconnected // Blocking func (p *Peer) NotifyDisconnect() { - fmt.Println("Peer has not disconnected yet") <-p.quitch fmt.Println("Peer has just disconnected") } diff --git a/pkg/pubsub/event.go b/pkg/pubsub/event.go deleted file mode 100644 index 1a57e86cd..000000000 --- a/pkg/pubsub/event.go +++ /dev/null @@ -1,20 +0,0 @@ -package pubsub - -// EventType is an enum -// representing the types of messages we can subscribe to -type EventType int - -const ( - // NewBlock is called When blockchain connects a new block, it will emit an NewBlock Event - NewBlock EventType = iota - // BadBlock is called When blockchain declines a block, it will emit a new block event - BadBlock - // BadHeader is called When blockchain rejects a Header, it will emit this event - BadHeader -) - -// Event represents a new Event that a subscriber can listen to -type Event struct { - Type EventType // E.g. event.NewBlock - data []byte // Raw information -} diff --git a/pkg/pubsub/pub.go b/pkg/pubsub/pub.go deleted file mode 100644 index a9f6084a1..000000000 --- a/pkg/pubsub/pub.go +++ /dev/null @@ -1,21 +0,0 @@ -package pubsub - -// Publisher sends events to subscribers -type Publisher struct { - subs []Subscriber -} - -// Send iterates over each subscriber and checks -// if they are interested in the Event -// By looking at their topics, if they are then -// the event is emitted to them -func (p *Publisher) Send(e Event) error { - for _, sub := range p.subs { - for _, topic := range sub.Topics() { - if e.Type == topic { - sub.Emit(e) - } - } - } - return nil -} diff --git a/pkg/pubsub/sub.go b/pkg/pubsub/sub.go deleted file mode 100644 index 4540870ff..000000000 --- a/pkg/pubsub/sub.go +++ /dev/null @@ -1,7 +0,0 @@ -package pubsub - -// Subscriber will listen for Events from publishers -type Subscriber interface { - Topics() []EventType - Emit(Event) -} diff --git a/pkg/server/addrmgr.go b/pkg/server/addrmgr.go new file mode 100644 index 000000000..6237d79e3 --- /dev/null +++ b/pkg/server/addrmgr.go @@ -0,0 +1,7 @@ +package server + +// etAddress will return a viable address to connect to +// Currently it is hardcoded to be one neo node until address manager is implemented +func (s *Server) getAddress() (string, error) { + return "seed1.ngd.network:10333", nil +} diff --git a/pkg/server/chain.go b/pkg/server/chain.go new file mode 100644 index 000000000..23b6c2cdc --- /dev/null +++ b/pkg/server/chain.go @@ -0,0 +1,15 @@ +package server + +import ( + "github.com/CityOfZion/neo-go/pkg/chain" + "github.com/CityOfZion/neo-go/pkg/database" + "github.com/CityOfZion/neo-go/pkg/wire/protocol" +) + +func setupChain(db database.Database, net protocol.Magic) (*chain.Chain, error) { + chain, err := chain.New(db, net) + if err != nil { + return nil, err + } + return chain, nil +} diff --git a/pkg/server/connmgr.go b/pkg/server/connmgr.go new file mode 100644 index 000000000..64b16bdaf --- /dev/null +++ b/pkg/server/connmgr.go @@ -0,0 +1,62 @@ +package server + +import ( + "encoding/hex" + "fmt" + "net" + "strconv" + + "github.com/CityOfZion/neo-go/pkg/connmgr" + + "github.com/CityOfZion/neo-go/pkg/peer" + "github.com/CityOfZion/neo-go/pkg/wire/util" + iputils "github.com/CityOfZion/neo-go/pkg/wire/util/ip" +) + +func setupConnManager(s *Server, port uint16) *connmgr.Connmgr { + cfg := connmgr.Config{ + GetAddress: s.getAddress, + OnAccept: s.onAccept, + OnConnection: s.onConnection, + AddressPort: iputils.GetLocalIP().String() + ":" + strconv.FormatUint(uint64(port), 10), + } + return connmgr.New(cfg) +} + +func (s *Server) onConnection(conn net.Conn, addr string) { + fmt.Println("We have connected successfully to: ", addr) + + p := peer.NewPeer(conn, false, *s.peerCfg) + err := p.Run() + if err != nil { + fmt.Println("Error running peer" + err.Error()) + return + } + + s.pmg.AddPeer(p) + + byt, err := hex.DecodeString("d42561e3d30e15be6400b6df2f328e02d2bf6354c41dce433bc57687c82144bf") + if err != nil { + fmt.Println("Error getting hash " + err.Error()) + } + lh, err := util.Uint256DecodeBytes(byt) + if err != nil { + fmt.Println("Error getting hash " + err.Error()) + } + err = p.RequestHeaders(lh.Reverse()) + if err != nil { + fmt.Println(err) + } +} + +func (s *Server) onAccept(conn net.Conn) { + fmt.Println("A peer with address: ", conn.RemoteAddr().String(), "has connect to us") + + p := peer.NewPeer(conn, true, *s.peerCfg) + err := p.Run() + if err != nil { + fmt.Println("Error running peer" + err.Error()) + return + } + s.pmg.AddPeer(p) +} diff --git a/pkg/server/database.go b/pkg/server/database.go new file mode 100644 index 000000000..9b7eea9f0 --- /dev/null +++ b/pkg/server/database.go @@ -0,0 +1,14 @@ +package server + +import ( + "github.com/CityOfZion/neo-go/pkg/database" + "github.com/CityOfZion/neo-go/pkg/wire/protocol" +) + +func setupDatabase(net protocol.Magic) (database.Database, error) { + db, err := database.New(net.String()) + if err != nil { + return nil, err + } + return db, nil +} diff --git a/pkg/server/peerconfig.go b/pkg/server/peerconfig.go new file mode 100644 index 000000000..4fa8307bf --- /dev/null +++ b/pkg/server/peerconfig.go @@ -0,0 +1,23 @@ +package server + +import ( + "math/rand" + + "github.com/CityOfZion/neo-go/pkg/peer" + "github.com/CityOfZion/neo-go/pkg/wire/protocol" +) + +func setupPeerConfig(s *Server, port uint16, net protocol.Magic) *peer.LocalConfig { + return &peer.LocalConfig{ + Net: net, + UserAgent: "NEO-GO", + Services: protocol.NodePeerService, + Nonce: rand.Uint32(), + ProtocolVer: 0, + Relay: false, + Port: port, + StartHeight: s.chain.CurrentHeight, + OnHeader: s.onHeader, + OnBlock: s.onBlock, + } +} diff --git a/pkg/server/peermgr.go b/pkg/server/peermgr.go new file mode 100644 index 000000000..670be60fc --- /dev/null +++ b/pkg/server/peermgr.go @@ -0,0 +1,9 @@ +package server + +import ( + "github.com/CityOfZion/neo-go/pkg/peermgr" +) + +func setupPeerManager() *peermgr.PeerMgr { + return peermgr.New() +} diff --git a/pkg/server/server.go b/pkg/server/server.go new file mode 100644 index 000000000..b1e4c0f5b --- /dev/null +++ b/pkg/server/server.go @@ -0,0 +1,113 @@ +package server + +import ( + "fmt" + + "github.com/CityOfZion/neo-go/pkg/peermgr" + + "github.com/CityOfZion/neo-go/pkg/chain" + "github.com/CityOfZion/neo-go/pkg/connmgr" + "github.com/CityOfZion/neo-go/pkg/peer" + "github.com/CityOfZion/neo-go/pkg/syncmgr" + + "github.com/CityOfZion/neo-go/pkg/database" + + "github.com/CityOfZion/neo-go/pkg/wire/protocol" +) + +// Server orchestrates all of the modules +type Server struct { + net protocol.Magic + stopCh chan error + + // Modules + db database.Database + smg *syncmgr.Syncmgr + cmg *connmgr.Connmgr + pmg *peermgr.PeerMgr + chain *chain.Chain + + peerCfg *peer.LocalConfig +} + +//New creates a new server object for a particular network and sets up each module +func New(net protocol.Magic, port uint16) (*Server, error) { + s := &Server{ + net: net, + stopCh: make(chan error, 0), + } + + // Setup database + db, err := setupDatabase(net) + if err != nil { + return nil, err + } + s.db = db + + // setup peermgr + peermgr := setupPeerManager() + s.pmg = peermgr + + // Setup chain + chain, err := setupChain(db, net) + if err != nil { + return nil, err + } + s.chain = chain + + // Setup sync manager + syncmgr := setupSyncManager(s) + s.smg = syncmgr + + // Setup connection manager + connmgr := setupConnManager(s, port) + s.cmg = connmgr + + // Setup peer config + peerCfg := setupPeerConfig(s, port, net) + s.peerCfg = peerCfg + + return s, nil +} + +// Run starts the daemon by connecting to previously nodes or connectng to seed nodes. +// This should be called once all modules have been setup +func (s *Server) Run() error { + fmt.Println("Server is starting up") + + // start the connmgr + err := s.cmg.Run() + if err != nil { + return err + } + + // Attempt to connect to a peer + err = s.cmg.NewRequest() + if err != nil { + return err + } + + // Request header to start synchronisation + bestHeader, err := s.chain.Db.GetLastHeader() + if err != nil { + return err + } + err = s.pmg.RequestHeaders(bestHeader.Hash.Reverse()) + if err != nil { + return err + } + fmt.Println("Server Successfully started") + return s.wait() +} + +func (s *Server) wait() error { + err := <-s.stopCh + return err +} + +// Stop stops the server +func (s *Server) Stop(err error) error { + fmt.Println("Server is shutting down") + s.stopCh <- err + return nil +} diff --git a/pkg/server/syncmgr.go b/pkg/server/syncmgr.go new file mode 100644 index 000000000..3df456626 --- /dev/null +++ b/pkg/server/syncmgr.go @@ -0,0 +1,94 @@ +package server + +import ( + "encoding/binary" + + "github.com/CityOfZion/neo-go/pkg/peer" + "github.com/CityOfZion/neo-go/pkg/syncmgr" + "github.com/CityOfZion/neo-go/pkg/wire/payload" + "github.com/CityOfZion/neo-go/pkg/wire/util" +) + +func setupSyncManager(s *Server) *syncmgr.Syncmgr { + + cfg := &syncmgr.Config{ + ProcessBlock: s.processBlock, + ProcessHeaders: s.processHeaders, + + RequestBlock: s.requestBlock, + RequestHeaders: s.requestHeaders, + + GetNextBlockHash: s.getNextBlockHash, + AskForNewBlocks: s.askForNewBlocks, + + FetchHeadersAgain: s.fetchHeadersAgain, + FetchBlockAgain: s.fetchBlockAgain, + } + + return syncmgr.New(cfg) +} + +func (s *Server) onHeader(peer *peer.Peer, hdrsMessage *payload.HeadersMessage) { + s.pmg.MsgReceived(peer, hdrsMessage.Command()) + s.smg.OnHeader(peer, hdrsMessage) +} + +func (s *Server) onBlock(peer *peer.Peer, blockMsg *payload.BlockMessage) { + s.pmg.MsgReceived(peer, blockMsg.Command()) + s.smg.OnBlock(peer, blockMsg) +} + +func (s *Server) processBlock(block payload.Block) error { + return s.chain.ProcessBlock(block) +} + +func (s *Server) processHeaders(hdrs []*payload.BlockBase) error { + return s.chain.ProcessHeaders(hdrs) +} + +func (s *Server) requestHeaders(hash util.Uint256) error { + return s.pmg.RequestHeaders(hash) +} + +func (s *Server) requestBlock(hash util.Uint256) error { + return s.pmg.RequestBlock(hash) +} + +// getNextBlockHash searches the database for the blockHash +// that is the height above our best block. The hash will be taken from a header. +func (s *Server) getNextBlockHash() (util.Uint256, error) { + bestBlock, err := s.chain.Db.GetLastBlock() + if err != nil { + // Panic! + // XXX: One alternative, is to get the network, erase the database and then start again from scratch. + // This should never happen. The latest block will always be atleast the genesis block + panic("could not get best block from database" + err.Error()) + } + + index := make([]byte, 4) + binary.BigEndian.PutUint32(index, bestBlock.Index+1) + + hdr, err := s.chain.Db.GetHeaderFromHeight(index) + if err != nil { + return util.Uint256{}, err + } + return hdr.Hash, nil +} + +func (s *Server) getBestBlockHash() (util.Uint256, error) { + return util.Uint256{}, nil +} + +func (s *Server) askForNewBlocks() { + // send a getblocks message with the latest block saved + + // when we receive something then send get data +} + +func (s *Server) fetchHeadersAgain(util.Uint256) error { + return nil +} + +func (s *Server) fetchBlockAgain(util.Uint256) error { + return nil +} diff --git a/pkg/syncmgr/config.go b/pkg/syncmgr/config.go index bc921b33b..af27f37c6 100644 --- a/pkg/syncmgr/config.go +++ b/pkg/syncmgr/config.go @@ -9,7 +9,7 @@ import ( type Config struct { // Chain functions - ProcessBlock func(msg payload.Block) error + ProcessBlock func(block payload.Block) error ProcessHeaders func(hdrs []*payload.BlockBase) error // RequestHeaders will send a getHeaders request