Merge pull request #351 from nspcc-dev/drop-redundant-dev-code-part-7

Drop redundant dev code part 7, the next one in series of #315, #318, #322, #328, #330, #338.
This commit is contained in:
Roman Khimov 2019-09-04 19:53:56 +03:00 committed by GitHub
commit 1ffda460bd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 0 additions and 3313 deletions

View file

@ -1,137 +0,0 @@
package chain
import (
"fmt"
"github.com/pkg/errors"
"github.com/CityOfZion/neo-go/pkg/chaincfg"
"github.com/CityOfZion/neo-go/pkg/wire/payload/transaction"
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
"github.com/CityOfZion/neo-go/pkg/database"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
)
var (
// ErrBlockAlreadyExists happens when you try to save the same block twice
ErrBlockAlreadyExists = errors.New("this block has already been saved in the database")
// ErrFutureBlock happens when you try to save a block that is not the next block sequentially
ErrFutureBlock = errors.New("this is not the next block sequentially, that should be added to the chain")
)
// Chain represents a blockchain instance
type Chain struct {
Db *Chaindb
height uint32
}
// New returns a new chain instance
func New(db database.Database, magic protocol.Magic) (*Chain, error) {
chain := &Chain{
Db: &Chaindb{db},
}
// Get last header saved to see if this is a fresh database
_, err := chain.Db.GetLastHeader()
if err == nil {
return chain, nil
}
if err != database.ErrNotFound {
return nil, err
}
// We have a database.ErrNotFound. Insert the genesisBlock
fmt.Printf("Starting a fresh database for %s\n", magic.String())
params, err := chaincfg.NetParams(magic)
if err != nil {
return nil, err
}
err = chain.Db.saveHeader(&params.GenesisBlock.BlockBase)
if err != nil {
return nil, err
}
err = chain.Db.saveBlock(params.GenesisBlock, true)
if err != nil {
return nil, err
}
return chain, nil
}
// ProcessBlock verifies and saves the block in the database
// XXX: for now we will just save without verifying the block
// This function is called by the server and if an error is returned then
// the server informs the sync manager to redownload the block
// XXX:We should also check if the header is already saved in the database
// If not, then we need to validate the header with the rest of the chain
// For now we re-save the header
func (c *Chain) ProcessBlock(block payload.Block) error {
// Check if we already have this block saved
// XXX: We can optimise by implementing a Has() method
// caching the last block in memory
lastBlock, err := c.Db.GetLastBlock()
if err != nil {
return err
}
if lastBlock.Index > block.Index {
return ErrBlockAlreadyExists
}
if block.Index > lastBlock.Index+1 {
return ErrFutureBlock
}
err = c.verifyBlock(block)
if err != nil {
return ValidationError{err.Error()}
}
err = c.Db.saveBlock(block, false)
if err != nil {
return DatabaseError{err.Error()}
}
return nil
}
// VerifyBlock verifies whether a block is valid according
// to the rules of consensus
func (c *Chain) verifyBlock(block payload.Block) error {
return nil
}
// VerifyTx verifies whether a transaction is valid according
// to the rules of consensus
func (c *Chain) VerifyTx(tx transaction.Transactioner) error {
return nil
}
// ProcessHeaders will save the set of headers without validating
func (c *Chain) ProcessHeaders(hdrs []*payload.BlockBase) error {
err := c.verifyHeaders(hdrs)
if err != nil {
return ValidationError{err.Error()}
}
err = c.Db.saveHeaders(hdrs)
if err != nil {
return DatabaseError{err.Error()}
}
return nil
}
// verifyHeaders will be used to verify a batch of headers
// should only ever be called during the initial block download
// or when the node receives a HeadersMessage
func (c *Chain) verifyHeaders(hdrs []*payload.BlockBase) error {
return nil
}
// CurrentHeight returns the index of the block
// at the tip of the chain
func (c Chain) CurrentHeight() uint32 {
return c.height
}

View file

@ -1,372 +0,0 @@
package chain
import (
"bufio"
"bytes"
"encoding/binary"
"github.com/CityOfZion/neo-go/pkg/database"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/payload/transaction"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
var (
// TX is the prefix used when inserting a tx into the db
TX = []byte("TX")
// HEADER is the prefix used when inserting a header into the db
HEADER = []byte("HE")
// LATESTHEADER is the prefix used when inserting the latests header into the db
LATESTHEADER = []byte("LH")
// UTXO is the prefix used when inserting a utxo into the db
UTXO = []byte("UT")
// LATESTBLOCK is the prefix used when inserting the latest block into the db
LATESTBLOCK = []byte("LB")
// BLOCKHASHTX is the prefix used when linking a blockhash to a given tx
BLOCKHASHTX = []byte("BT")
// BLOCKHASHHEIGHT is the prefix used when linking a blockhash to it's height
// This is linked both ways
BLOCKHASHHEIGHT = []byte("BH")
// SCRIPTHASHUTXO is the prefix used when linking a utxo to a scripthash
// This is linked both ways
SCRIPTHASHUTXO = []byte("SU")
)
// Chaindb is a wrapper around the db interface which adds an extra block chain specific layer on top.
type Chaindb struct {
db database.Database
}
// This should not be exported for other callers.
// It is safe-guarded by the chain's verification logic
func (c *Chaindb) saveBlock(blk payload.Block, genesis bool) error {
latestBlockTable := database.NewTable(c.db, LATESTBLOCK)
hashHeightTable := database.NewTable(c.db, BLOCKHASHHEIGHT)
// Save Txs and link to block hash
err := c.saveTXs(blk.Txs, blk.Hash.Bytes(), genesis)
if err != nil {
return err
}
// LINK block height to hash - Both ways
// This allows us to fetch a block using it's hash or it's height
// Given the height, we will search the table to get the hash
// We can then fetch all transactions in the tx table, which match that block hash
height := uint32ToBytes(blk.Index)
err = hashHeightTable.Put(height, blk.Hash.Bytes())
if err != nil {
return err
}
err = hashHeightTable.Put(blk.Hash.Bytes(), height)
if err != nil {
return err
}
// Add block as latest block
// This also acts a Commit() for the block.
// If an error occured, then this will be set to the previous block
// This is useful because if the node suddently shut down while saving and the database was not corrupted
// Then the node will see the latestBlock as the last saved block, and re-download the faulty block
// Note: We check for the latest block on startup
return latestBlockTable.Put([]byte(""), blk.Hash.Bytes())
}
// Saves a tx and links each tx to the block it was found in
// This should never be exported. Only way to add a tx, is through it's block
func (c *Chaindb) saveTXs(txs []transaction.Transactioner, blockHash []byte, genesis bool) error {
for txIndex, tx := range txs {
err := c.saveTx(tx, uint32(txIndex), blockHash, genesis)
if err != nil {
return err
}
}
return nil
}
func (c *Chaindb) saveTx(tx transaction.Transactioner, txIndex uint32, blockHash []byte, genesis bool) error {
txTable := database.NewTable(c.db, TX)
blockTxTable := database.NewTable(c.db, BLOCKHASHTX)
// Save the whole tx using it's hash a key
// In order to find a tx in this table, we need to know it's hash
txHash, err := tx.ID()
if err != nil {
return err
}
err = txTable.Put(txHash.Bytes(), tx.BaseTx().Bytes())
if err != nil {
return err
}
// LINK TXhash to block
// This allows us to fetch a tx by just knowing what block it was in
// This is useful for when we want to re-construct a block from it's hash
// In order to ge the tx, we must do a prefix search on blockHash
// This will return a set of txHashes.
//We can then use these hashes to search the txtable for the tx's we need
key := bytesConcat(blockHash, uint32ToBytes(txIndex))
err = blockTxTable.Put(key, txHash.Bytes())
if err != nil {
return err
}
// Save all of the utxos in a transaction
// We do this additional save so that we can form a utxo database
// and know when a transaction is a double spend.
utxos := tx.BaseTx().Outputs
for utxoIndex, utxo := range utxos {
err := c.saveUTXO(utxo, uint16(utxoIndex), txHash.Bytes(), blockHash)
if err != nil {
return err
}
}
// Do not check for spent utxos on the genesis block
if genesis {
return nil
}
// Remove all spent utxos
// We do this so that once an output has been spent
// It will be removed from the utxo database and cannot be spent again
// If the output was never in the utxo database, this function will return an error
txos := tx.BaseTx().Inputs
for _, txo := range txos {
err := c.removeUTXO(txo)
if err != nil {
return err
}
}
return nil
}
// saveUTxo will save a utxo and link it to it's transaction and block
func (c *Chaindb) saveUTXO(utxo *transaction.Output, utxoIndex uint16, txHash, blockHash []byte) error {
utxoTable := database.NewTable(c.db, UTXO)
scripthashUTXOTable := database.NewTable(c.db, SCRIPTHASHUTXO)
// This is quite messy, we should (if possible) find a way to pass a Writer and Reader interface
// Encode utxo into a buffer
buf := new(bytes.Buffer)
bw := &util.BinWriter{W: buf}
if utxo.Encode(bw); bw.Err != nil {
return bw.Err
}
// Save UTXO
// In order to find a utxo in the utxoTable
// One must know the txHash that the utxo was in
key := bytesConcat(txHash, uint16ToBytes(utxoIndex))
if err := utxoTable.Put(key, buf.Bytes()); err != nil {
return err
}
// LINK utxo to scripthash
// This allows us to find a utxo with the scriptHash
// Since the key starts with scriptHash, we can look for the scriptHash prefix
// and find all utxos for a given scriptHash.
// Additionally, we can search for all utxos for a certain user in a certain block with scriptHash+blockHash
// But this may not be of use to us. However, note that we cannot have just the scriptHash with the utxoIndex
// as this may not be unique. If Kim/Dautt agree, we can change blockHash to blockHeight, which allows us
// To get all utxos above a certain blockHeight. Question is; Would this be useful?
newKey := bytesConcat(utxo.ScriptHash.Bytes(), blockHash, uint16ToBytes(utxoIndex))
if err := scripthashUTXOTable.Put(newKey, key); err != nil {
return err
}
if err := scripthashUTXOTable.Put(key, newKey); err != nil {
return err
}
return nil
}
// Remove
func (c *Chaindb) removeUTXO(txo *transaction.Input) error {
utxoTable := database.NewTable(c.db, UTXO)
scripthashUTXOTable := database.NewTable(c.db, SCRIPTHASHUTXO)
// Remove spent utxos from utxo database
key := bytesConcat(txo.PrevHash.Bytes(), uint16ToBytes(txo.PrevIndex))
err := utxoTable.Delete(key)
if err != nil {
return err
}
// Remove utxos from scripthash table
otherKey, err := scripthashUTXOTable.Get(key)
if err != nil {
return err
}
if err := scripthashUTXOTable.Delete(otherKey); err != nil {
return err
}
if err := scripthashUTXOTable.Delete(key); err != nil {
return err
}
return nil
}
// saveHeaders will save a set of headers into the database
func (c *Chaindb) saveHeaders(headers []*payload.BlockBase) error {
for _, hdr := range headers {
err := c.saveHeader(hdr)
if err != nil {
return err
}
}
return nil
}
// saveHeader saves a header into the database and updates the latest header
// The headers are saved with their `blockheights` as Key
// If we want to search for a header, we need to know it's index
// Alternatively, we can search the hashHeightTable with the block index to get the hash
// If the block has been saved.
// The reason why headers are saved with their index as Key, is so that we can
// increment the key to find out what block we should fetch next during the initial
// block download, when we are saving thousands of headers
func (c *Chaindb) saveHeader(hdr *payload.BlockBase) error {
headerTable := database.NewTable(c.db, HEADER)
latestHeaderTable := database.NewTable(c.db, LATESTHEADER)
index := uint32ToBytes(hdr.Index)
byt, err := hdr.Bytes()
if err != nil {
return err
}
err = headerTable.Put(index, byt)
if err != nil {
return err
}
// Update latest header
return latestHeaderTable.Put([]byte(""), index)
}
// GetHeaderFromHeight will get a header given it's block height
func (c *Chaindb) GetHeaderFromHeight(index []byte) (*payload.BlockBase, error) {
headerTable := database.NewTable(c.db, HEADER)
hdrBytes, err := headerTable.Get(index)
if err != nil {
return nil, err
}
reader := bytes.NewReader(hdrBytes)
blockBase := &payload.BlockBase{}
err = blockBase.Decode(reader)
if err != nil {
return nil, err
}
return blockBase, nil
}
// GetLastHeader will get the header which was saved last in the database
func (c *Chaindb) GetLastHeader() (*payload.BlockBase, error) {
latestHeaderTable := database.NewTable(c.db, LATESTHEADER)
index, err := latestHeaderTable.Get([]byte(""))
if err != nil {
return nil, err
}
return c.GetHeaderFromHeight(index)
}
// GetBlockFromHash will return a block given it's hash
func (c *Chaindb) GetBlockFromHash(blockHash []byte) (*payload.Block, error) {
blockTxTable := database.NewTable(c.db, BLOCKHASHTX)
// To get a block we need to fetch:
// The transactions (1)
// The header (2)
// Reconstruct block by fetching it's txs (1)
var txs []transaction.Transactioner
// Get all Txhashes for this block
txHashes, err := blockTxTable.Prefix(blockHash)
if err != nil {
return nil, err
}
// Get all Tx's given their hash
txTable := database.NewTable(c.db, TX)
for _, txHash := range txHashes {
// Fetch tx by it's hash
txBytes, err := txTable.Get(txHash)
if err != nil {
return nil, err
}
reader := bufio.NewReader(bytes.NewReader(txBytes))
tx, err := transaction.FromReader(reader)
if err != nil {
return nil, err
}
txs = append(txs, tx)
}
// Now fetch the header (2)
// We have the block hash, but headers are stored with their `Height` as key.
// We first search the `BlockHashHeight` table to get the height.
//Then we search the headers table with the height
hashHeightTable := database.NewTable(c.db, BLOCKHASHHEIGHT)
height, err := hashHeightTable.Get(blockHash)
if err != nil {
return nil, err
}
hdr, err := c.GetHeaderFromHeight(height)
if err != nil {
return nil, err
}
// Construct block
block := &payload.Block{
BlockBase: *hdr,
Txs: txs,
}
return block, nil
}
// GetLastBlock will return the last block that has been saved
func (c *Chaindb) GetLastBlock() (*payload.Block, error) {
latestBlockTable := database.NewTable(c.db, LATESTBLOCK)
blockHash, err := latestBlockTable.Get([]byte(""))
if err != nil {
return nil, err
}
return c.GetBlockFromHash(blockHash)
}
func uint16ToBytes(x uint16) []byte {
index := make([]byte, 2)
binary.BigEndian.PutUint16(index, x)
return index
}
func uint32ToBytes(x uint32) []byte {
index := make([]byte, 4)
binary.BigEndian.PutUint32(index, x)
return index
}
func bytesConcat(args ...[]byte) []byte {
var res []byte
for _, arg := range args {
res = append(res, arg...)
}
return res
}

View file

@ -1,201 +0,0 @@
package chain
import (
"bytes"
"math/rand"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/CityOfZion/neo-go/pkg/database"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/payload/transaction"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
var s = rand.NewSource(time.Now().UnixNano())
var r = rand.New(s)
func TestLastHeader(t *testing.T) {
_, cdb, hdrs := saveRandomHeaders(t)
// Select last header from list of headers
lastHeader := hdrs[len(hdrs)-1]
// GetLastHeader from the database
hdr, err := cdb.GetLastHeader()
assert.Nil(t, err)
assert.Equal(t, hdr.Index, lastHeader.Index)
// Clean up
os.RemoveAll(database.DbDir)
}
func TestSaveHeader(t *testing.T) {
// save headers then fetch a random element
db, _, hdrs := saveRandomHeaders(t)
headerTable := database.NewTable(db, HEADER)
// check that each header was saved
for _, hdr := range hdrs {
index := uint32ToBytes(hdr.Index)
ok, err := headerTable.Has(index)
assert.Nil(t, err)
assert.True(t, ok)
}
// Clean up
os.RemoveAll(database.DbDir)
}
func TestSaveBlock(t *testing.T) {
// Init databases
db, err := database.New("temp.test")
assert.Nil(t, err)
cdb := &Chaindb{db}
// Construct block0 and block1
block0, block1 := twoBlocksLinked(t)
// Save genesis header
err = cdb.saveHeader(&block0.BlockBase)
assert.Nil(t, err)
// Save genesis block
err = cdb.saveBlock(block0, true)
assert.Nil(t, err)
// Test genesis block saved
testBlockWasSaved(t, cdb, block0)
// Save block1 header
err = cdb.saveHeader(&block1.BlockBase)
assert.Nil(t, err)
// Save block1
err = cdb.saveBlock(block1, false)
assert.Nil(t, err)
// Test block1 was saved
testBlockWasSaved(t, cdb, block1)
// Clean up
os.RemoveAll(database.DbDir)
}
func testBlockWasSaved(t *testing.T, cdb *Chaindb, block payload.Block) {
// Fetch last block from database
lastBlock, err := cdb.GetLastBlock()
assert.Nil(t, err)
// Get byte representation of last block from database
byts, err := lastBlock.Bytes()
assert.Nil(t, err)
// Get byte representation of block that we saved
blockBytes, err := block.Bytes()
assert.Nil(t, err)
// Should be equal
assert.True(t, bytes.Equal(byts, blockBytes))
}
func randomHeaders(t *testing.T) []*payload.BlockBase {
assert := assert.New(t)
hdrsMsg, err := payload.NewHeadersMessage()
assert.Nil(err)
for i := 0; i < 2000; i++ {
err = hdrsMsg.AddHeader(randomBlockBase(t))
assert.Nil(err)
}
return hdrsMsg.Headers
}
func randomBlockBase(t *testing.T) *payload.BlockBase {
base := &payload.BlockBase{
Version: r.Uint32(),
PrevHash: randUint256(t),
MerkleRoot: randUint256(t),
Timestamp: r.Uint32(),
Index: r.Uint32(),
ConsensusData: r.Uint64(),
NextConsensus: randUint160(t),
Witness: transaction.Witness{
InvocationScript: []byte{0, 1, 2, 34, 56},
VerificationScript: []byte{0, 12, 3, 45, 66},
},
Hash: randUint256(t),
}
return base
}
func randomTxs(t *testing.T) []transaction.Transactioner {
var txs []transaction.Transactioner
for i := 0; i < 10; i++ {
tx := transaction.NewContract(0)
tx.AddInput(transaction.NewInput(randUint256(t), uint16(r.Int())))
tx.AddOutput(transaction.NewOutput(randUint256(t), r.Int63(), randUint160(t)))
txs = append(txs, tx)
}
return txs
}
func saveRandomHeaders(t *testing.T) (database.Database, *Chaindb, []*payload.BlockBase) {
db, err := database.New("temp.test")
assert.Nil(t, err)
cdb := &Chaindb{db}
hdrs := randomHeaders(t)
err = cdb.saveHeaders(hdrs)
assert.Nil(t, err)
return db, cdb, hdrs
}
func randUint256(t *testing.T) util.Uint256 {
slice := make([]byte, 32)
_, err := r.Read(slice)
u, err := util.Uint256DecodeBytes(slice)
assert.Nil(t, err)
return u
}
func randUint160(t *testing.T) util.Uint160 {
slice := make([]byte, 20)
_, err := r.Read(slice)
u, err := util.Uint160DecodeBytes(slice)
assert.Nil(t, err)
return u
}
// twoBlocksLinked will return two blocks, the second block spends from the utxos in the first
func twoBlocksLinked(t *testing.T) (payload.Block, payload.Block) {
genesisBase := randomBlockBase(t)
genesisTxs := randomTxs(t)
genesisBlock := payload.Block{BlockBase: *genesisBase, Txs: genesisTxs}
var txs []transaction.Transactioner
// Form transactions that spend from the genesis block
for _, tx := range genesisTxs {
txHash, err := tx.ID()
assert.Nil(t, err)
newTx := transaction.NewContract(0)
newTx.AddInput(transaction.NewInput(txHash, 0))
newTx.AddOutput(transaction.NewOutput(randUint256(t), r.Int63(), randUint160(t)))
txs = append(txs, newTx)
}
nextBase := randomBlockBase(t)
nextBlock := payload.Block{BlockBase: *nextBase, Txs: txs}
return genesisBlock, nextBlock
}

View file

@ -1,19 +0,0 @@
package chain
// ValidationError occurs when verificatio of the object fails
type ValidationError struct {
msg string
}
func (v ValidationError) Error() string {
return v.msg
}
// DatabaseError occurs when the chain fails to save the object in the database
type DatabaseError struct {
msg string
}
func (d DatabaseError) Error() string {
return d.msg
}

View file

@ -1,31 +0,0 @@
package peer
import (
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
)
// LocalConfig specifies the properties that should be available for each remote peer
type LocalConfig struct {
Net protocol.Magic
UserAgent string
Services protocol.ServiceFlag
Nonce uint32
ProtocolVer protocol.Version
Relay bool
Port uint16
// pointer to config will keep the startheight updated
StartHeight func() uint32
// Response Handlers
OnHeader func(*Peer, *payload.HeadersMessage)
OnGetHeaders func(*Peer, *payload.GetHeadersMessage)
OnAddr func(*Peer, *payload.AddrMessage)
OnGetAddr func(*Peer, *payload.GetAddrMessage)
OnInv func(*Peer, *payload.InvMessage)
OnGetData func(*Peer, *payload.GetDataMessage)
OnBlock func(*Peer, *payload.BlockMessage)
OnGetBlocks func(*Peer, *payload.GetBlocksMessage)
OnTx func(*Peer, *payload.TXMessage)
}

View file

@ -1,340 +0,0 @@
// This impl uses channels to simulate the queue handler with the actor model.
// A suitable number k ,should be set for channel size, because if #numOfMsg > k,
// we lose determinism. k chosen should be large enough that when filled, it shall indicate that
// the peer has stopped responding, since we do not have a pingMSG, we will need another way to shut down
// peers
package peer
import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/CityOfZion/neo-go/pkg/wire/command"
"github.com/CityOfZion/neo-go/pkg/peer/stall"
"github.com/CityOfZion/neo-go/pkg/wire"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
const (
maxOutboundConnections = 100
protocolVer = protocol.DefaultVersion
handshakeTimeout = 30 * time.Second
idleTimeout = 5 * time.Minute // If no message received after idleTimeout, then peer disconnects
// nodes will have `responseTime` seconds to reply with a response
responseTime = 120 * time.Second
// the stall detector will check every `tickerInterval` to see if messages
// are overdue. Should be less than `responseTime`
tickerInterval = 30 * time.Second
// The input buffer size is the amount of mesages that
// can be buffered into the channel to receive at once before
// blocking, and before determinism is broken
inputBufferSize = 100
// The output buffer size is the amount of messages that
// can be buffered into the channel to send at once before
// blocking, and before determinism is broken.
outputBufferSize = 100
// pingInterval = 20 * time.Second //Not implemented in neo clients
)
var (
errHandShakeTimeout = errors.New("Handshake timed out, peers have " + string(handshakeTimeout) + " Seconds to Complete the handshake")
)
// Peer represents a peer on the neo network
type Peer struct {
config LocalConfig
conn net.Conn
startHeight uint32
// atomic vals
disconnected int32
//unchangeable state: concurrent safe
addr string
protoVer protocol.Version
port uint16
inbound bool
userAgent string
services protocol.ServiceFlag
createdAt time.Time
relay bool
statemutex sync.Mutex
verackReceived bool
versionKnown bool
*stall.Detector
inch chan func() // will handle all incoming connections from peer
outch chan func() // will handle all outcoming connections from peer
quitch chan struct{}
}
// NewPeer returns a new NEO peer
func NewPeer(con net.Conn, inbound bool, cfg LocalConfig) *Peer {
return &Peer{
inch: make(chan func(), inputBufferSize),
outch: make(chan func(), outputBufferSize),
quitch: make(chan struct{}, 1),
inbound: inbound,
config: cfg,
conn: con,
createdAt: time.Now(),
startHeight: 0,
addr: con.RemoteAddr().String(),
Detector: stall.NewDetector(responseTime, tickerInterval),
}
}
// Write to a peer
func (p *Peer) Write(msg wire.Messager) error {
return wire.WriteMessage(p.conn, p.config.Net, msg)
}
// Read to a peer
func (p *Peer) Read() (wire.Messager, error) {
return wire.ReadMessage(p.conn, p.config.Net)
}
// Disconnect disconnects a peer and closes the connection
func (p *Peer) Disconnect() {
// return if already disconnected
if atomic.LoadInt32(&p.disconnected) != 0 {
return
}
atomic.AddInt32(&p.disconnected, 1)
p.Detector.Quit()
close(p.quitch)
p.conn.Close()
fmt.Println("Disconnected Peer with address", p.RemoteAddr().String())
}
// Port returns the peers port
func (p *Peer) Port() uint16 {
return p.port
}
// CreatedAt returns the time at which the connection was made
func (p *Peer) CreatedAt() time.Time {
return p.createdAt
}
// Height returns the latest recorded height of this peer
func (p *Peer) Height() uint32 {
return p.startHeight
}
// CanRelay returns true, if the peer can relay information
func (p *Peer) CanRelay() bool {
return p.relay
}
// LocalAddr returns this node's local address
func (p *Peer) LocalAddr() net.Addr {
return p.conn.LocalAddr()
}
// RemoteAddr returns the remote address of the connected peer
func (p *Peer) RemoteAddr() net.Addr {
return p.conn.RemoteAddr()
}
// Services returns the services offered by the peer
func (p *Peer) Services() protocol.ServiceFlag {
return p.config.Services
}
//Inbound returns true whether this peer is an inbound peer
func (p *Peer) Inbound() bool {
return p.inbound
}
// IsVerackReceived returns true, if this node has
// received a verack from this peer
func (p *Peer) IsVerackReceived() bool {
return p.verackReceived
}
//NotifyDisconnect returns once the peer has disconnected
// Blocking
func (p *Peer) NotifyDisconnect() {
<-p.quitch
fmt.Println("Peer has just disconnected")
}
//End of Exposed API functions//
// PingLoop not impl. in neo yet, adding it now
// will cause this client to disconnect from all other implementations
func (p *Peer) PingLoop() { /*not implemented in other neo clients*/ }
// Run is used to start communicating with the peer
// completes the handshake and starts observing
// for messages coming in
func (p *Peer) Run() error {
err := p.Handshake()
if err != nil {
return err
}
go p.StartProtocol()
go p.ReadLoop()
go p.WriteLoop()
//go p.PingLoop() // since it is not implemented. It will disconnect all other impls.
return nil
}
// StartProtocol run as a go-routine, will act as our queue for messages
// should be ran after handshake
func (p *Peer) StartProtocol() {
loop:
for atomic.LoadInt32(&p.disconnected) == 0 {
select {
case f := <-p.inch:
f()
case <-p.quitch:
break loop
case <-p.Detector.Quitch:
fmt.Println("Peer stalled, disconnecting")
break loop
}
}
p.Disconnect()
}
// ReadLoop Will block on the read until a message is read
// Should only be called after handshake is complete
// on a seperate go-routine.
func (p *Peer) ReadLoop() {
idleTimer := time.AfterFunc(idleTimeout, func() {
fmt.Println("Timing out peer")
p.Disconnect()
})
loop:
for atomic.LoadInt32(&p.disconnected) == 0 {
idleTimer.Reset(idleTimeout) // reset timer on each loop
readmsg, err := p.Read()
// Message read; stop Timer
idleTimer.Stop()
if err != nil {
fmt.Println("Err on read", err) // This will also happen if Peer is disconnected
break loop
}
// Remove message as pending from the stall detector
p.Detector.RemoveMessage(readmsg.Command())
switch msg := readmsg.(type) {
case *payload.VersionMessage:
fmt.Println("Already received a Version, disconnecting. " + p.RemoteAddr().String())
break loop // We have already done the handshake, break loop and disconnect
case *payload.VerackMessage:
if p.verackReceived {
fmt.Println("Already received a Verack, disconnecting. " + p.RemoteAddr().String())
break loop
}
p.statemutex.Lock() // This should not happen, however if it does, then we should set it.
p.verackReceived = true
p.statemutex.Unlock()
case *payload.AddrMessage:
p.OnAddr(msg)
case *payload.GetAddrMessage:
p.OnGetAddr(msg)
case *payload.GetBlocksMessage:
p.OnGetBlocks(msg)
case *payload.BlockMessage:
p.OnBlocks(msg)
case *payload.HeadersMessage:
p.OnHeaders(msg)
case *payload.GetHeadersMessage:
p.OnGetHeaders(msg)
case *payload.InvMessage:
p.OnInv(msg)
case *payload.GetDataMessage:
p.OnGetData(msg)
case *payload.TXMessage:
p.OnTX(msg)
default:
fmt.Println("Cannot recognise message", msg.Command()) //Do not disconnect peer, just Log Message
}
}
idleTimer.Stop()
p.Disconnect()
}
// WriteLoop will Queue all messages to be written to the peer.
func (p *Peer) WriteLoop() {
for atomic.LoadInt32(&p.disconnected) == 0 {
select {
case f := <-p.outch:
f()
case <-p.Detector.Quitch: // if the detector quits, disconnect peer
p.Disconnect()
}
}
}
// Outgoing Requests
// RequestHeaders will write a getheaders to this peer
func (p *Peer) RequestHeaders(hash util.Uint256) error {
c := make(chan error, 0)
p.outch <- func() {
getHeaders, err := payload.NewGetHeadersMessage([]util.Uint256{hash}, util.Uint256{})
err = p.Write(getHeaders)
if err != nil {
p.Detector.AddMessage(command.GetHeaders)
}
c <- err
}
return <-c
}
// RequestBlocks will ask this peer for a set of blocks
func (p *Peer) RequestBlocks(hashes []util.Uint256) error {
c := make(chan error, 0)
p.outch <- func() {
getdata, err := payload.NewGetDataMessage(payload.InvTypeBlock)
err = getdata.AddHashes(hashes)
if err != nil {
c <- err
return
}
err = p.Write(getdata)
if err != nil {
p.Detector.AddMessage(command.GetData)
}
c <- err
}
return <-c
}

View file

@ -1,196 +0,0 @@
package peer_test
import (
"net"
"testing"
"time"
"github.com/CityOfZion/neo-go/pkg/peer"
"github.com/CityOfZion/neo-go/pkg/wire"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
"github.com/stretchr/testify/assert"
)
func returnConfig() peer.LocalConfig {
DefaultHeight := func() uint32 {
return 10
}
OnAddr := func(p *peer.Peer, msg *payload.AddrMessage) {}
OnHeader := func(p *peer.Peer, msg *payload.HeadersMessage) {}
OnGetHeaders := func(p *peer.Peer, msg *payload.GetHeadersMessage) {}
OnInv := func(p *peer.Peer, msg *payload.InvMessage) {}
OnGetData := func(p *peer.Peer, msg *payload.GetDataMessage) {}
OnBlock := func(p *peer.Peer, msg *payload.BlockMessage) {}
OnGetBlocks := func(p *peer.Peer, msg *payload.GetBlocksMessage) {}
return peer.LocalConfig{
Net: protocol.MainNet,
UserAgent: "NEO-GO-Default",
Services: protocol.NodePeerService,
Nonce: 1200,
ProtocolVer: 0,
Relay: false,
Port: 10332,
// pointer to config will keep the startheight updated for each version
//Message we plan to send
StartHeight: DefaultHeight,
OnHeader: OnHeader,
OnAddr: OnAddr,
OnGetHeaders: OnGetHeaders,
OnInv: OnInv,
OnGetData: OnGetData,
OnBlock: OnBlock,
OnGetBlocks: OnGetBlocks,
}
}
func TestHandshake(t *testing.T) {
address := ":20338"
go func() {
conn, err := net.DialTimeout("tcp", address, 2*time.Second)
if err != nil {
t.Fatal(err)
}
p := peer.NewPeer(conn, true, returnConfig())
err = p.Run()
verack, err := payload.NewVerackMessage()
if err != nil {
t.Fail()
}
if err := p.Write(verack); err != nil {
t.Fatal(err)
}
assert.Equal(t, true, p.IsVerackReceived())
}()
listener, err := net.Listen("tcp", address)
if err != nil {
t.Fatal(err)
return
}
defer func() {
listener.Close()
}()
for {
conn, err := listener.Accept()
if err != nil {
t.Fatal(err)
}
tcpAddrMe := &net.TCPAddr{IP: net.ParseIP("82.2.97.142"), Port: 20338}
nonce := uint32(100)
messageVer, err := payload.NewVersionMessage(tcpAddrMe, 2595770, false, protocol.DefaultVersion, protocol.UserAgent, nonce, protocol.NodePeerService)
if err != nil {
t.Fatal(err)
}
if err := wire.WriteMessage(conn, protocol.MainNet, messageVer); err != nil {
t.Fatal(err)
return
}
readmsg, err := wire.ReadMessage(conn, protocol.MainNet)
if err != nil {
t.Fatal(err)
}
version, ok := readmsg.(*payload.VersionMessage)
if !ok {
t.Fatal(err)
}
assert.NotEqual(t, nil, version)
messageVrck, err := payload.NewVerackMessage()
if err != nil {
t.Fatal(err)
}
assert.NotEqual(t, nil, messageVrck)
if err := wire.WriteMessage(conn, protocol.MainNet, messageVrck); err != nil {
t.Fatal(err)
}
readmsg, err = wire.ReadMessage(conn, protocol.MainNet)
if err != nil {
t.Fatal(err)
}
assert.NotEqual(t, nil, readmsg)
verk, ok := readmsg.(*payload.VerackMessage)
if !ok {
t.Fatal(err)
}
assert.NotEqual(t, nil, verk)
return
}
}
func TestConfigurations(t *testing.T) {
_, conn := net.Pipe()
inbound := true
config := returnConfig()
p := peer.NewPeer(conn, inbound, config)
// test inbound
assert.Equal(t, inbound, p.Inbound())
// handshake not done, should be false
assert.Equal(t, false, p.IsVerackReceived())
assert.Equal(t, config.Services, p.Services())
assert.Equal(t, config.Relay, p.CanRelay())
assert.WithinDuration(t, time.Now(), p.CreatedAt(), 1*time.Second)
}
func TestPeerDisconnect(t *testing.T) {
// Make sure everything is shutdown
// Make sure timer is shutdown in stall detector too. Should maybe put this part of test into stall detector.
_, conn := net.Pipe()
inbound := true
config := returnConfig()
p := peer.NewPeer(conn, inbound, config)
p.Disconnect()
verack, err := payload.NewVerackMessage()
assert.Nil(t, err)
err = p.Write(verack)
assert.NotNil(t, err)
// Check if stall detector is still running
_, ok := <-p.Detector.Quitch
assert.Equal(t, ok, false)
}
func TestNotifyDisconnect(t *testing.T) {
_, conn := net.Pipe()
inbound := true
config := returnConfig()
p := peer.NewPeer(conn, inbound, config)
p.Disconnect()
p.NotifyDisconnect()
// TestNotify uses default test timeout as the passing condition
// Failure condition can be seen when you comment out p.Disconnect()
}

View file

@ -1,132 +0,0 @@
package peer
import (
"fmt"
"net"
"time"
"github.com/CityOfZion/neo-go/pkg/wire"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
iputils "github.com/CityOfZion/neo-go/pkg/wire/util/ip"
)
// Handshake will initiate a handshake with this peer
func (p *Peer) Handshake() error {
handshakeErr := make(chan error, 1)
go func() {
if p.inbound {
handshakeErr <- p.inboundHandShake()
} else {
handshakeErr <- p.outboundHandShake()
}
}()
select {
case err := <-handshakeErr:
if err != nil {
return err
}
case <-time.After(handshakeTimeout):
return errHandShakeTimeout
}
// This is purely here for Logs
if p.inbound {
fmt.Println("inbound handshake with", p.RemoteAddr().String(), "successful")
} else {
fmt.Println("outbound handshake with", p.RemoteAddr().String(), "successful")
}
return nil
}
// If this peer has an inbound conn (conn that is going into another peer)
// then he has dialed and so, we must read the version message
func (p *Peer) inboundHandShake() error {
var err error
if err := p.writeLocalVersionMSG(); err != nil {
return err
}
if err := p.readRemoteVersionMSG(); err != nil {
return err
}
verack, err := payload.NewVerackMessage()
if err != nil {
return err
}
err = p.Write(verack)
return p.readVerack()
}
func (p *Peer) outboundHandShake() error {
var err error
err = p.readRemoteVersionMSG()
if err != nil {
return err
}
err = p.writeLocalVersionMSG()
if err != nil {
return err
}
err = p.readVerack()
if err != nil {
return err
}
verack, err := payload.NewVerackMessage()
if err != nil {
return err
}
return p.Write(verack)
}
func (p *Peer) writeLocalVersionMSG() error {
nonce := p.config.Nonce
relay := p.config.Relay
port := int(p.config.Port)
ua := p.config.UserAgent
sh := p.config.StartHeight()
services := p.config.Services
proto := p.config.ProtocolVer
ip := iputils.GetLocalIP()
tcpAddrMe := &net.TCPAddr{IP: ip, Port: port}
messageVer, err := payload.NewVersionMessage(tcpAddrMe, sh, relay, proto, ua, nonce, services)
if err != nil {
return err
}
return p.Write(messageVer)
}
func (p *Peer) readRemoteVersionMSG() error {
readmsg, err := wire.ReadMessage(p.conn, p.config.Net)
if err != nil {
return err
}
version, ok := readmsg.(*payload.VersionMessage)
if !ok {
return err
}
return p.OnVersion(version)
}
func (p *Peer) readVerack() error {
readmsg, err := wire.ReadMessage(p.conn, p.config.Net)
if err != nil {
return err
}
_, ok := readmsg.(*payload.VerackMessage)
if !ok {
return err
}
// should only be accessed on one go-routine
p.verackReceived = true
return nil
}

View file

@ -1,67 +0,0 @@
# Package - Peer
## Responsibility
Once a connection has been made. The connection will represent a established peer to the localNode. Since a connection and the `Wire` is a golang primitive, that we cannot do much with. The peer package will encapsulate both, while adding extra functionality.
## Features
- The handshake protocol is automatically executed and handled by the peer package. If a Version/Verack is received twice, the peer will be disconnected.
- IdleTimeouts: If a Message is not received from the peer within a set period of time, the peer will be disconnected.
- StallTimeouts: For Example, If a GetHeaders, is sent to the Peer and a Headers Response is not received within a certain period of time, then the peer is disconnected.
- Concurrency Model: The concurrency model used is similar to Actor model, with a few changes. Messages can be sent to a peer asynchronously or synchronously. An example of an synchornous message send is the `RequestHeaders` method, where the channel blocks until an error value is received. The `OnHeaders` message is however asynchronously called. Furthermore, all methods passed through the config, are wrapped inside of an additional `Peers` method, this is to lay the ground work to capturing statistics regarding a specific command. These are also used so that we can pass behaviour to be executed down the channel.
- Configuration: Each Peer will have a config struct passed to it, with information about the Local Peer and functions that will encapsulate the behaviour of what the peer should do, given a request. This way, the peer is not dependent on any other package.
## Usage
conn, err := net.Dial("tcp", "seed2.neo.org:10333")
if err != nil {
fmt.Println("Error dialing connection", err.Error())
return
}
config := peer.LocalConfig{
Net: protocol.MainNet,
UserAgent: "NEO-G",
Services: protocol.NodePeerService,
Nonce: 1200,
ProtocolVer: 0,
Relay: false,
Port: 10332,
StartHeight: LocalHeight,
OnHeader: OnHeader,
}
p := peer.NewPeer(conn, false, config)
err = p.Run()
hash, err := util.Uint256DecodeString(chainparams.GenesisHash)
// hash2, err := util.Uint256DecodeString("ff8fe95efc5d1cc3a22b17503aecaf289cef68f94b79ddad6f613569ca2342d8")
err = p.RequestHeaders(hash)
func OnHeader(peer *peer.Peer, msg *payload.HeadersMessage) {
// This function is passed to peer
// and the peer will execute it on receiving a header
}
func LocalHeight() uint32 {
// This will be a function from the object that handles the block heights
return 10
}
### Notes
Should we follow the actor model for Peers? Each Peer will have a ID, which we can take as the PID or if
we launch a go-routine for each peer, then we can use that as an implicit PID.
Peer information should be stored into a database, if no db exists, we should get it from an initial peers file.
We can use this to periodically store information about a peer.

View file

@ -1,111 +0,0 @@
package peer
import (
"errors"
"time"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
)
// OnGetData is called when a GetData message is received
func (p *Peer) OnGetData(msg *payload.GetDataMessage) {
p.inch <- func() {
if p.config.OnInv != nil {
p.config.OnGetData(p, msg)
}
}
}
//OnTX is called when a TX message is received
func (p *Peer) OnTX(msg *payload.TXMessage) {
p.inch <- func() {
p.inch <- func() {
if p.config.OnTx != nil {
p.config.OnTx(p, msg)
}
}
}
}
// OnInv is called when a Inv message is received
func (p *Peer) OnInv(msg *payload.InvMessage) {
p.inch <- func() {
if p.config.OnInv != nil {
p.config.OnInv(p, msg)
}
}
}
// OnGetHeaders is called when a GetHeaders message is received
func (p *Peer) OnGetHeaders(msg *payload.GetHeadersMessage) {
p.inch <- func() {
if p.config.OnGetHeaders != nil {
p.config.OnGetHeaders(p, msg)
}
}
}
// OnAddr is called when a Addr message is received
func (p *Peer) OnAddr(msg *payload.AddrMessage) {
p.inch <- func() {
if p.config.OnAddr != nil {
p.config.OnAddr(p, msg)
}
}
}
// OnGetAddr is called when a GetAddr message is received
func (p *Peer) OnGetAddr(msg *payload.GetAddrMessage) {
p.inch <- func() {
if p.config.OnGetAddr != nil {
p.config.OnGetAddr(p, msg)
}
}
}
// OnGetBlocks is called when a GetBlocks message is received
func (p *Peer) OnGetBlocks(msg *payload.GetBlocksMessage) {
p.inch <- func() {
if p.config.OnGetBlocks != nil {
p.config.OnGetBlocks(p, msg)
}
}
}
// OnBlocks is called when a Blocks message is received
func (p *Peer) OnBlocks(msg *payload.BlockMessage) {
p.Detector.RemoveMessage(msg.Command())
p.inch <- func() {
if p.config.OnBlock != nil {
p.config.OnBlock(p, msg)
}
}
}
// OnHeaders is called when a Headers message is received
func (p *Peer) OnHeaders(msg *payload.HeadersMessage) {
p.Detector.RemoveMessage(msg.Command())
p.inch <- func() {
if p.config.OnHeader != nil {
p.config.OnHeader(p, msg)
}
}
}
// OnVersion Listener will be called
// during the handshake, any error checking should be done here for the versionMessage.
// This should only ever be called during the handshake. Any other place and the peer will disconnect.
func (p *Peer) OnVersion(msg *payload.VersionMessage) error {
if msg.Nonce == p.config.Nonce {
p.conn.Close()
return errors.New("self connection, disconnecting Peer")
}
p.versionKnown = true
p.port = msg.Port
p.services = msg.Services
p.userAgent = string(msg.UserAgent)
p.createdAt = time.Now()
p.relay = msg.Relay
p.startHeight = msg.StartHeight
return nil
}

View file

@ -1,175 +0,0 @@
package stall
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/CityOfZion/neo-go/pkg/wire/command"
)
// Detector (stall detector) will keep track of all pendingMessages
// If any message takes too long to reply
// the detector will disconnect the peer
type Detector struct {
responseTime time.Duration
tickInterval time.Duration
lock *sync.RWMutex
responses map[command.Type]time.Time
// The detector is embedded into a peer and the peer watches this quit chan
// If this chan is closed, the peer disconnects
Quitch chan struct{}
// atomic vals
disconnected int32
}
// NewDetector will create a new stall detector
// rT is the responseTime and signals how long
// a peer has to reply back to a sent message
// tickerInterval is how often the detector wil check for stalled messages
func NewDetector(rTime time.Duration, tickerInterval time.Duration) *Detector {
d := &Detector{
responseTime: rTime,
tickInterval: tickerInterval,
lock: new(sync.RWMutex),
responses: map[command.Type]time.Time{},
Quitch: make(chan struct{}),
}
go d.loop()
return d
}
func (d *Detector) loop() {
ticker := time.NewTicker(d.tickInterval)
defer func() {
d.Quit()
d.DeleteAll()
ticker.Stop()
}()
for {
select {
case <-ticker.C:
now := time.Now()
d.lock.RLock()
resp := d.responses
d.lock.RUnlock()
for _, deadline := range resp {
if now.After(deadline) {
fmt.Println(resp)
fmt.Println("Deadline passed")
return
}
}
}
}
}
// Quit is a concurrent safe way to call the Quit channel
// Without blocking
func (d *Detector) Quit() {
// return if already disconnected
if atomic.LoadInt32(&d.disconnected) != 0 {
return
}
atomic.AddInt32(&d.disconnected, 1)
close(d.Quitch)
}
//AddMessage will add a message to the responses map
// Call this function when we send a message to a peer
// The command passed through is the command that we sent
// we will then set a timer for the expected message(s)
func (d *Detector) AddMessage(cmd command.Type) {
cmds := d.addMessage(cmd)
d.lock.Lock()
for _, cmd := range cmds {
d.responses[cmd] = time.Now().Add(d.responseTime)
}
d.lock.Unlock()
}
// RemoveMessage remove messages from the responses map
// Call this function when we receive a message from
// peer. This will remove the pendingresponse message from the map.
// The command passed through is the command we received
func (d *Detector) RemoveMessage(cmd command.Type) {
cmds := d.removeMessage(cmd)
d.lock.Lock()
for _, cmd := range cmds {
delete(d.responses, cmd)
}
d.lock.Unlock()
}
// DeleteAll empties the map of all contents and
// is called when the detector is being shut down
func (d *Detector) DeleteAll() {
d.lock.Lock()
d.responses = make(map[command.Type]time.Time)
d.lock.Unlock()
}
// GetMessages Will return a map of all of the pendingResponses
// and their deadlines
func (d *Detector) GetMessages() map[command.Type]time.Time {
var resp map[command.Type]time.Time
d.lock.RLock()
resp = d.responses
d.lock.RUnlock()
return resp
}
// when a message is added, we will add a deadline for
// expected response
func (d *Detector) addMessage(cmd command.Type) []command.Type {
var cmds []command.Type
switch cmd {
case command.GetHeaders:
// We now will expect a Headers Message
cmds = append(cmds, command.Headers)
case command.GetAddr:
// We now will expect a Headers Message
cmds = append(cmds, command.Addr)
case command.GetData:
// We will now expect a block/tx message
cmds = append(cmds, command.Block)
cmds = append(cmds, command.TX)
case command.GetBlocks:
// we will now expect a inv message
cmds = append(cmds, command.Inv)
case command.Version:
// We will now expect a verack
cmds = append(cmds, command.Verack)
}
return cmds
}
// if receive a message, we will delete it from pending
func (d *Detector) removeMessage(cmd command.Type) []command.Type {
var cmds []command.Type
switch cmd {
case command.Block:
// We will now remove a block and tx message
cmds = append(cmds, command.Block)
cmds = append(cmds, command.TX)
case command.TX:
// We will now remove a block and tx message
cmds = append(cmds, command.Block)
cmds = append(cmds, command.TX)
case command.Verack:
// We will now expect a verack
cmds = append(cmds, cmd)
default:
cmds = append(cmds, cmd)
}
return cmds
}

View file

@ -1,84 +0,0 @@
package stall
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/CityOfZion/neo-go/pkg/wire/command"
)
func TestAddRemoveMessage(t *testing.T) {
responseTime := 2 * time.Millisecond
tickerInterval := 1 * time.Millisecond
d := NewDetector(responseTime, tickerInterval)
d.AddMessage(command.GetAddr)
mp := d.GetMessages()
assert.Equal(t, 1, len(mp))
assert.IsType(t, time.Time{}, mp[command.GetAddr])
d.RemoveMessage(command.Addr)
mp = d.GetMessages()
assert.Equal(t, 0, len(mp))
assert.Empty(t, mp[command.GetAddr])
}
type mockPeer struct {
lock *sync.RWMutex
online bool
detector *Detector
}
func (mp *mockPeer) loop() {
loop:
for {
select {
case <-mp.detector.Quitch:
break loop
}
}
// cleanup
mp.lock.Lock()
mp.online = false
mp.lock.Unlock()
}
func TestDeadlineWorks(t *testing.T) {
responseTime := 2 * time.Millisecond
tickerInterval := 1 * time.Millisecond
d := NewDetector(responseTime, tickerInterval)
mp := mockPeer{online: true, detector: d, lock: new(sync.RWMutex)}
go mp.loop()
d.AddMessage(command.GetAddr)
time.Sleep(responseTime + 1*time.Millisecond)
k := make(map[command.Type]time.Time)
d.lock.RLock()
assert.Equal(t, k, d.responses)
d.lock.RUnlock()
mp.lock.RLock()
assert.Equal(t, false, mp.online)
mp.lock.RUnlock()
}
func TestDeadlineShouldNotBeEmpty(t *testing.T) {
responseTime := 10 * time.Millisecond
tickerInterval := 1 * time.Millisecond
d := NewDetector(responseTime, tickerInterval)
d.AddMessage(command.GetAddr)
time.Sleep(1 * time.Millisecond)
k := make(map[command.Type]time.Time)
d.lock.RLock()
assert.NotEqual(t, k, d.responses)
d.lock.RUnlock()
}

View file

@ -1,155 +0,0 @@
package peermgr
import (
"errors"
"sort"
"sync"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
var (
//ErrCacheLimit is returned when the cache limit is reached
ErrCacheLimit = errors.New("nomore items can be added to the cache")
//ErrNoItems is returned when pickItem is called and there are no items in the cache
ErrNoItems = errors.New("there are no items in the cache")
//ErrDuplicateItem is returned when you try to add the same item, more than once to the cache
ErrDuplicateItem = errors.New("this item is already in the cache")
)
//BlockInfo holds the necessary information that the cache needs
// to sort and store block requests
type BlockInfo struct {
BlockHash util.Uint256
BlockIndex uint32
}
// Equals returns true if two blockInfo objects
// have the same hash and the same index
func (bi *BlockInfo) Equals(other BlockInfo) bool {
return bi.BlockHash.Equals(other.BlockHash) && bi.BlockIndex == other.BlockIndex
}
// indexSorter sorts the blockInfos by blockIndex.
type indexSorter []BlockInfo
func (is indexSorter) Len() int { return len(is) }
func (is indexSorter) Swap(i, j int) { is[i], is[j] = is[j], is[i] }
func (is indexSorter) Less(i, j int) bool { return is[i].BlockIndex < is[j].BlockIndex }
//blockCache will cache any pending block requests
// for the node when there are no available nodes
type blockCache struct {
cacheLimit int
cacheLock sync.Mutex
cache []BlockInfo
}
func newBlockCache(cacheLimit int) *blockCache {
return &blockCache{
cache: make([]BlockInfo, 0, cacheLimit),
cacheLimit: cacheLimit,
}
}
func (bc *blockCache) addBlockInfo(bi BlockInfo) error {
if bc.cacheLen() == bc.cacheLimit {
return ErrCacheLimit
}
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
// Check for duplicates. slice will always be small so a simple for loop will work
for _, bInfo := range bc.cache {
if bInfo.Equals(bi) {
return ErrDuplicateItem
}
}
bc.cache = append(bc.cache, bi)
sort.Sort(indexSorter(bc.cache))
return nil
}
func (bc *blockCache) addBlockInfos(bis []BlockInfo) error {
if len(bis)+bc.cacheLen() > bc.cacheLimit {
return errors.New("too many items to add, this will exceed the cache limit")
}
for _, bi := range bis {
err := bc.addBlockInfo(bi)
if err != nil {
return err
}
}
return nil
}
func (bc *blockCache) cacheLen() int {
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
return len(bc.cache)
}
func (bc *blockCache) pickFirstItem() (BlockInfo, error) {
return bc.pickItem(0)
}
func (bc *blockCache) pickAllItems() ([]BlockInfo, error) {
numOfItems := bc.cacheLen()
items := make([]BlockInfo, 0, numOfItems)
for i := 0; i < numOfItems; i++ {
bi, err := bc.pickFirstItem()
if err != nil {
return nil, err
}
items = append(items, bi)
}
return items, nil
}
func (bc *blockCache) pickItem(i uint) (BlockInfo, error) {
if bc.cacheLen() < 1 {
return BlockInfo{}, ErrNoItems
}
if i >= uint(bc.cacheLen()) {
return BlockInfo{}, errors.New("index out of range")
}
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
item := bc.cache[i]
bc.cache = append(bc.cache[:i], bc.cache[i+1:]...)
return item, nil
}
func (bc *blockCache) removeHash(hashToRemove util.Uint256) error {
index, err := bc.findHash(hashToRemove)
if err != nil {
return err
}
_, err = bc.pickItem(uint(index))
return err
}
func (bc *blockCache) findHash(hashToFind util.Uint256) (int, error) {
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
for i, bInfo := range bc.cache {
if bInfo.BlockHash.Equals(hashToFind) {
return i, nil
}
}
return -1, errors.New("hash cannot be found in the cache")
}

View file

@ -1,80 +0,0 @@
package peermgr
import (
"math/rand"
"testing"
"github.com/CityOfZion/neo-go/pkg/wire/util"
"github.com/stretchr/testify/assert"
)
func TestAddBlock(t *testing.T) {
bc := &blockCache{
cacheLimit: 20,
}
bi := randomBlockInfo(t)
err := bc.addBlockInfo(bi)
assert.Equal(t, nil, err)
assert.Equal(t, 1, bc.cacheLen())
err = bc.addBlockInfo(bi)
assert.Equal(t, ErrDuplicateItem, err)
assert.Equal(t, 1, bc.cacheLen())
}
func TestCacheLimit(t *testing.T) {
bc := &blockCache{
cacheLimit: 20,
}
for i := 0; i < bc.cacheLimit; i++ {
err := bc.addBlockInfo(randomBlockInfo(t))
assert.Equal(t, nil, err)
}
err := bc.addBlockInfo(randomBlockInfo(t))
assert.Equal(t, ErrCacheLimit, err)
assert.Equal(t, bc.cacheLimit, bc.cacheLen())
}
func TestPickItem(t *testing.T) {
bc := &blockCache{
cacheLimit: 20,
}
for i := 0; i < bc.cacheLimit; i++ {
err := bc.addBlockInfo(randomBlockInfo(t))
assert.Equal(t, nil, err)
}
for i := 0; i < bc.cacheLimit; i++ {
_, err := bc.pickFirstItem()
assert.Equal(t, nil, err)
}
assert.Equal(t, 0, bc.cacheLen())
}
func randomUint256(t *testing.T) util.Uint256 {
rand32 := make([]byte, 32)
rand.Read(rand32)
u, err := util.Uint256DecodeBytes(rand32)
assert.Equal(t, nil, err)
return u
}
func randomBlockInfo(t *testing.T) BlockInfo {
return BlockInfo{
randomUint256(t),
rand.Uint32(),
}
}

View file

@ -1,227 +0,0 @@
package peermgr
import (
"errors"
"fmt"
"sync"
"github.com/CityOfZion/neo-go/pkg/wire/command"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
const (
// blockCacheLimit is the maximum amount of pending requests that the cache can hold
pendingBlockCacheLimit = 20
//peerBlockCacheLimit is the maximum amount of inflight blocks that a peer can
// have, before they are flagged as busy
peerBlockCacheLimit = 1
)
var (
//ErrNoAvailablePeers is returned when a request for data from a peer is invoked
// but there are no available peers to request data from
ErrNoAvailablePeers = errors.New("there are no available peers to interact with")
// ErrUnknownPeer is returned when a peer that the peer manager does not know about
// sends a message to this node
ErrUnknownPeer = errors.New("this peer has not been registered with the peer manager")
)
//mPeer represents a peer that is managed by the peer manager
type mPeer interface {
Disconnect()
RequestBlocks([]util.Uint256) error
RequestHeaders(util.Uint256) error
NotifyDisconnect()
}
type peerstats struct {
// when a peer is sent a blockRequest
// the peermanager will track this using this blockCache
blockCache *blockCache
// all other requests will be tracked using the requests map
requests map[command.Type]bool
}
//PeerMgr manages all peers that the node is connected to
type PeerMgr struct {
pLock sync.RWMutex
peers map[mPeer]peerstats
requestCache *blockCache
}
//New returns a new peermgr object
func New() *PeerMgr {
return &PeerMgr{
peers: make(map[mPeer]peerstats),
requestCache: newBlockCache(pendingBlockCacheLimit),
}
}
// AddPeer adds a peer to the list of managed peers
func (pmgr *PeerMgr) AddPeer(peer mPeer) {
pmgr.pLock.Lock()
defer pmgr.pLock.Unlock()
if _, exists := pmgr.peers[peer]; exists {
return
}
pmgr.peers[peer] = peerstats{
requests: make(map[command.Type]bool),
blockCache: newBlockCache(peerBlockCacheLimit),
}
go pmgr.onDisconnect(peer)
}
//MsgReceived notifies the peer manager that we have received a
// message from a peer
func (pmgr *PeerMgr) MsgReceived(peer mPeer, cmd command.Type) error {
pmgr.pLock.Lock()
defer pmgr.pLock.Unlock()
// if peer was unknown then disconnect
val, ok := pmgr.peers[peer]
if !ok {
go func() {
peer.NotifyDisconnect()
}()
peer.Disconnect()
return ErrUnknownPeer
}
val.requests[cmd] = false
return nil
}
//BlockMsgReceived notifies the peer manager that we have received a
// block message from a peer
func (pmgr *PeerMgr) BlockMsgReceived(peer mPeer, bi BlockInfo) error {
// if peer was unknown then disconnect
val, ok := pmgr.peers[peer]
if !ok {
go func() {
peer.NotifyDisconnect()
}()
peer.Disconnect()
return ErrUnknownPeer
}
// // remove item from the peersBlock cache
err := val.blockCache.removeHash(bi.BlockHash)
if err != nil {
return err
}
// check if cache empty, if so then return
if pmgr.requestCache.cacheLen() == 0 {
return nil
}
// Try to clean an item from the pendingBlockCache, a peer has just finished serving a block request
cachedBInfo, err := pmgr.requestCache.pickFirstItem()
if err != nil {
return err
}
return pmgr.blockCallPeer(cachedBInfo, func(p mPeer) error {
return p.RequestBlocks([]util.Uint256{cachedBInfo.BlockHash})
})
}
// Len returns the amount of peers that the peer manager
//currently knows about
func (pmgr *PeerMgr) Len() int {
pmgr.pLock.Lock()
defer pmgr.pLock.Unlock()
return len(pmgr.peers)
}
// RequestBlock will request a block from the most
// available peer. Then update it's stats, so we know that
// this peer is busy
func (pmgr *PeerMgr) RequestBlock(bi BlockInfo) error {
pmgr.pLock.Lock()
defer pmgr.pLock.Unlock()
err := pmgr.blockCallPeer(bi, func(p mPeer) error {
return p.RequestBlocks([]util.Uint256{bi.BlockHash})
})
if err == ErrNoAvailablePeers {
return pmgr.requestCache.addBlockInfo(bi)
}
return err
}
// RequestHeaders will request a headers from the most available peer.
func (pmgr *PeerMgr) RequestHeaders(hash util.Uint256) error {
pmgr.pLock.Lock()
defer pmgr.pLock.Unlock()
return pmgr.callPeerForCmd(command.Headers, func(p mPeer) error {
return p.RequestHeaders(hash)
})
}
func (pmgr *PeerMgr) callPeerForCmd(cmd command.Type, f func(p mPeer) error) error {
for peer, stats := range pmgr.peers {
if !stats.requests[cmd] {
stats.requests[cmd] = true
return f(peer)
}
}
return ErrNoAvailablePeers
}
func (pmgr *PeerMgr) blockCallPeer(bi BlockInfo, f func(p mPeer) error) error {
for peer, stats := range pmgr.peers {
if stats.blockCache.cacheLen() < peerBlockCacheLimit {
err := stats.blockCache.addBlockInfo(bi)
if err != nil {
return err
}
return f(peer)
}
}
return ErrNoAvailablePeers
}
func (pmgr *PeerMgr) onDisconnect(p mPeer) {
// Blocking until peer is disconnected
p.NotifyDisconnect()
pmgr.pLock.Lock()
defer func() {
delete(pmgr.peers, p)
pmgr.pLock.Unlock()
}()
// Add all of peers outstanding block requests into
// the peer managers pendingBlockRequestCache
val, ok := pmgr.peers[p]
if !ok {
return
}
pendingRequests, err := val.blockCache.pickAllItems()
if err != nil {
fmt.Println(err.Error())
return
}
err = pmgr.requestCache.addBlockInfos(pendingRequests)
if err != nil {
fmt.Println(err.Error())
return
}
}

View file

@ -1,201 +0,0 @@
package peermgr
import (
"testing"
"github.com/CityOfZion/neo-go/pkg/wire/command"
"github.com/CityOfZion/neo-go/pkg/wire/util"
"github.com/stretchr/testify/assert"
)
type peer struct {
quit chan bool
nonce int
disconnected bool
blockRequested int
headersRequested int
}
func (p *peer) Disconnect() {
p.disconnected = true
p.quit <- true
}
func (p *peer) RequestBlocks([]util.Uint256) error {
p.blockRequested++
return nil
}
func (p *peer) RequestHeaders(util.Uint256) error {
p.headersRequested++
return nil
}
func (p *peer) NotifyDisconnect() {
<-p.quit
}
func TestAddPeer(t *testing.T) {
pmgr := New()
peerA := &peer{nonce: 1}
peerB := &peer{nonce: 2}
peerC := &peer{nonce: 3}
pmgr.AddPeer(peerA)
pmgr.AddPeer(peerB)
pmgr.AddPeer(peerC)
pmgr.AddPeer(peerC)
assert.Equal(t, 3, pmgr.Len())
}
func TestRequestBlocks(t *testing.T) {
pmgr := New()
peerA := &peer{nonce: 1}
peerB := &peer{nonce: 2}
peerC := &peer{nonce: 3}
pmgr.AddPeer(peerA)
pmgr.AddPeer(peerB)
pmgr.AddPeer(peerC)
firstBlock := randomBlockInfo(t)
err := pmgr.RequestBlock(firstBlock)
assert.Nil(t, err)
secondBlock := randomBlockInfo(t)
err = pmgr.RequestBlock(secondBlock)
assert.Nil(t, err)
thirdBlock := randomBlockInfo(t)
err = pmgr.RequestBlock(thirdBlock)
assert.Nil(t, err)
// Since the peer manager did not get a MsgReceived
// in between the block requests
// a request should be sent to all peers
// This is only true, if peerBlockCacheLimit == 1
assert.Equal(t, 1, peerA.blockRequested)
assert.Equal(t, 1, peerB.blockRequested)
assert.Equal(t, 1, peerC.blockRequested)
// Since the peer manager still has not received a MsgReceived
// another call to request blocks, will add the request to the cache
// and return a nil err
fourthBlock := randomBlockInfo(t)
err = pmgr.RequestBlock(fourthBlock)
assert.Equal(t, nil, err)
assert.Equal(t, 1, pmgr.requestCache.cacheLen())
// If we tell the peer manager that we have received a block
// it will check the cache for any pending requests and send a block request if there are any.
// The request will go to the peer who sent back the block corresponding to the first hash
// since the other two peers are still busy with their block requests
peer := findPeerwithHash(t, pmgr, firstBlock.BlockHash)
err = pmgr.BlockMsgReceived(peer, firstBlock)
assert.Nil(t, err)
totalRequests := peerA.blockRequested + peerB.blockRequested + peerC.blockRequested
assert.Equal(t, 4, totalRequests)
// // cache should be empty now
assert.Equal(t, 0, pmgr.requestCache.cacheLen())
}
// The peer manager does not tell you what peer was sent a particular block request
// For testing purposes, the following function will find that peer
func findPeerwithHash(t *testing.T, pmgr *PeerMgr, blockHash util.Uint256) mPeer {
for peer, stats := range pmgr.peers {
_, err := stats.blockCache.findHash(blockHash)
if err == nil {
return peer
}
}
assert.Fail(t, "cannot find a peer with that hash")
return nil
}
func TestRequestHeaders(t *testing.T) {
pmgr := New()
peerA := &peer{nonce: 1}
peerB := &peer{nonce: 2}
peerC := &peer{nonce: 3}
pmgr.AddPeer(peerA)
pmgr.AddPeer(peerB)
pmgr.AddPeer(peerC)
err := pmgr.RequestHeaders(util.Uint256{})
assert.Nil(t, err)
err = pmgr.RequestHeaders(util.Uint256{})
assert.Nil(t, err)
err = pmgr.RequestHeaders(util.Uint256{})
assert.Nil(t, err)
// Since the peer manager did not get a MsgReceived
// in between the header requests
// a request should be sent to all peers
assert.Equal(t, 1, peerA.headersRequested)
assert.Equal(t, 1, peerB.headersRequested)
assert.Equal(t, 1, peerC.headersRequested)
// Since the peer manager still has not received a MsgReceived
// another call to request header, will return a NoAvailablePeerError
err = pmgr.RequestHeaders(util.Uint256{})
assert.Equal(t, ErrNoAvailablePeers, err)
// If we tell the peer manager that peerA has given us a block
// then send another BlockRequest. It will go to peerA
// since the other two peers are still busy with their
// block requests
err = pmgr.MsgReceived(peerA, command.Headers)
assert.Nil(t, err)
err = pmgr.RequestHeaders(util.Uint256{})
assert.Nil(t, err)
assert.Equal(t, 2, peerA.headersRequested)
assert.Equal(t, 1, peerB.headersRequested)
assert.Equal(t, 1, peerC.headersRequested)
}
func TestUnknownPeer(t *testing.T) {
pmgr := New()
unknownPeer := &peer{
disconnected: false,
quit: make(chan bool),
}
err := pmgr.MsgReceived(unknownPeer, command.Headers)
assert.Equal(t, true, unknownPeer.disconnected)
assert.Equal(t, ErrUnknownPeer, err)
}
func TestNotifyDisconnect(t *testing.T) {
pmgr := New()
peerA := &peer{
nonce: 1,
quit: make(chan bool),
}
pmgr.AddPeer(peerA)
if pmgr.Len() != 1 {
t.Fail()
}
peerA.Disconnect()
if pmgr.Len() != 0 {
t.Fail()
}
}

View file

@ -1,61 +0,0 @@
package syncmgr
import (
"github.com/CityOfZion/neo-go/pkg/chain"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
)
// blockModeOnBlock is called when the sync manager is block mode
// and receives a block.
func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error {
// Check if it is a future block
// XXX: since we are storing blocks in memory, we do not want to store blocks
// from the tip
if block.Index > s.nextBlockIndex+2000 {
return nil
}
if block.Index > s.nextBlockIndex {
s.addToBlockPool(block)
return nil
}
// Process Block
err := s.processBlock(block)
if err != nil && err != chain.ErrBlockAlreadyExists {
return s.cfg.FetchBlockAgain(block.Hash)
}
// Check the block pool
err = s.checkPool()
if err != nil {
return err
}
// Check if blockhashReceived == the header hash from last get headers this node performed
// if not then increment and request next block
if s.headerHash != block.Hash {
nextHash, err := s.cfg.GetNextBlockHash()
if err != nil {
return err
}
return s.cfg.RequestBlock(nextHash, block.Index)
}
// If we are caught up then go into normal mode
diff := peer.Height() - block.Index
if diff <= cruiseHeight {
s.syncmode = normalMode
s.timer.Reset(blockTimer)
return nil
}
// If not then we go back into headersMode and request more headers.
s.syncmode = headersMode
return s.cfg.RequestHeaders(block.Hash)
}
func (s *Syncmgr) blockModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error {
// We ignore headers when in this mode
return nil
}

View file

@ -1,57 +0,0 @@
package syncmgr
import (
"sort"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
)
func (s *Syncmgr) addToBlockPool(newBlock payload.Block) {
s.poolLock.Lock()
defer s.poolLock.Unlock()
for _, block := range s.blockPool {
if block.Index == newBlock.Index {
return
}
}
s.blockPool = append(s.blockPool, newBlock)
// sort slice using block index
sort.Slice(s.blockPool, func(i, j int) bool {
return s.blockPool[i].Index < s.blockPool[j].Index
})
}
func (s *Syncmgr) checkPool() error {
// Assuming that the blocks are sorted in order
var indexesToRemove = -1
s.poolLock.Lock()
defer func() {
// removes all elements before this index, including the element at this index
s.blockPool = s.blockPool[indexesToRemove+1:]
s.poolLock.Unlock()
}()
// loop iterates through the cache, processing any
// blocks that can be added to the chain
for i, block := range s.blockPool {
if s.nextBlockIndex != block.Index {
break
}
// Save this block and save the indice location so we can remove it, when we defer
err := s.processBlock(block)
if err != nil {
return err
}
indexesToRemove = i
}
return nil
}

View file

@ -1,42 +0,0 @@
package syncmgr
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestAddBlockPoolFlush(t *testing.T) {
syncmgr, _ := setupSyncMgr(blockMode, 10)
blockMessage := randomBlockMessage(t, 11)
peer := &mockPeer{
height: 100,
}
// Since the block has Index 11 and the sync manager needs the block with index 10
// This block will be added to the blockPool
err := syncmgr.OnBlock(peer, blockMessage)
assert.Nil(t, err)
assert.Equal(t, 1, len(syncmgr.blockPool))
// The sync manager is still looking for the block at height 10
// Since this block is at height 12, it will be added to the block pool
blockMessage = randomBlockMessage(t, 12)
err = syncmgr.OnBlock(peer, blockMessage)
assert.Nil(t, err)
assert.Equal(t, 2, len(syncmgr.blockPool))
// This is the block that the sync manager was waiting for
// It should process this block, the check the pool for the next set of blocks
blockMessage = randomBlockMessage(t, 10)
err = syncmgr.OnBlock(peer, blockMessage)
assert.Nil(t, err)
assert.Equal(t, 0, len(syncmgr.blockPool))
// Since we processed 3 blocks and the sync manager started
//looking for block with index 10. The syncmananger should be looking for
// the block with index 13
assert.Equal(t, uint32(13), syncmgr.nextBlockIndex)
}

View file

@ -1,44 +0,0 @@
package syncmgr
import (
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
// Config is the configuration file for the sync manager
type Config struct {
// Chain functions
ProcessBlock func(block payload.Block) error
ProcessHeaders func(hdrs []*payload.BlockBase) error
// RequestHeaders will send a getHeaders request
// with the hash passed in as a parameter
RequestHeaders func(hash util.Uint256) error
//RequestBlock will send a getdata request for the block
// with the hash passed as a parameter
RequestBlock func(hash util.Uint256, index uint32) error
// GetNextBlockHash returns the block hash of the header infront of thr block
// at the tip of this nodes chain. This assumes that the node is not in sync
GetNextBlockHash func() (util.Uint256, error)
// AskForNewBlocks will send out a message to the network
// asking for new blocks
AskForNewBlocks func()
// FetchHeadersAgain is called when a peer has provided headers that have not
// validated properly. We pass in the hash of the first header
FetchHeadersAgain func(util.Uint256) error
// FetchHeadersAgain is called when a peer has provided a block that has not
// validated properly. We pass in the hash of the block
FetchBlockAgain func(util.Uint256) error
}
// SyncPeer represents a peer on the network
// that this node can sync with
type SyncPeer interface {
Height() uint32
}

View file

@ -1,42 +0,0 @@
package syncmgr
import (
"github.com/CityOfZion/neo-go/pkg/chain"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
)
// headersModeOnHeaders is called when the sync manager is headers mode
// and receives a header.
func (s *Syncmgr) headersModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error {
// If we are in Headers mode, then we just need to process the headers
// Note: For the un-optimised version, we move straight to blocksOnly mode
firstHash := hdrs[0].Hash
firstHdrIndex := hdrs[0].Index
err := s.cfg.ProcessHeaders(hdrs)
if err == nil {
// Update syncmgr last header
s.headerHash = hdrs[len(hdrs)-1].Hash
s.syncmode = blockMode
return s.cfg.RequestBlock(firstHash, firstHdrIndex)
}
// Check whether it is a validation error, or a database error
if _, ok := err.(*chain.ValidationError); ok {
// If we get a validation error we re-request the headers
// the method will automatically fetch from a different peer
// XXX: Add increment banScore for this peer
return s.cfg.FetchHeadersAgain(firstHash)
}
// This means it is a database error. We have no way to recover from this.
panic(err.Error())
}
// headersModeOnBlock is called when the sync manager is headers mode
// and receives a block.
func (s *Syncmgr) headersModeOnBlock(peer SyncPeer, block payload.Block) error {
// While in headers mode, ignore any blocks received
return nil
}

View file

@ -1,113 +0,0 @@
package syncmgr
import (
"crypto/rand"
"testing"
"github.com/stretchr/testify/assert"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
type syncTestHelper struct {
blocksProcessed int
headersProcessed int
newBlockRequest int
headersFetchRequest int
blockFetchRequest int
err error
}
func (s *syncTestHelper) ProcessBlock(msg payload.Block) error {
s.blocksProcessed++
return s.err
}
func (s *syncTestHelper) ProcessHeaders(hdrs []*payload.BlockBase) error {
s.headersProcessed = s.headersProcessed + len(hdrs)
return s.err
}
func (s *syncTestHelper) GetNextBlockHash() (util.Uint256, error) {
return util.Uint256{}, s.err
}
func (s *syncTestHelper) AskForNewBlocks() {
s.newBlockRequest++
}
func (s *syncTestHelper) FetchHeadersAgain(util.Uint256) error {
s.headersFetchRequest++
return s.err
}
func (s *syncTestHelper) FetchBlockAgain(util.Uint256) error {
s.blockFetchRequest++
return s.err
}
func (s *syncTestHelper) RequestBlock(util.Uint256, uint32) error {
s.blockFetchRequest++
return s.err
}
func (s *syncTestHelper) RequestHeaders(util.Uint256) error {
s.headersFetchRequest++
return s.err
}
type mockPeer struct {
height uint32
}
func (p *mockPeer) Height() uint32 { return p.height }
func randomHeadersMessage(t *testing.T, num int) *payload.HeadersMessage {
var hdrs []*payload.BlockBase
for i := 0; i < num; i++ {
hash := randomUint256(t)
hdr := &payload.BlockBase{Hash: hash}
hdrs = append(hdrs, hdr)
}
hdrsMsg, err := payload.NewHeadersMessage()
assert.Nil(t, err)
hdrsMsg.Headers = hdrs
return hdrsMsg
}
func randomUint256(t *testing.T) util.Uint256 {
hash := make([]byte, 32)
_, err := rand.Read(hash)
assert.Nil(t, err)
u, err := util.Uint256DecodeBytes(hash)
assert.Nil(t, err)
return u
}
func setupSyncMgr(mode mode, nextBlockIndex uint32) (*Syncmgr, *syncTestHelper) {
helper := &syncTestHelper{}
cfg := &Config{
ProcessBlock: helper.ProcessBlock,
ProcessHeaders: helper.ProcessHeaders,
GetNextBlockHash: helper.GetNextBlockHash,
AskForNewBlocks: helper.AskForNewBlocks,
FetchHeadersAgain: helper.FetchHeadersAgain,
FetchBlockAgain: helper.FetchBlockAgain,
RequestBlock: helper.RequestBlock,
RequestHeaders: helper.RequestHeaders,
}
syncmgr := New(cfg, nextBlockIndex)
syncmgr.syncmode = mode
return syncmgr, helper
}

View file

@ -1,60 +0,0 @@
package syncmgr
import (
"github.com/CityOfZion/neo-go/pkg/wire/payload"
)
func (s *Syncmgr) normalModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error {
// If in normal mode, first process the headers
err := s.cfg.ProcessHeaders(hdrs)
if err != nil {
// If something went wrong with processing the headers
// Ask another peer for the headers.
//XXX: Increment banscore for this peer
return s.cfg.FetchHeadersAgain(hdrs[0].Hash)
}
lenHeaders := len(hdrs)
firstHash := hdrs[0].Hash
firstHdrIndex := hdrs[0].Index
lastHash := hdrs[lenHeaders-1].Hash
// Update syncmgr latest header
s.headerHash = lastHash
// If there are 2k headers, then ask for more headers and switch back to headers mode.
if lenHeaders == 2000 {
s.syncmode = headersMode
return s.cfg.RequestHeaders(lastHash)
}
// Ask for the corresponding block iff there is < 2k headers
// then switch to blocksMode
// Bounds state that len > 1 && len!= 2000 & maxHeadersInMessage == 2000
// This means that we have less than 2k headers
s.syncmode = blockMode
return s.cfg.RequestBlock(firstHash, firstHdrIndex)
}
// normalModeOnBlock is called when the sync manager is normal mode
// and receives a block.
func (s *Syncmgr) normalModeOnBlock(peer SyncPeer, block payload.Block) error {
// stop the timer that periodically asks for blocks
s.timer.Stop()
// process block
err := s.processBlock(block)
if err != nil {
s.timer.Reset(blockTimer)
return s.cfg.FetchBlockAgain(block.Hash)
}
diff := peer.Height() - block.Index
if diff > trailingHeight {
s.syncmode = headersMode
return s.cfg.RequestHeaders(block.Hash)
}
s.timer.Reset(blockTimer)
return nil
}

View file

@ -1,152 +0,0 @@
package syncmgr
import (
"fmt"
"sync"
"time"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
type mode uint8
// Note: this is the unoptimised version without parallel sync
// The algorithm for the unoptimsied version is simple:
// Download 2000 headers, then download the blocks for those headers
// Once those blocks are downloaded, we repeat the process again
// Until we are nomore than one block behind the tip.
// Once this happens, we switch into normal mode.
//In normal mode, we have a timer on for X seconds and ask nodes for blocks and also to doublecheck
// if we are behind once the timer runs out.
// The timer restarts whenever we receive a block.
// The parameter X should be approximately the time it takes the network to reach consensus
//blockTimer approximates to how long it takes to reach consensus and propagate
// a block in the network. Once a node has synchronised with the network, he will
// ask the network for a newblock every blockTimer
const blockTimer = 20 * time.Second
// trailingHeight indicates how many blocks the node has to be behind by
// before he switches to headersMode.
const trailingHeight = 100
// indicates how many blocks the node has to be behind by
// before he switches to normalMode and fetches blocks every X seconds.
const cruiseHeight = 0
const (
headersMode mode = 1
blockMode mode = 2
normalMode mode = 3
)
//Syncmgr keeps the node in sync with the rest of the network
type Syncmgr struct {
syncmode mode
cfg *Config
timer *time.Timer
// headerHash is the hash of the last header in the last OnHeaders message that we received.
// When receiving blocks, we can use this to determine whether the node has downloaded
// all of the blocks for the last headers messages
headerHash util.Uint256
poolLock sync.Mutex
blockPool []payload.Block
nextBlockIndex uint32
}
// New creates a new sync manager
func New(cfg *Config, nextBlockIndex uint32) *Syncmgr {
newBlockTimer := time.AfterFunc(blockTimer, func() {
cfg.AskForNewBlocks()
})
newBlockTimer.Stop()
return &Syncmgr{
syncmode: headersMode,
cfg: cfg,
timer: newBlockTimer,
nextBlockIndex: nextBlockIndex,
}
}
// OnHeader is called when the node receives a headers message
func (s *Syncmgr) OnHeader(peer SyncPeer, msg *payload.HeadersMessage) error {
// XXX(Optimisation): First check if we actually need these headers
// Check the last header in msg and then check what our latest header that was saved is
// If our latest header is above the lastHeader, then we do not save it
// We could also have that our latest header is above only some of the headers.
// In this case, we should remove the headers that we already have
if len(msg.Headers) == 0 {
// XXX: Increment banScore for this peer, for sending empty headers message
return nil
}
var err error
switch s.syncmode {
case headersMode:
err = s.headersModeOnHeaders(peer, msg.Headers)
case blockMode:
err = s.blockModeOnHeaders(peer, msg.Headers)
case normalMode:
err = s.normalModeOnHeaders(peer, msg.Headers)
default:
err = s.headersModeOnHeaders(peer, msg.Headers)
}
// XXX(Kev):The only meaningful error here would be if the peer
// we re-requested blocks from failed. In the next iteration, this will be handled
// by the peer manager, who will only return an error, if we are connected to no peers.
// Upon re-alising this, the node will then send out GetAddresses to the network and
// syncing will be resumed, once we find peers to connect to.
hdr := msg.Headers[len(msg.Headers)-1]
fmt.Printf("Finished processing headers. LastHash in set was: %s\n ", hdr.Hash.ReverseString())
return err
}
// OnBlock is called when the node receives a block
func (s *Syncmgr) OnBlock(peer SyncPeer, msg *payload.BlockMessage) error {
fmt.Printf("Block received with height %d\n", msg.Block.Index)
var err error
switch s.syncmode {
case headersMode:
err = s.headersModeOnBlock(peer, msg.Block)
case blockMode:
err = s.blockModeOnBlock(peer, msg.Block)
case normalMode:
err = s.normalModeOnBlock(peer, msg.Block)
default:
err = s.headersModeOnBlock(peer, msg.Block)
}
fmt.Printf("Processed Block with height %d\n", msg.Block.Index)
return err
}
//IsCurrent returns true if the node is currently
// synced up with the network
func (s *Syncmgr) IsCurrent() bool {
return s.syncmode == normalMode
}
func (s *Syncmgr) processBlock(block payload.Block) error {
err := s.cfg.ProcessBlock(block)
if err != nil {
return err
}
s.nextBlockIndex++
return nil
}

View file

@ -1,97 +0,0 @@
package syncmgr
import (
"testing"
"github.com/CityOfZion/neo-go/pkg/chain"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/stretchr/testify/assert"
)
func TestHeadersModeOnBlock(t *testing.T) {
syncmgr, helper := setupSyncMgr(headersMode, 0)
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
// In headerMode, we do nothing
assert.Equal(t, 0, helper.blocksProcessed)
}
func TestBlockModeOnBlock(t *testing.T) {
syncmgr, helper := setupSyncMgr(blockMode, 0)
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
// When a block is received in blockMode, it is processed
assert.Equal(t, 1, helper.blocksProcessed)
}
func TestNormalModeOnBlock(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode, 0)
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
// When a block is received in normal, it is processed
assert.Equal(t, 1, helper.blocksProcessed)
}
func TestBlockModeToNormalMode(t *testing.T) {
syncmgr, _ := setupSyncMgr(blockMode, 100)
peer := &mockPeer{
height: 100,
}
blkMessage := randomBlockMessage(t, 100)
syncmgr.OnBlock(peer, blkMessage)
// We should switch to normal mode, since the block
//we received is close to the height of the peer. See cruiseHeight
assert.Equal(t, normalMode, syncmgr.syncmode)
}
func TestBlockModeStayInBlockMode(t *testing.T) {
syncmgr, _ := setupSyncMgr(blockMode, 0)
// We need our latest know hash to not be equal to the hash
// of the block we received, to stay in blockmode
syncmgr.headerHash = randomUint256(t)
peer := &mockPeer{
height: 2000,
}
blkMessage := randomBlockMessage(t, 100)
syncmgr.OnBlock(peer, blkMessage)
// We should stay in block mode, since the block we received is
// still quite far behind the peers height
assert.Equal(t, blockMode, syncmgr.syncmode)
}
func TestBlockModeAlreadyExistsErr(t *testing.T) {
syncmgr, helper := setupSyncMgr(blockMode, 100)
helper.err = chain.ErrBlockAlreadyExists
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 100))
assert.Equal(t, 0, helper.blockFetchRequest)
// If we have a block already exists in blockmode, then we
// switch back to headers mode.
assert.Equal(t, headersMode, syncmgr.syncmode)
}
func randomBlockMessage(t *testing.T, height uint32) *payload.BlockMessage {
blockMessage, err := payload.NewBlockMessage()
blockMessage.BlockBase.Index = height
assert.Nil(t, err)
return blockMessage
}

View file

@ -1,117 +0,0 @@
package syncmgr
import (
"testing"
"github.com/CityOfZion/neo-go/pkg/chain"
"github.com/stretchr/testify/assert"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
func TestHeadersModeOnHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(headersMode, 0)
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 0))
// Since there were no headers, we should have exited early and processed nothing
assert.Equal(t, 0, helper.headersProcessed)
// ProcessHeaders should have been called once to process all 100 headers
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100))
assert.Equal(t, 100, helper.headersProcessed)
// Mode should now be blockMode
assert.Equal(t, blockMode, syncmgr.syncmode)
}
func TestBlockModeOnHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(blockMode, 0)
// If we receive a header in blockmode, no headers will be processed
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100))
assert.Equal(t, 0, helper.headersProcessed)
}
func TestNormalModeOnHeadersMaxHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode, 0)
// If we receive a header in normalmode, headers will be processed
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 2000))
assert.Equal(t, 2000, helper.headersProcessed)
// Mode should now be headersMode since we received 2000 headers
assert.Equal(t, headersMode, syncmgr.syncmode)
}
// This differs from the previous function in that
//we did not receive the max amount of headers
func TestNormalModeOnHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode, 0)
// If we receive a header in normalmode, headers will be processed
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))
assert.Equal(t, 200, helper.headersProcessed)
// Because we did not receive 2000 headers, we switch to blockMode
assert.Equal(t, blockMode, syncmgr.syncmode)
}
func TestLastHeaderUpdates(t *testing.T) {
syncmgr, _ := setupSyncMgr(headersMode, 0)
hdrsMessage := randomHeadersMessage(t, 200)
hdrs := hdrsMessage.Headers
lastHeader := hdrs[len(hdrs)-1]
syncmgr.OnHeader(&mockPeer{}, hdrsMessage)
// Headers are processed in headersMode
// Last header should be updated
assert.True(t, syncmgr.headerHash.Equals(lastHeader.Hash))
// Change mode to blockMode and reset lastHeader
syncmgr.syncmode = blockMode
syncmgr.headerHash = util.Uint256{}
syncmgr.OnHeader(&mockPeer{}, hdrsMessage)
// header should not be changed
assert.False(t, syncmgr.headerHash.Equals(lastHeader.Hash))
// Change mode to normalMode and reset lastHeader
syncmgr.syncmode = normalMode
syncmgr.headerHash = util.Uint256{}
syncmgr.OnHeader(&mockPeer{}, hdrsMessage)
// headers are processed in normalMode
// hash should be updated
assert.True(t, syncmgr.headerHash.Equals(lastHeader.Hash))
}
func TestHeadersModeOnHeadersErr(t *testing.T) {
syncmgr, helper := setupSyncMgr(headersMode, 0)
helper.err = &chain.ValidationError{}
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))
// On a validation error, we should request for another peer
// to send us these headers
assert.Equal(t, 1, helper.headersFetchRequest)
}
func TestNormalModeOnHeadersErr(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode, 0)
helper.err = &chain.ValidationError{}
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))
// On a validation error, we should request for another peer
// to send us these headers
assert.Equal(t, 1, helper.headersFetchRequest)
}