Implemented processing headers + added leveldb as a dependency. (#16)

* Implemented processing headers + added leveldb as a dependency.

* version 0.7.0

* put glide get and install in build_cli section
This commit is contained in:
Anthony De Meulemeester 2018-02-06 07:43:32 +01:00 committed by GitHub
parent d38e3290ed
commit 046494dd68
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 327 additions and 143 deletions

View file

@ -1 +1 @@
0.6.0
0.7.0

View file

@ -6,6 +6,8 @@ jobs:
- image: vidsyhq/go-builder:latest
steps:
- checkout
- run: go get github.com/Masterminds/glide
- run: glide install
- restore_cache:
key: dependency-cache-{{ .Revision }}
- run: BUILD=false /scripts/build.sh
@ -44,6 +46,8 @@ jobs:
- image: vidsyhq/go-builder:latest
steps:
- checkout
- run: go get github.com/Masterminds/glide
- run: glide install
- run: make build
workflows:

23
glide.lock generated
View file

@ -1,4 +1,21 @@
hash: b1152abdd9a1fa1e70773cddcf54247d3fe3332602604f9f2233165ced02eeaf
updated: 2018-02-01T18:34:22.684905Z
imports: []
hash: 2b8debf7936e789545da367433ddbf5f0e3bb54658340d6a412970bca25e6335
updated: 2018-02-05T16:11:44.616743+01:00
imports:
- name: github.com/golang/snappy
version: 553a641470496b2327abcac10b36396bd98e45c9
- name: github.com/syndtr/goleveldb
version: 211f780988068502fe874c44dae530528ebd840f
subpackages:
- leveldb
- leveldb/cache
- leveldb/comparer
- leveldb/errors
- leveldb/filter
- leveldb/iterator
- leveldb/journal
- leveldb/memdb
- leveldb/opt
- leveldb/storage
- leveldb/table
- leveldb/util
testImports: []

View file

@ -1,2 +1,5 @@
package: github.com/CityOfZion/neo-go
import: []
import:
- package: github.com/syndtr/goleveldb/leveldb

View file

@ -93,6 +93,11 @@ type Header struct {
_ uint8 // padding
}
// Verify the integrity of the header
func (h *Header) Verify() bool {
return true
}
// DecodeBinary impelements the Payload interface.
func (h *Header) DecodeBinary(r io.Reader) error {
if err := h.BlockBase.DecodeBinary(r); err != nil {

View file

@ -9,6 +9,30 @@ import (
"github.com/CityOfZion/neo-go/pkg/util"
)
func TestGenisis(t *testing.T) {
var (
rawBlock = "000000000000000000000000000000000000000000000000000000000000000000000000845c34e7c1aed302b1718e914da0c42bf47c476ac4d89671f278d8ab6d27aa3d65fc8857000000001dac2b7c00000000be48d3a3f5d10013ab9ffee489706078714f1ea2010001510400001dac2b7c00000000400000455b7b226c616e67223a227a682d434e222c226e616d65223a22e5b08fe89a81e882a1227d2c7b226c616e67223a22656e222c226e616d65223a22416e745368617265227d5d0000c16ff28623000000da1745e9b549bd0bfa1a569971c77eba30cd5a4b00000000400001445b7b226c616e67223a227a682d434e222c226e616d65223a22e5b08fe89a81e5b881227d2c7b226c616e67223a22656e222c226e616d65223a22416e74436f696e227d5d0000c16ff286230008009f7fd096d37ed2c0e3f7f0cfc924beef4ffceb680000000001000000019b7cffdaa674beae0f930ebe6085af9093e5fe56b34a5c220ccdcf6efc336fc50000c16ff2862300be48d3a3f5d10013ab9ffee489706078714f1ea201000151"
//rawBlockHash = "996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099"
)
rawBlockBytes, err := hex.DecodeString(rawBlock)
if err != nil {
t.Fatal(err)
}
block := &Block{}
if err := block.DecodeBinary(bytes.NewReader(rawBlockBytes)); err != nil {
t.Fatal(err)
}
hash, err := block.Hash()
if err != nil {
t.Fatal(err)
}
t.Log(hash)
}
func TestDecodeBlock(t *testing.T) {
var (
rawBlock = "00000000b7def681f0080262aa293071c53b41fc3146b196067243700b68acd059734fd19543108bf9ddc738cbee2ed1160f153aa0d057f062de0aa3cbb64ba88735c23d43667e59543f050095df82b02e324c5ff3812db982f3b0089a21a278988efeec6a027b2501fd450140113ac66657c2f544e8ad13905fcb2ebaadfef9502cbefb07960fbe56df098814c223dcdd3d0efa0b43a9459e654d948516dcbd8b370f50fbecfb8b411d48051a408500ce85591e516525db24065411f6a88f43de90fa9c167c2e6f5af43bc84e65e5a4bb174bc83a19b6965ff10f476b1b151ae15439a985f33916abc6822b0bb140f4aae522ffaea229987a10d01beec826c3b9a189fe02aa82680581b78f3df0ea4d3f93ca8ea35ffc90f15f7db9017f92fafd9380d9ba3237973cf4313cf626fc40e30e50e3588bd047b39f478b59323868cd50c7ab54355d8245bf0f1988d37528f9bbfc68110cf917debbdbf1f4bdd02cdcccdc3269fdf18a6c727ee54b6934d840e43918dd1ec6123550ec37a513e72b34b2c2a3baa510dec3037cbef2fa9f6ed1e7ccd1f3f6e19d4ce2c0919af55249a970c2685217f75a5589cf9e54dff8449af155210209e7fd41dfb5c2f8dc72eb30358ac100ea8c72da18847befe06eade68cebfcb9210327da12b5c40200e9f65569476bbff2218da4f32548ff43b6387ec1416a231ee821034ff5ceeac41acf22cd5ed2da17a6df4dd8358fcb2bfb1a43208ad0feaab2746b21026ce35b29147ad09e4afe4ec4a7319095f08198fa8babbe3c56e970b143528d2221038dddc06ce687677a53d54f096d2591ba2302068cf123c1f2d75c2dddc542557921039dafd8571a641058ccc832c5e2111ea39b09c0bde36050914384f7a48bce9bf92102d02b1873a0863cd042cc717da31cea0d7cf9db32b74d4c72c01b0011503e2e2257ae01000095df82b000000000"

View file

@ -1,8 +1,10 @@
package core
import (
"fmt"
"bytes"
"encoding/binary"
"log"
"sync"
"time"
"github.com/CityOfZion/neo-go/pkg/util"
@ -10,7 +12,8 @@ import (
// tuning parameters
const (
secondsPerBlock = 15
secondsPerBlock = 15
writeHdrBatchCnt = 2000
)
var (
@ -19,23 +22,38 @@ var (
// Blockchain holds the chain.
type Blockchain struct {
// Any object that satisfies the BlockchainStorer interface.
BlockchainStorer
logger *log.Logger
// index of the latest block.
// Any object that satisfies the BlockchainStorer interface.
Store
// current index of the heighest block
currentBlockHeight uint32
// number of headers stored
storedHeaderCount uint32
mtx sync.RWMutex
// index of headers hashes
headerIndex []util.Uint256
}
// NewBlockchain returns a pointer to a Blockchain.
func NewBlockchain(store BlockchainStorer) *Blockchain {
hash, _ := util.Uint256DecodeFromString("0f654eb45164f08ddf296f7315d781f8b5a669c4d4b68f7265ffa79eeb455ed7")
return &Blockchain{
BlockchainStorer: store,
headerIndex: []util.Uint256{hash},
func NewBlockchain(s Store, l *log.Logger, startHash util.Uint256) *Blockchain {
bc := &Blockchain{
logger: l,
Store: s,
}
// Starthash is 0, so we will create the genesis block.
if startHash.Equals(util.Uint256{}) {
bc.logger.Fatal("genesis block not yet implemented")
}
bc.headerIndex = []util.Uint256{startHash}
return bc
}
// genesisBlock creates the genesis block for the chain.
@ -64,7 +82,7 @@ func (bc *Blockchain) genesisBlock() *Block {
}
}
// AddBlock ..
// AddBlock (to be continued after headers is finished..)
func (bc *Blockchain) AddBlock(block *Block) error {
// TODO: caching
headerLen := len(bc.headerIndex)
@ -90,63 +108,74 @@ func (bc *Blockchain) addHeader(header *Header) error {
// AddHeaders processes the given headers.
func (bc *Blockchain) AddHeaders(headers ...*Header) error {
var (
count = 0
newHeaders = []*Header{}
)
start := time.Now()
fmt.Printf("received header, processing %d headers\n", len(headers))
bc.mtx.Lock()
defer bc.mtx.Unlock()
for i := 0; i < len(headers); i++ {
h := headers[i]
if int(h.Index-1) >= len(bc.headerIndex)+count {
log.Printf("height of block higher then header index %d %d\n",
batch := Batch{}
for _, h := range headers {
if int(h.Index-1) >= len(bc.headerIndex) {
bc.logger.Printf("height of block higher then header index %d %d\n",
h.Index, len(bc.headerIndex))
break
}
if int(h.Index) < count+len(bc.headerIndex) {
if int(h.Index) < len(bc.headerIndex) {
continue
}
count++
newHeaders = append(newHeaders, h)
if !h.Verify() {
bc.logger.Printf("header %v is invalid", h)
break
}
if err := bc.processHeader(h, batch); err != nil {
return err
}
}
log.Println("done processing the headers")
// TODO: Implement caching strategy.
if len(batch) > 0 {
// Write all batches.
if err := bc.writeBatch(batch); err != nil {
return err
}
if len(newHeaders) > 0 {
return bc.processHeaders(newHeaders)
bc.logger.Printf("done processing headers up to index %d took %f Seconds",
bc.HeaderHeight(), time.Since(start).Seconds())
}
return nil
// hash, err := header.Hash()
// if err != nil {
// return err
// }
// bc.headerIndex = append(bc.headerIndex, hash)
// return bc.Put(header)
}
func (bc *Blockchain) processHeaders(headers []*Header) error {
lastHeader := headers[len(headers)-1:]
// processHeader processes 1 header.
func (bc *Blockchain) processHeader(h *Header, batch Batch) error {
hash, err := h.Hash()
if err != nil {
return err
}
bc.headerIndex = append(bc.headerIndex, hash)
for _, h := range headers {
hash, err := h.Hash()
if err != nil {
return err
}
bc.headerIndex = append(bc.headerIndex, hash)
for int(h.Index)-writeHdrBatchCnt >= int(bc.storedHeaderCount) {
// hdrsToWrite = bc.headerIndex[bc.storedHeaderCount : bc.storedHeaderCount+writeHdrBatchCnt]
// NOTE: from original #c to be implemented:
//
// w.Write(header_index.Skip((int)stored_header_count).Take(2000).ToArray());
// w.Flush();
// batch.Put(SliceBuilder.Begin(DataEntryPrefix.IX_HeaderHashList).Add(stored_header_count), ms.ToArray());
bc.storedHeaderCount += writeHdrBatchCnt
}
if lastHeader != nil {
fmt.Println(lastHeader)
buf := new(bytes.Buffer)
if err := h.EncodeBinary(buf); err != nil {
return err
}
preBlock := preDataBlock.add(hash.ToSliceReverse())
batch[&preBlock] = buf.Bytes()
preHeader := preSYSCurrentHeader.toSlice()
batch[&preHeader] = hashAndIndexToBytes(hash, h.Index)
return nil
}
@ -161,3 +190,19 @@ func (bc *Blockchain) CurrentBlockHash() (hash util.Uint256) {
return bc.headerIndex[bc.currentBlockHeight]
}
// BlockHeight return the height/index of the latest block this node has.
func (bc *Blockchain) BlockHeight() uint32 {
return bc.currentBlockHeight
}
// HeaderHeight returns the current index of the headers.
func (bc *Blockchain) HeaderHeight() uint32 {
return uint32(len(bc.headerIndex)) - 1
}
func hashAndIndexToBytes(h util.Uint256, index uint32) []byte {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, index)
return append(h.ToSliceReverse(), buf...)
}

View file

@ -1,69 +0,0 @@
package core
import (
"log"
"sync"
"github.com/CityOfZion/neo-go/pkg/util"
)
// BlockchainStorer is anything that can persist and retrieve the blockchain.
type BlockchainStorer interface {
HasBlock(util.Uint256) bool
GetBlockByHeight(uint32) (*Block, error)
GetBlockByHash(util.Uint256) (*Block, error)
Put(*Header) error
}
// MemoryStore is an in memory implementation of a BlockChainStorer.
type MemoryStore struct {
mtx sync.RWMutex
blocks map[util.Uint256]*Header
}
// NewMemoryStore returns a pointer to a MemoryStore object.
func NewMemoryStore() *MemoryStore {
return &MemoryStore{
blocks: map[util.Uint256]*Header{},
}
}
// HasBlock implements the BlockchainStorer interface.
func (s *MemoryStore) HasBlock(hash util.Uint256) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()
_, ok := s.blocks[hash]
return ok
}
// GetBlockByHash returns a block by its hash.
func (s *MemoryStore) GetBlockByHash(hash util.Uint256) (*Block, error) {
s.mtx.RLock()
defer s.mtx.RUnlock()
return nil, nil
}
// GetBlockByHeight returns a block by its height.
func (s *MemoryStore) GetBlockByHeight(i uint32) (*Block, error) {
s.mtx.RLock()
defer s.mtx.RUnlock()
return nil, nil
}
// Put persist a BlockHead in memory
func (s *MemoryStore) Put(header *Header) error {
s.mtx.Lock()
defer s.mtx.Unlock()
hash, err := header.Hash()
if err != nil {
s.blocks[hash] = header
}
log.Printf("persisted block %s\n", hash)
return err
}

View file

@ -1,9 +1,47 @@
package core
import (
"log"
"os"
"testing"
"github.com/CityOfZion/neo-go/pkg/util"
)
func TestGenesisBlock(t *testing.T) {
func TestNewBlockchain(t *testing.T) {
startHash, _ := util.Uint256DecodeFromString("996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099")
bc := NewBlockchain(nil, nil, startHash)
want := uint32(0)
if have := bc.BlockHeight(); want != have {
t.Fatalf("expected %d got %d", want, have)
}
if have := bc.HeaderHeight(); want != have {
t.Fatalf("expected %d got %d", want, have)
}
if have := bc.storedHeaderCount; want != have {
t.Fatalf("expected %d got %d", want, have)
}
if !bc.CurrentBlockHash().Equals(startHash) {
t.Fatalf("expected current block hash to be %d got %s", startHash, bc.CurrentBlockHash())
}
}
func TestAddHeaders(t *testing.T) {
startHash, _ := util.Uint256DecodeFromString("996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099")
bc := NewBlockchain(NewMemoryStore(), log.New(os.Stdout, "", 0), startHash)
h1 := &Header{BlockBase: BlockBase{Version: 0, Index: 1}}
h2 := &Header{BlockBase: BlockBase{Version: 0, Index: 2}}
h3 := &Header{BlockBase: BlockBase{Version: 0, Index: 3}}
if err := bc.AddHeaders(h1, h2, h3); err != nil {
t.Fatal(err)
}
if want, have := h3.Index, bc.HeaderHeight(); want != have {
t.Fatalf("expected header height of %d got %d", want, have)
}
if want, have := uint32(0), bc.storedHeaderCount; want != have {
t.Fatalf("expected stored header count to be %d got %d", want, have)
}
}

26
pkg/core/leveldb_store.go Normal file
View file

@ -0,0 +1,26 @@
package core
import (
"github.com/syndtr/goleveldb/leveldb"
)
// LevelDBStore is the official storage implementation for storing and retreiving
// the blockchain.
type LevelDBStore struct {
db *leveldb.DB
}
// Write implements the Store interface.
func (s *LevelDBStore) write(key, value []byte) error {
return s.db.Put(key, value, nil)
}
// WriteBatch implements the Store interface.
func (s *LevelDBStore) writeBatch(batch Batch) error {
b := new(leveldb.Batch)
for k, v := range batch {
b.Put(*k, v)
}
return s.db.Write(b, nil)
}

18
pkg/core/memory_store.go Normal file
View file

@ -0,0 +1,18 @@
package core
// MemoryStore is an in memory implementation of a BlockChainStorer.
type MemoryStore struct {
}
// NewMemoryStore returns a pointer to a MemoryStore object.
func NewMemoryStore() *MemoryStore {
return &MemoryStore{}
}
func (m *MemoryStore) write(key, value []byte) error {
return nil
}
func (m *MemoryStore) writeBatch(batch Batch) error {
return nil
}

43
pkg/core/store.go Normal file
View file

@ -0,0 +1,43 @@
package core
type dataEntry uint8
func (e dataEntry) add(b []byte) []byte {
dest := make([]byte, len(b)+1)
dest[0] = byte(e)
for i := 1; i < len(b); i++ {
dest[i] = b[i]
}
return dest
}
func (e dataEntry) toSlice() []byte {
return []byte{byte(e)}
}
// Storage data entry prefixes.
const (
preDataBlock dataEntry = 0x01
preDataTransaction dataEntry = 0x02
preSTAccount dataEntry = 0x40
preSTCoin dataEntry = 0x44
preSTValidator dataEntry = 0x48
preSTAsset dataEntry = 0x4c
preSTContract dataEntry = 0x50
preSTStorage dataEntry = 0x70
preIXHeaderHashList dataEntry = 0x80
preIXValidatorsCount dataEntry = 0x90
preSYSCurrentBlock dataEntry = 0xc0
preSYSCurrentHeader dataEntry = 0xc1
preSYSVersion dataEntry = 0xf0
)
// Store is anything that can persist and retrieve the blockchain.
type Store interface {
write(k, v []byte) error
writeBatch(Batch) error
}
// Batch is a data type used to store data for later batch operations
// by any Store.
type Batch map[*[]byte][]byte

View file

@ -3,6 +3,7 @@ package payload
import (
"encoding/binary"
"io"
"time"
)
const minVersionSize = 27
@ -32,11 +33,11 @@ func NewVersion(id uint32, p uint16, ua string, h uint32, r bool) *Version {
return &Version{
Version: 0,
Services: 1,
Timestamp: 12345,
Timestamp: uint32(time.Now().UTC().Unix()),
Port: p,
Nonce: id,
UserAgent: []byte(ua),
StartHeight: 0,
StartHeight: h,
Relay: r,
}
}

View file

@ -1,6 +1,7 @@
package network
import (
"github.com/CityOfZion/neo-go/pkg/network/payload"
"github.com/CityOfZion/neo-go/pkg/util"
)
@ -10,6 +11,7 @@ type Peer interface {
id() uint32
addr() util.Endpoint
disconnect()
version() *payload.Version
callVersion(*Message) error
callGetaddr(*Message) error
callVerack(*Message) error
@ -24,6 +26,7 @@ type LocalPeer struct {
s *Server
nonce uint32
endpoint util.Endpoint
pVersion *payload.Version
}
// NewLocalPeer return a LocalPeer.
@ -32,6 +35,11 @@ func NewLocalPeer(s *Server) *LocalPeer {
return &LocalPeer{endpoint: e, s: s}
}
// Version implements the Peer interface.
func (p *LocalPeer) version() *payload.Version {
return p.pVersion
}
func (p *LocalPeer) callVersion(msg *Message) error {
return p.s.handleVersionCmd(msg, p)
}

View file

@ -109,6 +109,9 @@ func NewServer(net NetMode) *Server {
logger.Fatalf("invalid network mode %d", net)
}
// For now I will hard code a genesis block of the docker privnet container.
startHash, _ := util.Uint256DecodeFromString("996e37358dc369912041f966f8c5d8d3a8255ba5dcbd3447f8a82b55db869099")
s := &Server{
id: util.RandUint32(1111111, 9999999),
userAgent: fmt.Sprintf("/NEO:%s/", version),
@ -121,8 +124,7 @@ func NewServer(net NetMode) *Server {
net: net,
quit: make(chan struct{}),
peerCountCh: make(chan peerCount),
// knownHashes: protectedHashmap{},
bc: core.NewBlockchain(core.NewMemoryStore()),
bc: core.NewBlockchain(core.NewMemoryStore(), logger, startHash),
}
return s
@ -205,7 +207,7 @@ func (s *Server) loop() {
// the versions of eachother.
func (s *Server) handlePeerConnected(p Peer) error {
// TODO: get the blockheight of this server once core implemented this.
payload := payload.NewVersion(s.id, s.port, s.userAgent, 0, s.relay)
payload := payload.NewVersion(s.id, s.port, s.userAgent, s.bc.HeaderHeight(), s.relay)
msg := newMessage(s.net, cmdVersion, payload)
return p.callVersion(msg)
}
@ -255,11 +257,7 @@ func (s *Server) handleBlockCmd(msg *Message, p Peer) error {
s.logger.Printf("new block: index %d hash %s", block.Index, hash)
if s.bc.HasBlock(hash) {
return nil
}
return s.bc.AddBlock(block)
return nil
}
// After receiving the getaddr message, the node returns an addr message as response
@ -275,12 +273,25 @@ func (s *Server) handleAddrCmd(msg *Message, p Peer) error {
return nil
}
func (s *Server) handleHeadersCmd(msg *Message, p Peer) error {
headers := msg.Payload.(*payload.Headers)
// Handle the headers received from the remote after we asked for headers with the
// "getheaders" message.
func (s *Server) handleHeadersCmd(headers *payload.Headers, p Peer) error {
// Set a deadline for adding headers?
go func(ctx context.Context, headers []*core.Header) {
s.bc.AddHeaders(headers...)
if err := s.bc.AddHeaders(headers...); err != nil {
s.logger.Printf("failed to add headers: %s", err)
return
}
// Ask more headers if we are not in sync with the peer.
if s.bc.HeaderHeight() < p.version().StartHeight {
s.logger.Printf("header height %d peer height %d", s.bc.HeaderHeight(), p.version().StartHeight)
if err := s.askMoreHeaders(p); err != nil {
s.logger.Printf("getheaders RPC failed: %s", err)
return
}
}
}(context.TODO(), headers.Hdrs)
return nil
@ -306,10 +317,11 @@ func (s *Server) peerAlreadyConnected(addr net.Addr) bool {
return false
}
// TODO: Quit this routine if the peer is disconnected.
func (s *Server) startProtocol(p Peer) {
// TODO: check if this peer is still connected.
// dont keep asking (maxPeers and no new nodes)
s.askMoreHeaders(p)
if s.bc.HeaderHeight() < p.version().StartHeight {
s.askMoreHeaders(p)
}
for {
getaddrMsg := newMessage(s.net, cmdGetAddr, nil)
p.callGetaddr(getaddrMsg)

View file

@ -72,7 +72,7 @@ func handleConnection(s *Server, conn net.Conn) {
}
}
// handleMessage hands the message received from a TCP connection over to the server.
// handleMessage multiplexes the message received from a TCP connection to a server command.
func handleMessage(s *Server, p *TCPPeer) {
var err error
@ -87,7 +87,9 @@ func handleMessage(s *Server, p *TCPPeer) {
if err = s.handleVersionCmd(msg, p); err != nil {
break
}
p.nonce = msg.Payload.(*payload.Version).Nonce
version := msg.Payload.(*payload.Version)
p.nonce = version.Nonce
p.pVersion = version
// When a node receives a connection request, it declares its version immediately.
// There will be no other communication until both sides are getting versions of each other.
@ -121,7 +123,8 @@ func handleMessage(s *Server, p *TCPPeer) {
case cmdGetBlocks:
case cmdGetData:
case cmdHeaders:
err = s.handleHeadersCmd(msg, p)
headers := msg.Payload.(*payload.Headers)
err = s.handleHeadersCmd(headers, p)
default:
// This command is unknown by the server.
err = fmt.Errorf("unknown command received %v", msg.Command)
@ -157,6 +160,8 @@ type TCPPeer struct {
send chan sendTuple
// channel to receive from underlying connection.
receive chan *Message
// the version sended out by the peer when connected.
pVersion *payload.Version
}
// NewTCPPeer returns a pointer to a TCP Peer.
@ -183,6 +188,10 @@ func (p *TCPPeer) callVersion(msg *Message) error {
return <-t.err
}
func (p *TCPPeer) version() *payload.Version {
return p.pVersion
}
// id implements the peer interface
func (p *TCPPeer) id() uint32 {
return p.nonce