diff --git a/pkg/consensus/cache.go b/pkg/consensus/cache.go new file mode 100644 index 000000000..39af2db31 --- /dev/null +++ b/pkg/consensus/cache.go @@ -0,0 +1,60 @@ +package consensus + +import ( + "container/list" + "sync" + + "github.com/CityOfZion/neo-go/pkg/util" +) + +// relayCache is a payload cache which is used to store +// last consensus payloads. +type relayCache struct { + *sync.RWMutex + + maxCap int + elems map[util.Uint256]*list.Element + queue *list.List +} + +func newFIFOCache(capacity int) *relayCache { + return &relayCache{ + RWMutex: new(sync.RWMutex), + + maxCap: capacity, + elems: make(map[util.Uint256]*list.Element), + queue: list.New(), + } +} + +// Add adds payload into a cache if it doesn't already exist. +func (c *relayCache) Add(p *Payload) { + c.Lock() + defer c.Unlock() + + h := p.Hash() + if c.elems[h] != nil { + return + } + + if c.queue.Len() >= c.maxCap { + first := c.queue.Front() + c.queue.Remove(first) + delete(c.elems, first.Value.(*Payload).Hash()) + } + + e := c.queue.PushBack(p) + c.elems[h] = e +} + +// Get returns payload with the specified hash from cache. +func (c *relayCache) Get(h util.Uint256) *Payload { + c.RLock() + defer c.RUnlock() + + e, ok := c.elems[h] + if !ok { + return nil + } + return e.Value.(*Payload) +} diff --git a/pkg/consensus/cache_test.go b/pkg/consensus/cache_test.go new file mode 100644 index 000000000..631cc70d9 --- /dev/null +++ b/pkg/consensus/cache_test.go @@ -0,0 +1,60 @@ +package consensus + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRelayCache_Add(t *testing.T) { + const capacity = 3 + payloads := getDifferentPayloads(t, capacity+1) + c := newFIFOCache(capacity) + require.Equal(t, 0, c.queue.Len()) + require.Equal(t, 0, len(c.elems)) + + for i := 1; i < capacity; i++ { + c.Add(&payloads[i]) + require.Equal(t, i, c.queue.Len()) + require.Equal(t, i, len(c.elems)) + } + + // add already existing payload + c.Add(&payloads[1]) + require.Equal(t, capacity-1, c.queue.Len()) + require.Equal(t, capacity-1, len(c.elems)) + + c.Add(&payloads[0]) + require.Equal(t, capacity, c.queue.Len()) + require.Equal(t, capacity, len(c.elems)) + + // capacity does not exceed maximum + c.Add(&payloads[capacity]) + require.Equal(t, capacity, c.queue.Len()) + require.Equal(t, capacity, len(c.elems)) + + // recent payloads are still present in cache + require.Equal(t, &payloads[0], c.Get(payloads[0].Hash())) + for i := 2; i <= capacity; i++ { + require.Equal(t, &payloads[i], c.Get(payloads[i].Hash())) + } + + // oldest payload was removed + require.Equal(t, (*Payload)(nil), c.Get(payloads[1].Hash())) +} + +func getDifferentPayloads(t *testing.T, n int) (payloads []Payload) { + payloads = make([]Payload, n) + for i := range payloads { + var sign [signatureSize]byte + fillRandom(t, sign[:]) + + payloads[i].ValidatorIndex = uint16(i) + payloads[i].Type = commitType + payloads[i].payload = &commit{ + Signature: sign, + } + } + + return +} diff --git a/pkg/consensus/change_view.go b/pkg/consensus/change_view.go new file mode 100644 index 000000000..4274a38d2 --- /dev/null +++ b/pkg/consensus/change_view.go @@ -0,0 +1,19 @@ +package consensus + +import "github.com/CityOfZion/neo-go/pkg/io" + +// changeView represents dBFT ChangeView message. +type changeView struct { + NewViewNumber byte + Timestamp uint32 +} + +// EncodeBinary implements io.Serializable interface. +func (c *changeView) EncodeBinary(w *io.BinWriter) { + w.WriteLE(c.Timestamp) +} + +// DecodeBinary implements io.Serializable interface. +func (c *changeView) DecodeBinary(r *io.BinReader) { + r.ReadLE(&c.Timestamp) +} diff --git a/pkg/consensus/commit.go b/pkg/consensus/commit.go new file mode 100644 index 000000000..2d3acac08 --- /dev/null +++ b/pkg/consensus/commit.go @@ -0,0 +1,22 @@ +package consensus + +import "github.com/CityOfZion/neo-go/pkg/io" + +// commit represents dBFT Commit message. +type commit struct { + Signature [signatureSize]byte +} + +// signatureSize is an rfc6989 signature size in bytes +// without leading byte (0x04, uncompressed) +const signatureSize = 64 + +// EncodeBinary implements io.Serializable interface. +func (c *commit) EncodeBinary(w *io.BinWriter) { + w.WriteBE(c.Signature) +} + +// DecodeBinary implements io.Serializable interface. +func (c *commit) DecodeBinary(r *io.BinReader) { + r.ReadBE(&c.Signature) +} diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go new file mode 100644 index 000000000..b76bd8b91 --- /dev/null +++ b/pkg/consensus/consensus.go @@ -0,0 +1,34 @@ +package consensus + +import "github.com/CityOfZion/neo-go/pkg/util" + +// cacheMaxCapacity is the default cache capacity taken +// from C# implementation https://github.com/neo-project/neo/blob/master/neo/Ledger/Blockchain.cs#L64 +const cacheMaxCapacity = 100 + +// Service represents consensus instance. +type Service interface { + OnPayload(p *Payload) + GetPayload(h util.Uint256) *Payload +} + +type service struct { + cache *relayCache +} + +// NewService returns new consensus.Service instance. +func NewService() Service { + return &service{ + cache: newFIFOCache(cacheMaxCapacity), + } +} + +// OnPayload handles Payload receive. +func (s *service) OnPayload(p *Payload) { + s.cache.Add(p) +} + +// GetPayload returns payload stored in cache. +func (s *service) GetPayload(h util.Uint256) *Payload { + return s.cache.Get(h) +} diff --git a/pkg/consensus/payload.go b/pkg/consensus/payload.go new file mode 100644 index 000000000..5abdef1aa --- /dev/null +++ b/pkg/consensus/payload.go @@ -0,0 +1,155 @@ +package consensus + +import ( + "fmt" + + "github.com/CityOfZion/neo-go/pkg/core/transaction" + "github.com/CityOfZion/neo-go/pkg/crypto/hash" + "github.com/CityOfZion/neo-go/pkg/io" + "github.com/CityOfZion/neo-go/pkg/util" + "github.com/pkg/errors" +) + +type ( + messageType byte + + message struct { + Type messageType + ViewNumber byte + + payload io.Serializable + } + + // Payload is a type for consensus-related messages. + Payload struct { + message + + Version uint32 + ValidatorIndex uint16 + PrevHash util.Uint256 + Height uint32 + Timestamp uint32 + + Witness transaction.Witness + } +) + +const ( + changeViewType messageType = 0x00 + prepareRequestType messageType = 0x20 + prepareResponseType messageType = 0x21 + commitType messageType = 0x30 + recoveryRequestType messageType = 0x40 + recoveryMessageType messageType = 0x41 +) + +// EncodeBinaryUnsigned writes payload to w excluding signature. +func (p *Payload) EncodeBinaryUnsigned(w *io.BinWriter) { + w.WriteLE(p.Version) + w.WriteBE(p.PrevHash[:]) + w.WriteLE(p.Height) + w.WriteLE(p.ValidatorIndex) + w.WriteLE(p.Timestamp) + + ww := io.NewBufBinWriter() + p.message.EncodeBinary(ww.BinWriter) + w.WriteBytes(ww.Bytes()) +} + +// EncodeBinary implements io.Serializable interface. +func (p *Payload) EncodeBinary(w *io.BinWriter) { + p.EncodeBinaryUnsigned(w) + + w.WriteLE(byte(1)) + p.Witness.EncodeBinary(w) +} + +// DecodeBinaryUnsigned reads payload from w excluding signature. +func (p *Payload) DecodeBinaryUnsigned(r *io.BinReader) { + r.ReadLE(&p.Version) + r.ReadBE(p.PrevHash[:]) + r.ReadLE(&p.Height) + r.ReadLE(&p.ValidatorIndex) + r.ReadLE(&p.Timestamp) + + data := r.ReadBytes() + rr := io.NewBinReaderFromBuf(data) + p.message.DecodeBinary(rr) +} + +// Hash returns 32-byte message hash. +func (p *Payload) Hash() util.Uint256 { + w := io.NewBufBinWriter() + p.EncodeBinaryUnsigned(w.BinWriter) + + return hash.DoubleSha256(w.Bytes()) +} + +// DecodeBinary implements io.Serializable interface. +func (p *Payload) DecodeBinary(r *io.BinReader) { + p.DecodeBinaryUnsigned(r) + + var b byte + r.ReadLE(&b) + if b != 1 { + r.Err = errors.New("invalid format") + return + } + + p.Witness.DecodeBinary(r) +} + +// EncodeBinary implements io.Serializable interface. +func (m *message) EncodeBinary(w *io.BinWriter) { + w.WriteLE(byte(m.Type)) + w.WriteLE(m.ViewNumber) + m.payload.EncodeBinary(w) +} + +// DecodeBinary implements io.Serializable interface. +func (m *message) DecodeBinary(r *io.BinReader) { + r.ReadLE((*byte)(&m.Type)) + r.ReadLE(&m.ViewNumber) + + switch m.Type { + case changeViewType: + cv := new(changeView) + // NewViewNumber is not marshaled + cv.NewViewNumber = m.ViewNumber + 1 + m.payload = cv + case prepareRequestType: + m.payload = new(prepareRequest) + case prepareResponseType: + m.payload = new(prepareResponse) + case commitType: + m.payload = new(commit) + case recoveryRequestType: + m.payload = new(recoveryRequest) + case recoveryMessageType: + m.payload = new(recoveryMessage) + default: + r.Err = errors.Errorf("invalid type: 0x%02x", byte(m.Type)) + return + } + m.payload.DecodeBinary(r) +} + +// String implements fmt.Stringer interface. +func (t messageType) String() string { + switch t { + case changeViewType: + return "ChangeView" + case prepareRequestType: + return "PrepareRequest" + case prepareResponseType: + return "PrepareResponse" + case commitType: + return "Commit" + case recoveryRequestType: + return "RecoveryRequest" + case recoveryMessageType: + return "RecoveryMessage" + default: + return fmt.Sprintf("UNKNOWN(0x%02x)", byte(t)) + } +} diff --git a/pkg/consensus/payload_test.go b/pkg/consensus/payload_test.go new file mode 100644 index 000000000..8759ead89 --- /dev/null +++ b/pkg/consensus/payload_test.go @@ -0,0 +1,220 @@ +package consensus + +import ( + "encoding/hex" + gio "io" + "math/rand" + "testing" + "time" + + "github.com/CityOfZion/neo-go/pkg/core/transaction" + "github.com/CityOfZion/neo-go/pkg/io" + "github.com/CityOfZion/neo-go/pkg/util" + "github.com/stretchr/testify/require" +) + +var messageTypes = []messageType{ + changeViewType, + prepareRequestType, + prepareResponseType, + commitType, + recoveryRequestType, + recoveryMessageType, +} + +func TestConsensusPayload_Hash(t *testing.T) { + dataHex := "00000000d8fb8d3b143b5f98468ef701909c976505a110a01e26c5e99be9a90cff979199b6fc33000400000000008d2000184dc95de24018f9ad71f4448a2b438eaca8b4b2ab6b4524b5a69a45d920c35103f3901444320656c390ff39c0062f5e8e138ce446a40c7e4ba1af1f8247ebbdf49295933715d3a67949714ff924f8a28cec5b954c71eca3bfaf0e9d4b1f87b4e21e9ba4ae18f97de71501b5c5d07edc200bd66a46b9b28b1a371f2195c10b0af90000e24018f900000000014140c9faaee59942f58da0e5268bc199632f2a3ad0fcbee68681a4437f140b49512e8d9efc6880eb44d3490782895a5794f35eeccee2923ce0c76fa7a1890f934eac232103c089d7122b840a4935234e82e26ae5efd0c2acb627239dc9f207311337b6f2c1ac" + data, err := hex.DecodeString(dataHex) + require.NoError(t, err) + + r := io.NewBinReaderFromBuf(data) + + var p Payload + p.DecodeBinary(r) + + require.NoError(t, err) + require.Equal(t, p.Hash().String(), "45859759c8491597804f1922773947e0d37bf54484af82f80cd642f7b063aa56") +} + +func TestConsensusPayload_Serializable(t *testing.T) { + for _, mt := range messageTypes { + p := randomPayload(t, mt) + testSerializable(t, p, new(Payload)) + } +} + +func TestCommit_Serializable(t *testing.T) { + c := randomMessage(t, commitType) + testSerializable(t, c, new(commit)) +} + +func TestPrepareResponse_Serializable(t *testing.T) { + resp := randomMessage(t, prepareResponseType) + testSerializable(t, resp, new(prepareResponse)) +} + +func TestPrepareRequest_Serializable(t *testing.T) { + req := randomMessage(t, prepareRequestType) + testSerializable(t, req, new(prepareRequest)) +} + +func TestRecoveryRequest_Serializable(t *testing.T) { + req := randomMessage(t, recoveryRequestType) + testSerializable(t, req, new(recoveryRequest)) +} + +func TestRecoveryMessage_Serializable(t *testing.T) { + msg := randomMessage(t, recoveryMessageType) + testSerializable(t, msg, new(recoveryMessage)) +} + +func randomPayload(t *testing.T, mt messageType) *Payload { + p := &Payload{ + message: message{ + Type: mt, + ViewNumber: byte(rand.Uint32()), + payload: randomMessage(t, mt), + }, + Version: 1, + ValidatorIndex: 13, + Height: rand.Uint32(), + Timestamp: rand.Uint32(), + Witness: transaction.Witness{ + InvocationScript: fillRandom(t, make([]byte, 3)), + VerificationScript: fillRandom(t, make([]byte, 4)), + }, + } + fillRandom(t, p.PrevHash[:]) + + if mt == changeViewType { + p.payload.(*changeView).NewViewNumber = p.ViewNumber + 1 + } + + return p +} + +func randomMessage(t *testing.T, mt messageType) io.Serializable { + switch mt { + case changeViewType: + return &changeView{ + Timestamp: rand.Uint32(), + } + case prepareRequestType: + return randomPrepareRequest(t) + case prepareResponseType: + resp := &prepareResponse{} + fillRandom(t, resp.PreparationHash[:]) + return resp + case commitType: + var c commit + fillRandom(t, c.Signature[:]) + return &c + case recoveryRequestType: + return &recoveryRequest{Timestamp: rand.Uint32()} + case recoveryMessageType: + return randomRecoveryMessage(t) + default: + require.Fail(t, "invalid type") + return nil + } +} + +func randomPrepareRequest(t *testing.T) *prepareRequest { + const txCount = 3 + + req := &prepareRequest{ + Timestamp: rand.Uint32(), + Nonce: rand.Uint64(), + TransactionHashes: make([]util.Uint256, txCount), + MinerTransaction: *newMinerTx(rand.Uint32()), + } + + req.TransactionHashes[0] = req.MinerTransaction.Hash() + for i := 1; i < txCount; i++ { + fillRandom(t, req.TransactionHashes[i][:]) + } + fillRandom(t, req.NextConsensus[:]) + + return req +} + +func randomRecoveryMessage(t *testing.T) *recoveryMessage { + result := randomMessage(t, prepareRequestType) + require.IsType(t, (*prepareRequest)(nil), result) + prepReq := result.(*prepareRequest) + + return &recoveryMessage{ + PreparationPayloads: []*preparationCompact{ + { + ValidatorIndex: 1, + InvocationScript: fillRandom(t, make([]byte, 10)), + }, + }, + CommitPayloads: []*commitCompact{ + { + ViewNumber: 0, + ValidatorIndex: 1, + Signature: fillRandom(t, make([]byte, signatureSize)), + InvocationScript: fillRandom(t, make([]byte, 20)), + }, + { + ViewNumber: 0, + ValidatorIndex: 2, + Signature: fillRandom(t, make([]byte, signatureSize)), + InvocationScript: fillRandom(t, make([]byte, 10)), + }, + }, + ChangeViewPayloads: []*changeViewCompact{ + { + Timestamp: rand.Uint32(), + ValidatorIndex: 3, + OriginalViewNumber: 3, + InvocationScript: fillRandom(t, make([]byte, 4)), + }, + }, + PrepareRequest: prepReq, + } +} + +func TestMessageType_String(t *testing.T) { + require.Equal(t, "ChangeView", changeViewType.String()) + require.Equal(t, "PrepareRequest", prepareRequestType.String()) + require.Equal(t, "PrepareResponse", prepareResponseType.String()) + require.Equal(t, "Commit", commitType.String()) + require.Equal(t, "RecoveryMessage", recoveryMessageType.String()) + require.Equal(t, "RecoveryRequest", recoveryRequestType.String()) + require.Equal(t, "UNKNOWN(0xff)", messageType(0xff).String()) +} + +func testSerializable(t *testing.T, expected, actual io.Serializable) { + w := io.NewBufBinWriter() + expected.EncodeBinary(w.BinWriter) + + r := io.NewBinReaderFromBuf(w.Bytes()) + actual.DecodeBinary(r) + + require.Equal(t, expected, actual) +} + +func fillRandom(t *testing.T, buf []byte) []byte { + r := rand.New(rand.NewSource(time.Now().Unix())) + _, err := gio.ReadFull(r, buf) + require.NoError(t, err) + + return buf +} + +func newMinerTx(nonce uint32) *transaction.Transaction { + return &transaction.Transaction{ + Type: transaction.MinerType, + Version: 0, + Data: &transaction.MinerTX{ + Nonce: rand.Uint32(), + }, + Attributes: []*transaction.Attribute{}, + Inputs: []*transaction.Input{}, + Outputs: []*transaction.Output{}, + Scripts: []*transaction.Witness{}, + Trimmed: false, + } +} diff --git a/pkg/consensus/prepare_request.go b/pkg/consensus/prepare_request.go new file mode 100644 index 000000000..02c374747 --- /dev/null +++ b/pkg/consensus/prepare_request.go @@ -0,0 +1,43 @@ +package consensus + +import ( + "github.com/CityOfZion/neo-go/pkg/core/transaction" + "github.com/CityOfZion/neo-go/pkg/io" + "github.com/CityOfZion/neo-go/pkg/util" +) + +// prepareRequest represents dBFT PrepareRequest message. +type prepareRequest struct { + Timestamp uint32 + Nonce uint64 + TransactionHashes []util.Uint256 + MinerTransaction transaction.Transaction + NextConsensus util.Uint160 +} + +// EncodeBinary implements io.Serializable interface. +func (p *prepareRequest) EncodeBinary(w *io.BinWriter) { + w.WriteLE(p.Timestamp) + w.WriteLE(p.Nonce) + w.WriteBE(p.NextConsensus[:]) + w.WriteVarUint(uint64(len(p.TransactionHashes))) + for i := range p.TransactionHashes { + w.WriteBE(p.TransactionHashes[i][:]) + } + p.MinerTransaction.EncodeBinary(w) +} + +// DecodeBinary implements io.Serializable interface. +func (p *prepareRequest) DecodeBinary(r *io.BinReader) { + r.ReadLE(&p.Timestamp) + r.ReadLE(&p.Nonce) + r.ReadBE(p.NextConsensus[:]) + + lenHashes := r.ReadVarUint() + p.TransactionHashes = make([]util.Uint256, lenHashes) + for i := range p.TransactionHashes { + r.ReadBE(p.TransactionHashes[i][:]) + } + + p.MinerTransaction.DecodeBinary(r) +} diff --git a/pkg/consensus/prepare_response.go b/pkg/consensus/prepare_response.go new file mode 100644 index 000000000..4636b3139 --- /dev/null +++ b/pkg/consensus/prepare_response.go @@ -0,0 +1,21 @@ +package consensus + +import ( + "github.com/CityOfZion/neo-go/pkg/io" + "github.com/CityOfZion/neo-go/pkg/util" +) + +// prepareResponse represents dBFT PrepareResponse message. +type prepareResponse struct { + PreparationHash util.Uint256 +} + +// EncodeBinary implements io.Serializable interface. +func (p *prepareResponse) EncodeBinary(w *io.BinWriter) { + w.WriteBE(p.PreparationHash[:]) +} + +// DecodeBinary implements io.Serializable interface. +func (p *prepareResponse) DecodeBinary(r *io.BinReader) { + r.ReadBE(p.PreparationHash[:]) +} diff --git a/pkg/consensus/recovery_message.go b/pkg/consensus/recovery_message.go new file mode 100644 index 000000000..8b248b271 --- /dev/null +++ b/pkg/consensus/recovery_message.go @@ -0,0 +1,132 @@ +package consensus + +import ( + "github.com/CityOfZion/neo-go/pkg/io" + "github.com/CityOfZion/neo-go/pkg/util" + "github.com/pkg/errors" +) + +type ( + // recoveryMessage represents dBFT Recovery message. + recoveryMessage struct { + PreparationHash *util.Uint256 + PreparationPayloads []*preparationCompact + CommitPayloads []*commitCompact + ChangeViewPayloads []*changeViewCompact + PrepareRequest *prepareRequest + } + + changeViewCompact struct { + ValidatorIndex uint16 + OriginalViewNumber byte + Timestamp uint32 + InvocationScript []byte + } + + commitCompact struct { + ViewNumber byte + ValidatorIndex uint16 + Signature []byte + InvocationScript []byte + } + + preparationCompact struct { + ValidatorIndex uint16 + InvocationScript []byte + } +) + +const uint256size = 32 + +// DecodeBinary implements io.Serializable interface. +func (m *recoveryMessage) DecodeBinary(r *io.BinReader) { + m.ChangeViewPayloads = r.ReadArray(changeViewCompact{}).([]*changeViewCompact) + + var hasReq bool + r.ReadLE(&hasReq) + if hasReq { + m.PrepareRequest = new(prepareRequest) + m.PrepareRequest.DecodeBinary(r) + } else { + l := r.ReadVarUint() + if l != 0 { + if l == uint256size { + m.PreparationHash = new(util.Uint256) + r.ReadBE(m.PreparationHash[:]) + } else { + r.Err = errors.New("invalid data") + } + } else { + m.PreparationHash = nil + } + } + + m.PreparationPayloads = r.ReadArray(preparationCompact{}).([]*preparationCompact) + m.CommitPayloads = r.ReadArray(commitCompact{}).([]*commitCompact) +} + +// EncodeBinary implements io.Serializable interface. +func (m *recoveryMessage) EncodeBinary(w *io.BinWriter) { + w.WriteArray(m.ChangeViewPayloads) + + hasReq := m.PrepareRequest != nil + w.WriteLE(hasReq) + if hasReq { + m.PrepareRequest.EncodeBinary(w) + } else { + if m.PreparationHash == nil { + w.WriteVarUint(0) + } else { + w.WriteVarUint(uint256size) + w.WriteBE(m.PreparationHash[:]) + } + } + w.WriteArray(m.PreparationPayloads) + w.WriteArray(m.CommitPayloads) +} + +// DecodeBinary implements io.Serializable interface. +func (p *changeViewCompact) DecodeBinary(r *io.BinReader) { + r.ReadLE(&p.ValidatorIndex) + r.ReadLE(&p.OriginalViewNumber) + r.ReadLE(&p.Timestamp) + p.InvocationScript = r.ReadBytes() +} + +// EncodeBinary implements io.Serializable interface. +func (p *changeViewCompact) EncodeBinary(w *io.BinWriter) { + w.WriteLE(p.ValidatorIndex) + w.WriteLE(p.OriginalViewNumber) + w.WriteLE(p.Timestamp) + w.WriteBytes(p.InvocationScript) +} + +// DecodeBinary implements io.Serializable interface. +func (p *commitCompact) DecodeBinary(r *io.BinReader) { + r.ReadLE(&p.ViewNumber) + r.ReadLE(&p.ValidatorIndex) + + p.Signature = make([]byte, signatureSize) + r.ReadBE(&p.Signature) + p.InvocationScript = r.ReadBytes() +} + +// EncodeBinary implements io.Serializable interface. +func (p *commitCompact) EncodeBinary(w *io.BinWriter) { + w.WriteLE(p.ViewNumber) + w.WriteLE(p.ValidatorIndex) + w.WriteBE(p.Signature) + w.WriteBytes(p.InvocationScript) +} + +// DecodeBinary implements io.Serializable interface. +func (p *preparationCompact) DecodeBinary(r *io.BinReader) { + r.ReadLE(&p.ValidatorIndex) + p.InvocationScript = r.ReadBytes() +} + +// EncodeBinary implements io.Serializable interface. +func (p *preparationCompact) EncodeBinary(w *io.BinWriter) { + w.WriteLE(p.ValidatorIndex) + w.WriteBytes(p.InvocationScript) +} diff --git a/pkg/consensus/recovery_request.go b/pkg/consensus/recovery_request.go new file mode 100644 index 000000000..72c74b9ee --- /dev/null +++ b/pkg/consensus/recovery_request.go @@ -0,0 +1,18 @@ +package consensus + +import "github.com/CityOfZion/neo-go/pkg/io" + +// recoveryRequest represents dBFT RecoveryRequest message. +type recoveryRequest struct { + Timestamp uint32 +} + +// DecodeBinary implements io.Serializable interface. +func (m *recoveryRequest) DecodeBinary(r *io.BinReader) { + r.ReadLE(&m.Timestamp) +} + +// EncodeBinary implements io.Serializable interface. +func (m *recoveryRequest) EncodeBinary(w *io.BinWriter) { + w.WriteLE(m.Timestamp) +} diff --git a/pkg/network/message.go b/pkg/network/message.go index 8ea89ae4d..2807e22de 100644 --- a/pkg/network/message.go +++ b/pkg/network/message.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/CityOfZion/neo-go/config" + "github.com/CityOfZion/neo-go/pkg/consensus" "github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/crypto/hash" @@ -185,8 +186,7 @@ func (m *Message) decodePayload(br *io.BinReader) error { case CMDBlock: p = &core.Block{} case CMDConsensus: - // Stubbed out for now, see #431. - return nil + p = &consensus.Payload{} case CMDGetBlocks: fallthrough case CMDGetHeaders: diff --git a/pkg/network/server.go b/pkg/network/server.go index be9cb7971..79027a3c1 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/CityOfZion/neo-go/pkg/consensus" "github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/network/payload" @@ -50,6 +51,7 @@ type ( discovery Discoverer chain core.Blockchainer bQueue *blockQueue + consensus consensus.Service lock sync.RWMutex peers map[Peer]bool @@ -78,6 +80,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server { register: make(chan Peer), unregister: make(chan peerDrop), peers: make(map[Peer]bool), + consensus: consensus.NewService(), } if s.MinPeers <= 0 { @@ -366,11 +369,24 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { } } case payload.ConsensusType: - // TODO (#431) + for _, hash := range inv.Hashes { + if cp := s.consensus.GetPayload(hash); cp != nil { + if err := p.WriteMsg(NewMessage(s.Net, CMDConsensus, cp)); err != nil { + return err + } + } + } } return nil } +// handleConsensusCmd processes received consensus payload. +// It never returns an error. +func (s *Server) handleConsensusCmd(cp *consensus.Payload) error { + s.consensus.OnPayload(cp) + return nil +} + // handleAddrCmd will process received addresses. func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error { for _, a := range addrs.Addrs { @@ -459,6 +475,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { case CMDBlock: block := msg.Payload.(*core.Block) return s.handleBlockCmd(peer, block) + case CMDConsensus: + cp := msg.Payload.(*consensus.Payload) + return s.handleConsensusCmd(cp) case CMDVersion, CMDVerack: return fmt.Errorf("received '%s' after the handshake", msg.CommandType()) }