From 046494dd682f6ecf27c4e20f7436dc1f2672f124 Mon Sep 17 00:00:00 2001 From: Anthony De Meulemeester Date: Tue, 6 Feb 2018 07:43:32 +0100 Subject: [PATCH] Implemented processing headers + added leveldb as a dependency. (#16) * Implemented processing headers + added leveldb as a dependency. * version 0.7.0 * put glide get and install in build_cli section --- VERSION | 2 +- circle.yml | 4 + glide.lock | 23 +++++- glide.yaml | 5 +- pkg/core/block.go | 5 ++ pkg/core/block_test.go | 24 ++++++ pkg/core/blockchain.go | 141 ++++++++++++++++++++++----------- pkg/core/blockchain_storer.go | 69 ---------------- pkg/core/blockchain_test.go | 40 +++++++++- pkg/core/leveldb_store.go | 26 ++++++ pkg/core/memory_store.go | 18 +++++ pkg/core/store.go | 43 ++++++++++ pkg/network/payload/version.go | 5 +- pkg/network/peer.go | 8 ++ pkg/network/server.go | 42 ++++++---- pkg/network/tcp.go | 15 +++- 16 files changed, 327 insertions(+), 143 deletions(-) delete mode 100644 pkg/core/blockchain_storer.go create mode 100644 pkg/core/leveldb_store.go create mode 100644 pkg/core/memory_store.go create mode 100644 pkg/core/store.go diff --git a/VERSION b/VERSION index a918a2aa1..faef31a43 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.6.0 +0.7.0 diff --git a/circle.yml b/circle.yml index 933007dad..d67c8412d 100644 --- a/circle.yml +++ b/circle.yml @@ -6,6 +6,8 @@ jobs: - image: vidsyhq/go-builder:latest steps: - checkout + - run: go get github.com/Masterminds/glide + - run: glide install - restore_cache: key: dependency-cache-{{ .Revision }} - run: BUILD=false /scripts/build.sh @@ -44,6 +46,8 @@ jobs: - image: vidsyhq/go-builder:latest steps: - checkout + - run: go get github.com/Masterminds/glide + - run: glide install - run: make build workflows: diff --git a/glide.lock b/glide.lock index 59f70a101..5cc8c47c4 100644 --- a/glide.lock +++ b/glide.lock @@ -1,4 +1,21 @@ -hash: b1152abdd9a1fa1e70773cddcf54247d3fe3332602604f9f2233165ced02eeaf -updated: 2018-02-01T18:34:22.684905Z -imports: [] +hash: 2b8debf7936e789545da367433ddbf5f0e3bb54658340d6a412970bca25e6335 +updated: 2018-02-05T16:11:44.616743+01:00 +imports: +- name: github.com/golang/snappy + version: 553a641470496b2327abcac10b36396bd98e45c9 +- name: github.com/syndtr/goleveldb + version: 211f780988068502fe874c44dae530528ebd840f + subpackages: + - leveldb + - leveldb/cache + - leveldb/comparer + - leveldb/errors + - leveldb/filter + - leveldb/iterator + - leveldb/journal + - leveldb/memdb + - leveldb/opt + - leveldb/storage + - leveldb/table + - leveldb/util testImports: [] diff --git a/glide.yaml b/glide.yaml index 3b61baf8d..5e77a1723 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,2 +1,5 @@ package: github.com/CityOfZion/neo-go -import: [] +import: + - package: github.com/syndtr/goleveldb/leveldb + + diff --git a/pkg/core/block.go b/pkg/core/block.go index 053ef5881..a46115527 100644 --- a/pkg/core/block.go +++ b/pkg/core/block.go @@ -93,6 +93,11 @@ type Header struct { _ uint8 // padding } +// Verify the integrity of the header +func (h *Header) Verify() bool { + return true +} + // DecodeBinary impelements the Payload interface. func (h *Header) DecodeBinary(r io.Reader) error { if err := h.BlockBase.DecodeBinary(r); err != nil { diff --git a/pkg/core/block_test.go b/pkg/core/block_test.go index 4a25399fa..a6e0cca3d 100644 --- a/pkg/core/block_test.go +++ b/pkg/core/block_test.go @@ -9,6 +9,30 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" ) +func TestGenisis(t *testing.T) { + var ( + rawBlock = "000000000000000000000000000000000000000000000000000000000000000000000000845c34e7c1aed302b1718e914da0c42bf47c476ac4d89671f278d8ab6d27aa3d65fc8857000000001dac2b7c00000000be48d3a3f5d10013ab9ffee489706078714f1ea2010001510400001dac2b7c00000000400000455b7b226c616e67223a227a682d434e222c226e616d65223a22e5b08fe89a81e882a1227d2c7b226c616e67223a22656e222c226e616d65223a22416e745368617265227d5d0000c16ff28623000000da1745e9b549bd0bfa1a569971c77eba30cd5a4b00000000400001445b7b226c616e67223a227a682d434e222c226e616d65223a22e5b08fe89a81e5b881227d2c7b226c616e67223a22656e222c226e616d65223a22416e74436f696e227d5d0000c16ff286230008009f7fd096d37ed2c0e3f7f0cfc924beef4ffceb680000000001000000019b7cffdaa674beae0f930ebe6085af9093e5fe56b34a5c220ccdcf6efc336fc50000c16ff2862300be48d3a3f5d10013ab9ffee489706078714f1ea201000151" + //rawBlockHash = "996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099" + ) + + rawBlockBytes, err := hex.DecodeString(rawBlock) + if err != nil { + t.Fatal(err) + } + + block := &Block{} + if err := block.DecodeBinary(bytes.NewReader(rawBlockBytes)); err != nil { + t.Fatal(err) + } + + hash, err := block.Hash() + if err != nil { + t.Fatal(err) + } + + t.Log(hash) +} + func TestDecodeBlock(t *testing.T) { var ( rawBlock = "00000000b7def681f0080262aa293071c53b41fc3146b196067243700b68acd059734fd19543108bf9ddc738cbee2ed1160f153aa0d057f062de0aa3cbb64ba88735c23d43667e59543f050095df82b02e324c5ff3812db982f3b0089a21a278988efeec6a027b2501fd450140113ac66657c2f544e8ad13905fcb2ebaadfef9502cbefb07960fbe56df098814c223dcdd3d0efa0b43a9459e654d948516dcbd8b370f50fbecfb8b411d48051a408500ce85591e516525db24065411f6a88f43de90fa9c167c2e6f5af43bc84e65e5a4bb174bc83a19b6965ff10f476b1b151ae15439a985f33916abc6822b0bb140f4aae522ffaea229987a10d01beec826c3b9a189fe02aa82680581b78f3df0ea4d3f93ca8ea35ffc90f15f7db9017f92fafd9380d9ba3237973cf4313cf626fc40e30e50e3588bd047b39f478b59323868cd50c7ab54355d8245bf0f1988d37528f9bbfc68110cf917debbdbf1f4bdd02cdcccdc3269fdf18a6c727ee54b6934d840e43918dd1ec6123550ec37a513e72b34b2c2a3baa510dec3037cbef2fa9f6ed1e7ccd1f3f6e19d4ce2c0919af55249a970c2685217f75a5589cf9e54dff8449af155210209e7fd41dfb5c2f8dc72eb30358ac100ea8c72da18847befe06eade68cebfcb9210327da12b5c40200e9f65569476bbff2218da4f32548ff43b6387ec1416a231ee821034ff5ceeac41acf22cd5ed2da17a6df4dd8358fcb2bfb1a43208ad0feaab2746b21026ce35b29147ad09e4afe4ec4a7319095f08198fa8babbe3c56e970b143528d2221038dddc06ce687677a53d54f096d2591ba2302068cf123c1f2d75c2dddc542557921039dafd8571a641058ccc832c5e2111ea39b09c0bde36050914384f7a48bce9bf92102d02b1873a0863cd042cc717da31cea0d7cf9db32b74d4c72c01b0011503e2e2257ae01000095df82b000000000" diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 8dafb324e..5e5f7d282 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -1,8 +1,10 @@ package core import ( - "fmt" + "bytes" + "encoding/binary" "log" + "sync" "time" "github.com/CityOfZion/neo-go/pkg/util" @@ -10,7 +12,8 @@ import ( // tuning parameters const ( - secondsPerBlock = 15 + secondsPerBlock = 15 + writeHdrBatchCnt = 2000 ) var ( @@ -19,23 +22,38 @@ var ( // Blockchain holds the chain. type Blockchain struct { - // Any object that satisfies the BlockchainStorer interface. - BlockchainStorer + logger *log.Logger - // index of the latest block. + // Any object that satisfies the BlockchainStorer interface. + Store + + // current index of the heighest block currentBlockHeight uint32 + // number of headers stored + storedHeaderCount uint32 + + mtx sync.RWMutex + // index of headers hashes headerIndex []util.Uint256 } // NewBlockchain returns a pointer to a Blockchain. -func NewBlockchain(store BlockchainStorer) *Blockchain { - hash, _ := util.Uint256DecodeFromString("0f654eb45164f08ddf296f7315d781f8b5a669c4d4b68f7265ffa79eeb455ed7") - return &Blockchain{ - BlockchainStorer: store, - headerIndex: []util.Uint256{hash}, +func NewBlockchain(s Store, l *log.Logger, startHash util.Uint256) *Blockchain { + bc := &Blockchain{ + logger: l, + Store: s, } + + // Starthash is 0, so we will create the genesis block. + if startHash.Equals(util.Uint256{}) { + bc.logger.Fatal("genesis block not yet implemented") + } + + bc.headerIndex = []util.Uint256{startHash} + + return bc } // genesisBlock creates the genesis block for the chain. @@ -64,7 +82,7 @@ func (bc *Blockchain) genesisBlock() *Block { } } -// AddBlock .. +// AddBlock (to be continued after headers is finished..) func (bc *Blockchain) AddBlock(block *Block) error { // TODO: caching headerLen := len(bc.headerIndex) @@ -90,63 +108,74 @@ func (bc *Blockchain) addHeader(header *Header) error { // AddHeaders processes the given headers. func (bc *Blockchain) AddHeaders(headers ...*Header) error { - var ( - count = 0 - newHeaders = []*Header{} - ) + start := time.Now() - fmt.Printf("received header, processing %d headers\n", len(headers)) + bc.mtx.Lock() + defer bc.mtx.Unlock() - for i := 0; i < len(headers); i++ { - h := headers[i] - if int(h.Index-1) >= len(bc.headerIndex)+count { - log.Printf("height of block higher then header index %d %d\n", + batch := Batch{} + for _, h := range headers { + if int(h.Index-1) >= len(bc.headerIndex) { + bc.logger.Printf("height of block higher then header index %d %d\n", h.Index, len(bc.headerIndex)) break } - - if int(h.Index) < count+len(bc.headerIndex) { + if int(h.Index) < len(bc.headerIndex) { continue } - - count++ - - newHeaders = append(newHeaders, h) + if !h.Verify() { + bc.logger.Printf("header %v is invalid", h) + break + } + if err := bc.processHeader(h, batch); err != nil { + return err + } } - log.Println("done processing the headers") + // TODO: Implement caching strategy. + if len(batch) > 0 { + // Write all batches. + if err := bc.writeBatch(batch); err != nil { + return err + } - if len(newHeaders) > 0 { - return bc.processHeaders(newHeaders) + bc.logger.Printf("done processing headers up to index %d took %f Seconds", + bc.HeaderHeight(), time.Since(start).Seconds()) } return nil - - // hash, err := header.Hash() - // if err != nil { - // return err - // } - - // bc.headerIndex = append(bc.headerIndex, hash) - - // return bc.Put(header) } -func (bc *Blockchain) processHeaders(headers []*Header) error { - lastHeader := headers[len(headers)-1:] +// processHeader processes 1 header. +func (bc *Blockchain) processHeader(h *Header, batch Batch) error { + hash, err := h.Hash() + if err != nil { + return err + } + bc.headerIndex = append(bc.headerIndex, hash) - for _, h := range headers { - hash, err := h.Hash() - if err != nil { - return err - } - bc.headerIndex = append(bc.headerIndex, hash) + for int(h.Index)-writeHdrBatchCnt >= int(bc.storedHeaderCount) { + // hdrsToWrite = bc.headerIndex[bc.storedHeaderCount : bc.storedHeaderCount+writeHdrBatchCnt] + + // NOTE: from original #c to be implemented: + // + // w.Write(header_index.Skip((int)stored_header_count).Take(2000).ToArray()); + // w.Flush(); + // batch.Put(SliceBuilder.Begin(DataEntryPrefix.IX_HeaderHashList).Add(stored_header_count), ms.ToArray()); + + bc.storedHeaderCount += writeHdrBatchCnt } - if lastHeader != nil { - fmt.Println(lastHeader) + buf := new(bytes.Buffer) + if err := h.EncodeBinary(buf); err != nil { + return err } + preBlock := preDataBlock.add(hash.ToSliceReverse()) + batch[&preBlock] = buf.Bytes() + preHeader := preSYSCurrentHeader.toSlice() + batch[&preHeader] = hashAndIndexToBytes(hash, h.Index) + return nil } @@ -161,3 +190,19 @@ func (bc *Blockchain) CurrentBlockHash() (hash util.Uint256) { return bc.headerIndex[bc.currentBlockHeight] } + +// BlockHeight return the height/index of the latest block this node has. +func (bc *Blockchain) BlockHeight() uint32 { + return bc.currentBlockHeight +} + +// HeaderHeight returns the current index of the headers. +func (bc *Blockchain) HeaderHeight() uint32 { + return uint32(len(bc.headerIndex)) - 1 +} + +func hashAndIndexToBytes(h util.Uint256, index uint32) []byte { + buf := make([]byte, 4) + binary.LittleEndian.PutUint32(buf, index) + return append(h.ToSliceReverse(), buf...) +} diff --git a/pkg/core/blockchain_storer.go b/pkg/core/blockchain_storer.go deleted file mode 100644 index eaf2f9fad..000000000 --- a/pkg/core/blockchain_storer.go +++ /dev/null @@ -1,69 +0,0 @@ -package core - -import ( - "log" - "sync" - - "github.com/CityOfZion/neo-go/pkg/util" -) - -// BlockchainStorer is anything that can persist and retrieve the blockchain. -type BlockchainStorer interface { - HasBlock(util.Uint256) bool - GetBlockByHeight(uint32) (*Block, error) - GetBlockByHash(util.Uint256) (*Block, error) - Put(*Header) error -} - -// MemoryStore is an in memory implementation of a BlockChainStorer. -type MemoryStore struct { - mtx sync.RWMutex - blocks map[util.Uint256]*Header -} - -// NewMemoryStore returns a pointer to a MemoryStore object. -func NewMemoryStore() *MemoryStore { - return &MemoryStore{ - blocks: map[util.Uint256]*Header{}, - } -} - -// HasBlock implements the BlockchainStorer interface. -func (s *MemoryStore) HasBlock(hash util.Uint256) bool { - s.mtx.RLock() - defer s.mtx.RUnlock() - - _, ok := s.blocks[hash] - return ok -} - -// GetBlockByHash returns a block by its hash. -func (s *MemoryStore) GetBlockByHash(hash util.Uint256) (*Block, error) { - s.mtx.RLock() - defer s.mtx.RUnlock() - - return nil, nil -} - -// GetBlockByHeight returns a block by its height. -func (s *MemoryStore) GetBlockByHeight(i uint32) (*Block, error) { - s.mtx.RLock() - defer s.mtx.RUnlock() - - return nil, nil -} - -// Put persist a BlockHead in memory -func (s *MemoryStore) Put(header *Header) error { - s.mtx.Lock() - defer s.mtx.Unlock() - - hash, err := header.Hash() - if err != nil { - s.blocks[hash] = header - } - - log.Printf("persisted block %s\n", hash) - - return err -} diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index b5b100ade..a1b25e661 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -1,9 +1,47 @@ package core import ( + "log" + "os" "testing" + + "github.com/CityOfZion/neo-go/pkg/util" ) -func TestGenesisBlock(t *testing.T) { +func TestNewBlockchain(t *testing.T) { + startHash, _ := util.Uint256DecodeFromString("996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099") + bc := NewBlockchain(nil, nil, startHash) + want := uint32(0) + if have := bc.BlockHeight(); want != have { + t.Fatalf("expected %d got %d", want, have) + } + if have := bc.HeaderHeight(); want != have { + t.Fatalf("expected %d got %d", want, have) + } + if have := bc.storedHeaderCount; want != have { + t.Fatalf("expected %d got %d", want, have) + } + if !bc.CurrentBlockHash().Equals(startHash) { + t.Fatalf("expected current block hash to be %d got %s", startHash, bc.CurrentBlockHash()) + } +} + +func TestAddHeaders(t *testing.T) { + startHash, _ := util.Uint256DecodeFromString("996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099") + bc := NewBlockchain(NewMemoryStore(), log.New(os.Stdout, "", 0), startHash) + + h1 := &Header{BlockBase: BlockBase{Version: 0, Index: 1}} + h2 := &Header{BlockBase: BlockBase{Version: 0, Index: 2}} + h3 := &Header{BlockBase: BlockBase{Version: 0, Index: 3}} + + if err := bc.AddHeaders(h1, h2, h3); err != nil { + t.Fatal(err) + } + if want, have := h3.Index, bc.HeaderHeight(); want != have { + t.Fatalf("expected header height of %d got %d", want, have) + } + if want, have := uint32(0), bc.storedHeaderCount; want != have { + t.Fatalf("expected stored header count to be %d got %d", want, have) + } } diff --git a/pkg/core/leveldb_store.go b/pkg/core/leveldb_store.go new file mode 100644 index 000000000..037bc3525 --- /dev/null +++ b/pkg/core/leveldb_store.go @@ -0,0 +1,26 @@ +package core + +import ( + "github.com/syndtr/goleveldb/leveldb" +) + +// LevelDBStore is the official storage implementation for storing and retreiving +// the blockchain. +type LevelDBStore struct { + db *leveldb.DB +} + +// Write implements the Store interface. +func (s *LevelDBStore) write(key, value []byte) error { + return s.db.Put(key, value, nil) +} + +// WriteBatch implements the Store interface. +func (s *LevelDBStore) writeBatch(batch Batch) error { + b := new(leveldb.Batch) + for k, v := range batch { + b.Put(*k, v) + } + + return s.db.Write(b, nil) +} diff --git a/pkg/core/memory_store.go b/pkg/core/memory_store.go new file mode 100644 index 000000000..ef9e4c126 --- /dev/null +++ b/pkg/core/memory_store.go @@ -0,0 +1,18 @@ +package core + +// MemoryStore is an in memory implementation of a BlockChainStorer. +type MemoryStore struct { +} + +// NewMemoryStore returns a pointer to a MemoryStore object. +func NewMemoryStore() *MemoryStore { + return &MemoryStore{} +} + +func (m *MemoryStore) write(key, value []byte) error { + return nil +} + +func (m *MemoryStore) writeBatch(batch Batch) error { + return nil +} diff --git a/pkg/core/store.go b/pkg/core/store.go new file mode 100644 index 000000000..0e18c75f3 --- /dev/null +++ b/pkg/core/store.go @@ -0,0 +1,43 @@ +package core + +type dataEntry uint8 + +func (e dataEntry) add(b []byte) []byte { + dest := make([]byte, len(b)+1) + dest[0] = byte(e) + for i := 1; i < len(b); i++ { + dest[i] = b[i] + } + return dest +} + +func (e dataEntry) toSlice() []byte { + return []byte{byte(e)} +} + +// Storage data entry prefixes. +const ( + preDataBlock dataEntry = 0x01 + preDataTransaction dataEntry = 0x02 + preSTAccount dataEntry = 0x40 + preSTCoin dataEntry = 0x44 + preSTValidator dataEntry = 0x48 + preSTAsset dataEntry = 0x4c + preSTContract dataEntry = 0x50 + preSTStorage dataEntry = 0x70 + preIXHeaderHashList dataEntry = 0x80 + preIXValidatorsCount dataEntry = 0x90 + preSYSCurrentBlock dataEntry = 0xc0 + preSYSCurrentHeader dataEntry = 0xc1 + preSYSVersion dataEntry = 0xf0 +) + +// Store is anything that can persist and retrieve the blockchain. +type Store interface { + write(k, v []byte) error + writeBatch(Batch) error +} + +// Batch is a data type used to store data for later batch operations +// by any Store. +type Batch map[*[]byte][]byte diff --git a/pkg/network/payload/version.go b/pkg/network/payload/version.go index 7656f3e92..5f8d4d0c8 100644 --- a/pkg/network/payload/version.go +++ b/pkg/network/payload/version.go @@ -3,6 +3,7 @@ package payload import ( "encoding/binary" "io" + "time" ) const minVersionSize = 27 @@ -32,11 +33,11 @@ func NewVersion(id uint32, p uint16, ua string, h uint32, r bool) *Version { return &Version{ Version: 0, Services: 1, - Timestamp: 12345, + Timestamp: uint32(time.Now().UTC().Unix()), Port: p, Nonce: id, UserAgent: []byte(ua), - StartHeight: 0, + StartHeight: h, Relay: r, } } diff --git a/pkg/network/peer.go b/pkg/network/peer.go index b910960e8..228f1638f 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -1,6 +1,7 @@ package network import ( + "github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/util" ) @@ -10,6 +11,7 @@ type Peer interface { id() uint32 addr() util.Endpoint disconnect() + version() *payload.Version callVersion(*Message) error callGetaddr(*Message) error callVerack(*Message) error @@ -24,6 +26,7 @@ type LocalPeer struct { s *Server nonce uint32 endpoint util.Endpoint + pVersion *payload.Version } // NewLocalPeer return a LocalPeer. @@ -32,6 +35,11 @@ func NewLocalPeer(s *Server) *LocalPeer { return &LocalPeer{endpoint: e, s: s} } +// Version implements the Peer interface. +func (p *LocalPeer) version() *payload.Version { + return p.pVersion +} + func (p *LocalPeer) callVersion(msg *Message) error { return p.s.handleVersionCmd(msg, p) } diff --git a/pkg/network/server.go b/pkg/network/server.go index 42d1427d3..8fef91487 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -109,6 +109,9 @@ func NewServer(net NetMode) *Server { logger.Fatalf("invalid network mode %d", net) } + // For now I will hard code a genesis block of the docker privnet container. + startHash, _ := util.Uint256DecodeFromString("996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099") + s := &Server{ id: util.RandUint32(1111111, 9999999), userAgent: fmt.Sprintf("/NEO:%s/", version), @@ -121,8 +124,7 @@ func NewServer(net NetMode) *Server { net: net, quit: make(chan struct{}), peerCountCh: make(chan peerCount), - // knownHashes: protectedHashmap{}, - bc: core.NewBlockchain(core.NewMemoryStore()), + bc: core.NewBlockchain(core.NewMemoryStore(), logger, startHash), } return s @@ -205,7 +207,7 @@ func (s *Server) loop() { // the versions of eachother. func (s *Server) handlePeerConnected(p Peer) error { // TODO: get the blockheight of this server once core implemented this. - payload := payload.NewVersion(s.id, s.port, s.userAgent, 0, s.relay) + payload := payload.NewVersion(s.id, s.port, s.userAgent, s.bc.HeaderHeight(), s.relay) msg := newMessage(s.net, cmdVersion, payload) return p.callVersion(msg) } @@ -255,11 +257,7 @@ func (s *Server) handleBlockCmd(msg *Message, p Peer) error { s.logger.Printf("new block: index %d hash %s", block.Index, hash) - if s.bc.HasBlock(hash) { - return nil - } - - return s.bc.AddBlock(block) + return nil } // After receiving the getaddr message, the node returns an addr message as response @@ -275,12 +273,25 @@ func (s *Server) handleAddrCmd(msg *Message, p Peer) error { return nil } -func (s *Server) handleHeadersCmd(msg *Message, p Peer) error { - headers := msg.Payload.(*payload.Headers) - +// Handle the headers received from the remote after we asked for headers with the +// "getheaders" message. +func (s *Server) handleHeadersCmd(headers *payload.Headers, p Peer) error { // Set a deadline for adding headers? go func(ctx context.Context, headers []*core.Header) { - s.bc.AddHeaders(headers...) + if err := s.bc.AddHeaders(headers...); err != nil { + s.logger.Printf("failed to add headers: %s", err) + return + } + + // Ask more headers if we are not in sync with the peer. + if s.bc.HeaderHeight() < p.version().StartHeight { + s.logger.Printf("header height %d peer height %d", s.bc.HeaderHeight(), p.version().StartHeight) + if err := s.askMoreHeaders(p); err != nil { + s.logger.Printf("getheaders RPC failed: %s", err) + return + } + } + }(context.TODO(), headers.Hdrs) return nil @@ -306,10 +317,11 @@ func (s *Server) peerAlreadyConnected(addr net.Addr) bool { return false } +// TODO: Quit this routine if the peer is disconnected. func (s *Server) startProtocol(p Peer) { - // TODO: check if this peer is still connected. - // dont keep asking (maxPeers and no new nodes) - s.askMoreHeaders(p) + if s.bc.HeaderHeight() < p.version().StartHeight { + s.askMoreHeaders(p) + } for { getaddrMsg := newMessage(s.net, cmdGetAddr, nil) p.callGetaddr(getaddrMsg) diff --git a/pkg/network/tcp.go b/pkg/network/tcp.go index c506f1396..2fa23fec9 100644 --- a/pkg/network/tcp.go +++ b/pkg/network/tcp.go @@ -72,7 +72,7 @@ func handleConnection(s *Server, conn net.Conn) { } } -// handleMessage hands the message received from a TCP connection over to the server. +// handleMessage multiplexes the message received from a TCP connection to a server command. func handleMessage(s *Server, p *TCPPeer) { var err error @@ -87,7 +87,9 @@ func handleMessage(s *Server, p *TCPPeer) { if err = s.handleVersionCmd(msg, p); err != nil { break } - p.nonce = msg.Payload.(*payload.Version).Nonce + version := msg.Payload.(*payload.Version) + p.nonce = version.Nonce + p.pVersion = version // When a node receives a connection request, it declares its version immediately. // There will be no other communication until both sides are getting versions of each other. @@ -121,7 +123,8 @@ func handleMessage(s *Server, p *TCPPeer) { case cmdGetBlocks: case cmdGetData: case cmdHeaders: - err = s.handleHeadersCmd(msg, p) + headers := msg.Payload.(*payload.Headers) + err = s.handleHeadersCmd(headers, p) default: // This command is unknown by the server. err = fmt.Errorf("unknown command received %v", msg.Command) @@ -157,6 +160,8 @@ type TCPPeer struct { send chan sendTuple // channel to receive from underlying connection. receive chan *Message + // the version sended out by the peer when connected. + pVersion *payload.Version } // NewTCPPeer returns a pointer to a TCP Peer. @@ -183,6 +188,10 @@ func (p *TCPPeer) callVersion(msg *Message) error { return <-t.err } +func (p *TCPPeer) version() *payload.Version { + return p.pVersion +} + // id implements the peer interface func (p *TCPPeer) id() uint32 { return p.nonce