network: implement Consensus payloads

This commit is contained in:
Evgenii Stratonikov 2019-11-08 18:40:21 +03:00
parent ad9091d13d
commit 085ca7b770
13 changed files with 806 additions and 3 deletions

60
pkg/consensus/cache.go Normal file
View file

@ -0,0 +1,60 @@
package consensus
import (
"container/list"
"sync"
"github.com/CityOfZion/neo-go/pkg/util"
)
// relayCache is a payload cache which is used to store
// last consensus payloads.
type relayCache struct {
*sync.RWMutex
maxCap int
elems map[util.Uint256]*list.Element
queue *list.List
}
func newFIFOCache(capacity int) *relayCache {
return &relayCache{
RWMutex: new(sync.RWMutex),
maxCap: capacity,
elems: make(map[util.Uint256]*list.Element),
queue: list.New(),
}
}
// Add adds payload into a cache if it doesn't already exist.
func (c *relayCache) Add(p *Payload) {
c.Lock()
defer c.Unlock()
h := p.Hash()
if c.elems[h] != nil {
return
}
if c.queue.Len() >= c.maxCap {
first := c.queue.Front()
c.queue.Remove(first)
delete(c.elems, first.Value.(*Payload).Hash())
}
e := c.queue.PushBack(p)
c.elems[h] = e
}
// Get returns payload with the specified hash from cache.
func (c *relayCache) Get(h util.Uint256) *Payload {
c.RLock()
defer c.RUnlock()
e, ok := c.elems[h]
if !ok {
return nil
}
return e.Value.(*Payload)
}

View file

@ -0,0 +1,60 @@
package consensus
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestRelayCache_Add(t *testing.T) {
const capacity = 3
payloads := getDifferentPayloads(t, capacity+1)
c := newFIFOCache(capacity)
require.Equal(t, 0, c.queue.Len())
require.Equal(t, 0, len(c.elems))
for i := 1; i < capacity; i++ {
c.Add(&payloads[i])
require.Equal(t, i, c.queue.Len())
require.Equal(t, i, len(c.elems))
}
// add already existing payload
c.Add(&payloads[1])
require.Equal(t, capacity-1, c.queue.Len())
require.Equal(t, capacity-1, len(c.elems))
c.Add(&payloads[0])
require.Equal(t, capacity, c.queue.Len())
require.Equal(t, capacity, len(c.elems))
// capacity does not exceed maximum
c.Add(&payloads[capacity])
require.Equal(t, capacity, c.queue.Len())
require.Equal(t, capacity, len(c.elems))
// recent payloads are still present in cache
require.Equal(t, &payloads[0], c.Get(payloads[0].Hash()))
for i := 2; i <= capacity; i++ {
require.Equal(t, &payloads[i], c.Get(payloads[i].Hash()))
}
// oldest payload was removed
require.Equal(t, (*Payload)(nil), c.Get(payloads[1].Hash()))
}
func getDifferentPayloads(t *testing.T, n int) (payloads []Payload) {
payloads = make([]Payload, n)
for i := range payloads {
var sign [signatureSize]byte
fillRandom(t, sign[:])
payloads[i].ValidatorIndex = uint16(i)
payloads[i].Type = commitType
payloads[i].payload = &commit{
Signature: sign,
}
}
return
}

View file

@ -0,0 +1,19 @@
package consensus
import "github.com/CityOfZion/neo-go/pkg/io"
// changeView represents dBFT ChangeView message.
type changeView struct {
NewViewNumber byte
Timestamp uint32
}
// EncodeBinary implements io.Serializable interface.
func (c *changeView) EncodeBinary(w *io.BinWriter) {
w.WriteLE(c.Timestamp)
}
// DecodeBinary implements io.Serializable interface.
func (c *changeView) DecodeBinary(r *io.BinReader) {
r.ReadLE(&c.Timestamp)
}

22
pkg/consensus/commit.go Normal file
View file

@ -0,0 +1,22 @@
package consensus
import "github.com/CityOfZion/neo-go/pkg/io"
// commit represents dBFT Commit message.
type commit struct {
Signature [signatureSize]byte
}
// signatureSize is an rfc6989 signature size in bytes
// without leading byte (0x04, uncompressed)
const signatureSize = 64
// EncodeBinary implements io.Serializable interface.
func (c *commit) EncodeBinary(w *io.BinWriter) {
w.WriteBE(c.Signature)
}
// DecodeBinary implements io.Serializable interface.
func (c *commit) DecodeBinary(r *io.BinReader) {
r.ReadBE(&c.Signature)
}

View file

@ -0,0 +1,34 @@
package consensus
import "github.com/CityOfZion/neo-go/pkg/util"
// cacheMaxCapacity is the default cache capacity taken
// from C# implementation https://github.com/neo-project/neo/blob/master/neo/Ledger/Blockchain.cs#L64
const cacheMaxCapacity = 100
// Service represents consensus instance.
type Service interface {
OnPayload(p *Payload)
GetPayload(h util.Uint256) *Payload
}
type service struct {
cache *relayCache
}
// NewService returns new consensus.Service instance.
func NewService() Service {
return &service{
cache: newFIFOCache(cacheMaxCapacity),
}
}
// OnPayload handles Payload receive.
func (s *service) OnPayload(p *Payload) {
s.cache.Add(p)
}
// GetPayload returns payload stored in cache.
func (s *service) GetPayload(h util.Uint256) *Payload {
return s.cache.Get(h)
}

155
pkg/consensus/payload.go Normal file
View file

@ -0,0 +1,155 @@
package consensus
import (
"fmt"
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/crypto/hash"
"github.com/CityOfZion/neo-go/pkg/io"
"github.com/CityOfZion/neo-go/pkg/util"
"github.com/pkg/errors"
)
type (
messageType byte
message struct {
Type messageType
ViewNumber byte
payload io.Serializable
}
// Payload is a type for consensus-related messages.
Payload struct {
message
Version uint32
ValidatorIndex uint16
PrevHash util.Uint256
Height uint32
Timestamp uint32
Witness transaction.Witness
}
)
const (
changeViewType messageType = 0x00
prepareRequestType messageType = 0x20
prepareResponseType messageType = 0x21
commitType messageType = 0x30
recoveryRequestType messageType = 0x40
recoveryMessageType messageType = 0x41
)
// EncodeBinaryUnsigned writes payload to w excluding signature.
func (p *Payload) EncodeBinaryUnsigned(w *io.BinWriter) {
w.WriteLE(p.Version)
w.WriteBE(p.PrevHash[:])
w.WriteLE(p.Height)
w.WriteLE(p.ValidatorIndex)
w.WriteLE(p.Timestamp)
ww := io.NewBufBinWriter()
p.message.EncodeBinary(ww.BinWriter)
w.WriteBytes(ww.Bytes())
}
// EncodeBinary implements io.Serializable interface.
func (p *Payload) EncodeBinary(w *io.BinWriter) {
p.EncodeBinaryUnsigned(w)
w.WriteLE(byte(1))
p.Witness.EncodeBinary(w)
}
// DecodeBinaryUnsigned reads payload from w excluding signature.
func (p *Payload) DecodeBinaryUnsigned(r *io.BinReader) {
r.ReadLE(&p.Version)
r.ReadBE(p.PrevHash[:])
r.ReadLE(&p.Height)
r.ReadLE(&p.ValidatorIndex)
r.ReadLE(&p.Timestamp)
data := r.ReadBytes()
rr := io.NewBinReaderFromBuf(data)
p.message.DecodeBinary(rr)
}
// Hash returns 32-byte message hash.
func (p *Payload) Hash() util.Uint256 {
w := io.NewBufBinWriter()
p.EncodeBinaryUnsigned(w.BinWriter)
return hash.DoubleSha256(w.Bytes())
}
// DecodeBinary implements io.Serializable interface.
func (p *Payload) DecodeBinary(r *io.BinReader) {
p.DecodeBinaryUnsigned(r)
var b byte
r.ReadLE(&b)
if b != 1 {
r.Err = errors.New("invalid format")
return
}
p.Witness.DecodeBinary(r)
}
// EncodeBinary implements io.Serializable interface.
func (m *message) EncodeBinary(w *io.BinWriter) {
w.WriteLE(byte(m.Type))
w.WriteLE(m.ViewNumber)
m.payload.EncodeBinary(w)
}
// DecodeBinary implements io.Serializable interface.
func (m *message) DecodeBinary(r *io.BinReader) {
r.ReadLE((*byte)(&m.Type))
r.ReadLE(&m.ViewNumber)
switch m.Type {
case changeViewType:
cv := new(changeView)
// NewViewNumber is not marshaled
cv.NewViewNumber = m.ViewNumber + 1
m.payload = cv
case prepareRequestType:
m.payload = new(prepareRequest)
case prepareResponseType:
m.payload = new(prepareResponse)
case commitType:
m.payload = new(commit)
case recoveryRequestType:
m.payload = new(recoveryRequest)
case recoveryMessageType:
m.payload = new(recoveryMessage)
default:
r.Err = errors.Errorf("invalid type: 0x%02x", byte(m.Type))
return
}
m.payload.DecodeBinary(r)
}
// String implements fmt.Stringer interface.
func (t messageType) String() string {
switch t {
case changeViewType:
return "ChangeView"
case prepareRequestType:
return "PrepareRequest"
case prepareResponseType:
return "PrepareResponse"
case commitType:
return "Commit"
case recoveryRequestType:
return "RecoveryRequest"
case recoveryMessageType:
return "RecoveryMessage"
default:
return fmt.Sprintf("UNKNOWN(0x%02x)", byte(t))
}
}

View file

@ -0,0 +1,220 @@
package consensus
import (
"encoding/hex"
gio "io"
"math/rand"
"testing"
"time"
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/io"
"github.com/CityOfZion/neo-go/pkg/util"
"github.com/stretchr/testify/require"
)
var messageTypes = []messageType{
changeViewType,
prepareRequestType,
prepareResponseType,
commitType,
recoveryRequestType,
recoveryMessageType,
}
func TestConsensusPayload_Hash(t *testing.T) {
dataHex := "00000000d8fb8d3b143b5f98468ef701909c976505a110a01e26c5e99be9a90cff979199b6fc33000400000000008d2000184dc95de24018f9ad71f4448a2b438eaca8b4b2ab6b4524b5a69a45d920c35103f3901444320656c390ff39c0062f5e8e138ce446a40c7e4ba1af1f8247ebbdf49295933715d3a67949714ff924f8a28cec5b954c71eca3bfaf0e9d4b1f87b4e21e9ba4ae18f97de71501b5c5d07edc200bd66a46b9b28b1a371f2195c10b0af90000e24018f900000000014140c9faaee59942f58da0e5268bc199632f2a3ad0fcbee68681a4437f140b49512e8d9efc6880eb44d3490782895a5794f35eeccee2923ce0c76fa7a1890f934eac232103c089d7122b840a4935234e82e26ae5efd0c2acb627239dc9f207311337b6f2c1ac"
data, err := hex.DecodeString(dataHex)
require.NoError(t, err)
r := io.NewBinReaderFromBuf(data)
var p Payload
p.DecodeBinary(r)
require.NoError(t, err)
require.Equal(t, p.Hash().String(), "45859759c8491597804f1922773947e0d37bf54484af82f80cd642f7b063aa56")
}
func TestConsensusPayload_Serializable(t *testing.T) {
for _, mt := range messageTypes {
p := randomPayload(t, mt)
testSerializable(t, p, new(Payload))
}
}
func TestCommit_Serializable(t *testing.T) {
c := randomMessage(t, commitType)
testSerializable(t, c, new(commit))
}
func TestPrepareResponse_Serializable(t *testing.T) {
resp := randomMessage(t, prepareResponseType)
testSerializable(t, resp, new(prepareResponse))
}
func TestPrepareRequest_Serializable(t *testing.T) {
req := randomMessage(t, prepareRequestType)
testSerializable(t, req, new(prepareRequest))
}
func TestRecoveryRequest_Serializable(t *testing.T) {
req := randomMessage(t, recoveryRequestType)
testSerializable(t, req, new(recoveryRequest))
}
func TestRecoveryMessage_Serializable(t *testing.T) {
msg := randomMessage(t, recoveryMessageType)
testSerializable(t, msg, new(recoveryMessage))
}
func randomPayload(t *testing.T, mt messageType) *Payload {
p := &Payload{
message: message{
Type: mt,
ViewNumber: byte(rand.Uint32()),
payload: randomMessage(t, mt),
},
Version: 1,
ValidatorIndex: 13,
Height: rand.Uint32(),
Timestamp: rand.Uint32(),
Witness: transaction.Witness{
InvocationScript: fillRandom(t, make([]byte, 3)),
VerificationScript: fillRandom(t, make([]byte, 4)),
},
}
fillRandom(t, p.PrevHash[:])
if mt == changeViewType {
p.payload.(*changeView).NewViewNumber = p.ViewNumber + 1
}
return p
}
func randomMessage(t *testing.T, mt messageType) io.Serializable {
switch mt {
case changeViewType:
return &changeView{
Timestamp: rand.Uint32(),
}
case prepareRequestType:
return randomPrepareRequest(t)
case prepareResponseType:
resp := &prepareResponse{}
fillRandom(t, resp.PreparationHash[:])
return resp
case commitType:
var c commit
fillRandom(t, c.Signature[:])
return &c
case recoveryRequestType:
return &recoveryRequest{Timestamp: rand.Uint32()}
case recoveryMessageType:
return randomRecoveryMessage(t)
default:
require.Fail(t, "invalid type")
return nil
}
}
func randomPrepareRequest(t *testing.T) *prepareRequest {
const txCount = 3
req := &prepareRequest{
Timestamp: rand.Uint32(),
Nonce: rand.Uint64(),
TransactionHashes: make([]util.Uint256, txCount),
MinerTransaction: *newMinerTx(rand.Uint32()),
}
req.TransactionHashes[0] = req.MinerTransaction.Hash()
for i := 1; i < txCount; i++ {
fillRandom(t, req.TransactionHashes[i][:])
}
fillRandom(t, req.NextConsensus[:])
return req
}
func randomRecoveryMessage(t *testing.T) *recoveryMessage {
result := randomMessage(t, prepareRequestType)
require.IsType(t, (*prepareRequest)(nil), result)
prepReq := result.(*prepareRequest)
return &recoveryMessage{
PreparationPayloads: []*preparationCompact{
{
ValidatorIndex: 1,
InvocationScript: fillRandom(t, make([]byte, 10)),
},
},
CommitPayloads: []*commitCompact{
{
ViewNumber: 0,
ValidatorIndex: 1,
Signature: fillRandom(t, make([]byte, signatureSize)),
InvocationScript: fillRandom(t, make([]byte, 20)),
},
{
ViewNumber: 0,
ValidatorIndex: 2,
Signature: fillRandom(t, make([]byte, signatureSize)),
InvocationScript: fillRandom(t, make([]byte, 10)),
},
},
ChangeViewPayloads: []*changeViewCompact{
{
Timestamp: rand.Uint32(),
ValidatorIndex: 3,
OriginalViewNumber: 3,
InvocationScript: fillRandom(t, make([]byte, 4)),
},
},
PrepareRequest: prepReq,
}
}
func TestMessageType_String(t *testing.T) {
require.Equal(t, "ChangeView", changeViewType.String())
require.Equal(t, "PrepareRequest", prepareRequestType.String())
require.Equal(t, "PrepareResponse", prepareResponseType.String())
require.Equal(t, "Commit", commitType.String())
require.Equal(t, "RecoveryMessage", recoveryMessageType.String())
require.Equal(t, "RecoveryRequest", recoveryRequestType.String())
require.Equal(t, "UNKNOWN(0xff)", messageType(0xff).String())
}
func testSerializable(t *testing.T, expected, actual io.Serializable) {
w := io.NewBufBinWriter()
expected.EncodeBinary(w.BinWriter)
r := io.NewBinReaderFromBuf(w.Bytes())
actual.DecodeBinary(r)
require.Equal(t, expected, actual)
}
func fillRandom(t *testing.T, buf []byte) []byte {
r := rand.New(rand.NewSource(time.Now().Unix()))
_, err := gio.ReadFull(r, buf)
require.NoError(t, err)
return buf
}
func newMinerTx(nonce uint32) *transaction.Transaction {
return &transaction.Transaction{
Type: transaction.MinerType,
Version: 0,
Data: &transaction.MinerTX{
Nonce: rand.Uint32(),
},
Attributes: []*transaction.Attribute{},
Inputs: []*transaction.Input{},
Outputs: []*transaction.Output{},
Scripts: []*transaction.Witness{},
Trimmed: false,
}
}

View file

@ -0,0 +1,43 @@
package consensus
import (
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/io"
"github.com/CityOfZion/neo-go/pkg/util"
)
// prepareRequest represents dBFT PrepareRequest message.
type prepareRequest struct {
Timestamp uint32
Nonce uint64
TransactionHashes []util.Uint256
MinerTransaction transaction.Transaction
NextConsensus util.Uint160
}
// EncodeBinary implements io.Serializable interface.
func (p *prepareRequest) EncodeBinary(w *io.BinWriter) {
w.WriteLE(p.Timestamp)
w.WriteLE(p.Nonce)
w.WriteBE(p.NextConsensus[:])
w.WriteVarUint(uint64(len(p.TransactionHashes)))
for i := range p.TransactionHashes {
w.WriteBE(p.TransactionHashes[i][:])
}
p.MinerTransaction.EncodeBinary(w)
}
// DecodeBinary implements io.Serializable interface.
func (p *prepareRequest) DecodeBinary(r *io.BinReader) {
r.ReadLE(&p.Timestamp)
r.ReadLE(&p.Nonce)
r.ReadBE(p.NextConsensus[:])
lenHashes := r.ReadVarUint()
p.TransactionHashes = make([]util.Uint256, lenHashes)
for i := range p.TransactionHashes {
r.ReadBE(p.TransactionHashes[i][:])
}
p.MinerTransaction.DecodeBinary(r)
}

View file

@ -0,0 +1,21 @@
package consensus
import (
"github.com/CityOfZion/neo-go/pkg/io"
"github.com/CityOfZion/neo-go/pkg/util"
)
// prepareResponse represents dBFT PrepareResponse message.
type prepareResponse struct {
PreparationHash util.Uint256
}
// EncodeBinary implements io.Serializable interface.
func (p *prepareResponse) EncodeBinary(w *io.BinWriter) {
w.WriteBE(p.PreparationHash[:])
}
// DecodeBinary implements io.Serializable interface.
func (p *prepareResponse) DecodeBinary(r *io.BinReader) {
r.ReadBE(p.PreparationHash[:])
}

View file

@ -0,0 +1,132 @@
package consensus
import (
"github.com/CityOfZion/neo-go/pkg/io"
"github.com/CityOfZion/neo-go/pkg/util"
"github.com/pkg/errors"
)
type (
// recoveryMessage represents dBFT Recovery message.
recoveryMessage struct {
PreparationHash *util.Uint256
PreparationPayloads []*preparationCompact
CommitPayloads []*commitCompact
ChangeViewPayloads []*changeViewCompact
PrepareRequest *prepareRequest
}
changeViewCompact struct {
ValidatorIndex uint16
OriginalViewNumber byte
Timestamp uint32
InvocationScript []byte
}
commitCompact struct {
ViewNumber byte
ValidatorIndex uint16
Signature []byte
InvocationScript []byte
}
preparationCompact struct {
ValidatorIndex uint16
InvocationScript []byte
}
)
const uint256size = 32
// DecodeBinary implements io.Serializable interface.
func (m *recoveryMessage) DecodeBinary(r *io.BinReader) {
m.ChangeViewPayloads = r.ReadArray(changeViewCompact{}).([]*changeViewCompact)
var hasReq bool
r.ReadLE(&hasReq)
if hasReq {
m.PrepareRequest = new(prepareRequest)
m.PrepareRequest.DecodeBinary(r)
} else {
l := r.ReadVarUint()
if l != 0 {
if l == uint256size {
m.PreparationHash = new(util.Uint256)
r.ReadBE(m.PreparationHash[:])
} else {
r.Err = errors.New("invalid data")
}
} else {
m.PreparationHash = nil
}
}
m.PreparationPayloads = r.ReadArray(preparationCompact{}).([]*preparationCompact)
m.CommitPayloads = r.ReadArray(commitCompact{}).([]*commitCompact)
}
// EncodeBinary implements io.Serializable interface.
func (m *recoveryMessage) EncodeBinary(w *io.BinWriter) {
w.WriteArray(m.ChangeViewPayloads)
hasReq := m.PrepareRequest != nil
w.WriteLE(hasReq)
if hasReq {
m.PrepareRequest.EncodeBinary(w)
} else {
if m.PreparationHash == nil {
w.WriteVarUint(0)
} else {
w.WriteVarUint(uint256size)
w.WriteBE(m.PreparationHash[:])
}
}
w.WriteArray(m.PreparationPayloads)
w.WriteArray(m.CommitPayloads)
}
// DecodeBinary implements io.Serializable interface.
func (p *changeViewCompact) DecodeBinary(r *io.BinReader) {
r.ReadLE(&p.ValidatorIndex)
r.ReadLE(&p.OriginalViewNumber)
r.ReadLE(&p.Timestamp)
p.InvocationScript = r.ReadBytes()
}
// EncodeBinary implements io.Serializable interface.
func (p *changeViewCompact) EncodeBinary(w *io.BinWriter) {
w.WriteLE(p.ValidatorIndex)
w.WriteLE(p.OriginalViewNumber)
w.WriteLE(p.Timestamp)
w.WriteBytes(p.InvocationScript)
}
// DecodeBinary implements io.Serializable interface.
func (p *commitCompact) DecodeBinary(r *io.BinReader) {
r.ReadLE(&p.ViewNumber)
r.ReadLE(&p.ValidatorIndex)
p.Signature = make([]byte, signatureSize)
r.ReadBE(&p.Signature)
p.InvocationScript = r.ReadBytes()
}
// EncodeBinary implements io.Serializable interface.
func (p *commitCompact) EncodeBinary(w *io.BinWriter) {
w.WriteLE(p.ViewNumber)
w.WriteLE(p.ValidatorIndex)
w.WriteBE(p.Signature)
w.WriteBytes(p.InvocationScript)
}
// DecodeBinary implements io.Serializable interface.
func (p *preparationCompact) DecodeBinary(r *io.BinReader) {
r.ReadLE(&p.ValidatorIndex)
p.InvocationScript = r.ReadBytes()
}
// EncodeBinary implements io.Serializable interface.
func (p *preparationCompact) EncodeBinary(w *io.BinWriter) {
w.WriteLE(p.ValidatorIndex)
w.WriteBytes(p.InvocationScript)
}

View file

@ -0,0 +1,18 @@
package consensus
import "github.com/CityOfZion/neo-go/pkg/io"
// recoveryRequest represents dBFT RecoveryRequest message.
type recoveryRequest struct {
Timestamp uint32
}
// DecodeBinary implements io.Serializable interface.
func (m *recoveryRequest) DecodeBinary(r *io.BinReader) {
r.ReadLE(&m.Timestamp)
}
// EncodeBinary implements io.Serializable interface.
func (m *recoveryRequest) EncodeBinary(w *io.BinWriter) {
w.WriteLE(m.Timestamp)
}

View file

@ -6,6 +6,7 @@ import (
"fmt"
"github.com/CityOfZion/neo-go/config"
"github.com/CityOfZion/neo-go/pkg/consensus"
"github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/crypto/hash"
@ -185,8 +186,7 @@ func (m *Message) decodePayload(br *io.BinReader) error {
case CMDBlock:
p = &core.Block{}
case CMDConsensus:
// Stubbed out for now, see #431.
return nil
p = &consensus.Payload{}
case CMDGetBlocks:
fallthrough
case CMDGetHeaders:

View file

@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/CityOfZion/neo-go/pkg/consensus"
"github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/network/payload"
@ -50,6 +51,7 @@ type (
discovery Discoverer
chain core.Blockchainer
bQueue *blockQueue
consensus consensus.Service
lock sync.RWMutex
peers map[Peer]bool
@ -78,6 +80,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server {
register: make(chan Peer),
unregister: make(chan peerDrop),
peers: make(map[Peer]bool),
consensus: consensus.NewService(),
}
if s.MinPeers <= 0 {
@ -366,8 +369,21 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
}
}
case payload.ConsensusType:
// TODO (#431)
for _, hash := range inv.Hashes {
if cp := s.consensus.GetPayload(hash); cp != nil {
if err := p.WriteMsg(NewMessage(s.Net, CMDConsensus, cp)); err != nil {
return err
}
}
}
}
return nil
}
// handleConsensusCmd processes received consensus payload.
// It never returns an error.
func (s *Server) handleConsensusCmd(cp *consensus.Payload) error {
s.consensus.OnPayload(cp)
return nil
}
@ -459,6 +475,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
case CMDBlock:
block := msg.Payload.(*core.Block)
return s.handleBlockCmd(peer, block)
case CMDConsensus:
cp := msg.Payload.(*consensus.Payload)
return s.handleConsensusCmd(cp)
case CMDVersion, CMDVerack:
return fmt.Errorf("received '%s' after the handshake", msg.CommandType())
}