From 61fdd5cde5de586b185f5e8937f5138b92d88d03 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 12 Nov 2019 12:58:11 +0300 Subject: [PATCH 1/3] util: make Uint256Size public --- pkg/util/uint256.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/util/uint256.go b/pkg/util/uint256.go index 4de10b135..40dfdb93a 100644 --- a/pkg/util/uint256.go +++ b/pkg/util/uint256.go @@ -8,15 +8,16 @@ import ( "strings" ) -const uint256Size = 32 +// Uint256Size is the size of Uint256 in bytes. +const Uint256Size = 32 // Uint256 is a 32 byte long unsigned integer. -type Uint256 [uint256Size]uint8 +type Uint256 [Uint256Size]uint8 // Uint256DecodeReverseString attempts to decode the given string (in LE representation) into an Uint256. func Uint256DecodeReverseString(s string) (u Uint256, err error) { - if len(s) != uint256Size*2 { - return u, fmt.Errorf("expected string size of %d got %d", uint256Size*2, len(s)) + if len(s) != Uint256Size*2 { + return u, fmt.Errorf("expected string size of %d got %d", Uint256Size*2, len(s)) } b, err := hex.DecodeString(s) if err != nil { @@ -27,8 +28,8 @@ func Uint256DecodeReverseString(s string) (u Uint256, err error) { // Uint256DecodeBytes attempts to decode the given string (in BE representation) into an Uint256. func Uint256DecodeBytes(b []byte) (u Uint256, err error) { - if len(b) != uint256Size { - return u, fmt.Errorf("expected []byte of size %d got %d", uint256Size, len(b)) + if len(b) != Uint256Size { + return u, fmt.Errorf("expected []byte of size %d got %d", Uint256Size, len(b)) } copy(u[:], b) return u, nil From ad9091d13d818f5be9920d28d925370a8426fbeb Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 13 Nov 2019 10:36:29 +0300 Subject: [PATCH 2/3] io: implement generic array (de-)serialization It is done through reflection and panics in every unexpected situation. --- pkg/core/account_state.go | 12 +---- pkg/core/block.go | 8 +-- pkg/core/transaction/claim.go | 12 +---- pkg/core/transaction/state.go | 12 +---- pkg/core/transaction/transaction.go | 51 +++--------------- pkg/io/binaryReader.go | 33 ++++++++++++ pkg/io/binaryWriter.go | 32 ++++++++++++ pkg/io/binaryrw_test.go | 81 +++++++++++++++++++++++++++++ pkg/network/payload/address.go | 13 +---- pkg/network/payload/headers.go | 6 +-- 10 files changed, 164 insertions(+), 96 deletions(-) diff --git a/pkg/core/account_state.go b/pkg/core/account_state.go index 7d7fdb807..541381f5b 100644 --- a/pkg/core/account_state.go +++ b/pkg/core/account_state.go @@ -94,12 +94,7 @@ func (s *AccountState) DecodeBinary(br *io.BinReader) { br.ReadLE(&s.Version) br.ReadLE(&s.ScriptHash) br.ReadLE(&s.IsFrozen) - lenVotes := br.ReadVarUint() - s.Votes = make([]*keys.PublicKey, lenVotes) - for i := 0; i < int(lenVotes); i++ { - s.Votes[i] = &keys.PublicKey{} - s.Votes[i].DecodeBinary(br) - } + s.Votes = br.ReadArray(keys.PublicKey{}).([]*keys.PublicKey) s.Balances = make(map[util.Uint256]util.Fixed8) lenBalances := br.ReadVarUint() @@ -117,10 +112,7 @@ func (s *AccountState) EncodeBinary(bw *io.BinWriter) { bw.WriteLE(s.Version) bw.WriteLE(s.ScriptHash) bw.WriteLE(s.IsFrozen) - bw.WriteVarUint(uint64(len(s.Votes))) - for _, point := range s.Votes { - point.EncodeBinary(bw) - } + bw.WriteArray(s.Votes) balances := s.nonZeroBalances() bw.WriteVarUint(uint64(len(balances))) diff --git a/pkg/core/block.go b/pkg/core/block.go index de790fc43..90b94a749 100644 --- a/pkg/core/block.go +++ b/pkg/core/block.go @@ -128,13 +128,7 @@ func (b *Block) Trim() ([]byte, error) { // Serializable interface. func (b *Block) DecodeBinary(br *io.BinReader) { b.BlockBase.DecodeBinary(br) - - lentx := br.ReadVarUint() - b.Transactions = make([]*transaction.Transaction, lentx) - for i := 0; i < int(lentx); i++ { - b.Transactions[i] = &transaction.Transaction{} - b.Transactions[i].DecodeBinary(br) - } + b.Transactions = br.ReadArray(transaction.Transaction{}).([]*transaction.Transaction) } // EncodeBinary encodes the block to the given BinWriter, implementing diff --git a/pkg/core/transaction/claim.go b/pkg/core/transaction/claim.go index 7140f6aad..ef7eecfae 100644 --- a/pkg/core/transaction/claim.go +++ b/pkg/core/transaction/claim.go @@ -11,18 +11,10 @@ type ClaimTX struct { // DecodeBinary implements Serializable interface. func (tx *ClaimTX) DecodeBinary(br *io.BinReader) { - lenClaims := br.ReadVarUint() - tx.Claims = make([]*Input, lenClaims) - for i := 0; i < int(lenClaims); i++ { - tx.Claims[i] = &Input{} - tx.Claims[i].DecodeBinary(br) - } + tx.Claims = br.ReadArray(Input{}).([]*Input) } // EncodeBinary implements Serializable interface. func (tx *ClaimTX) EncodeBinary(bw *io.BinWriter) { - bw.WriteVarUint(uint64(len(tx.Claims))) - for _, claim := range tx.Claims { - claim.EncodeBinary(bw) - } + bw.WriteArray(tx.Claims) } diff --git a/pkg/core/transaction/state.go b/pkg/core/transaction/state.go index 22a214e99..c3c181125 100644 --- a/pkg/core/transaction/state.go +++ b/pkg/core/transaction/state.go @@ -11,18 +11,10 @@ type StateTX struct { // DecodeBinary implements Serializable interface. func (tx *StateTX) DecodeBinary(r *io.BinReader) { - lenDesc := r.ReadVarUint() - tx.Descriptors = make([]*StateDescriptor, lenDesc) - for i := 0; i < int(lenDesc); i++ { - tx.Descriptors[i] = &StateDescriptor{} - tx.Descriptors[i].DecodeBinary(r) - } + tx.Descriptors = r.ReadArray(StateDescriptor{}).([]*StateDescriptor) } // EncodeBinary implements Serializable interface. func (tx *StateTX) EncodeBinary(w *io.BinWriter) { - w.WriteVarUint(uint64(len(tx.Descriptors))) - for _, desc := range tx.Descriptors { - desc.EncodeBinary(w) - } + w.WriteArray(tx.Descriptors) } diff --git a/pkg/core/transaction/transaction.go b/pkg/core/transaction/transaction.go index d878bb29f..712d44d78 100644 --- a/pkg/core/transaction/transaction.go +++ b/pkg/core/transaction/transaction.go @@ -95,33 +95,10 @@ func (t *Transaction) DecodeBinary(br *io.BinReader) { br.ReadLE(&t.Version) t.decodeData(br) - lenAttrs := br.ReadVarUint() - t.Attributes = make([]*Attribute, lenAttrs) - for i := 0; i < int(lenAttrs); i++ { - t.Attributes[i] = &Attribute{} - t.Attributes[i].DecodeBinary(br) - } - - lenInputs := br.ReadVarUint() - t.Inputs = make([]*Input, lenInputs) - for i := 0; i < int(lenInputs); i++ { - t.Inputs[i] = &Input{} - t.Inputs[i].DecodeBinary(br) - } - - lenOutputs := br.ReadVarUint() - t.Outputs = make([]*Output, lenOutputs) - for i := 0; i < int(lenOutputs); i++ { - t.Outputs[i] = &Output{} - t.Outputs[i].DecodeBinary(br) - } - - lenScripts := br.ReadVarUint() - t.Scripts = make([]*Witness, lenScripts) - for i := 0; i < int(lenScripts); i++ { - t.Scripts[i] = &Witness{} - t.Scripts[i].DecodeBinary(br) - } + t.Attributes = br.ReadArray(Attribute{}).([]*Attribute) + t.Inputs = br.ReadArray(Input{}).([]*Input) + t.Outputs = br.ReadArray(Output{}).([]*Output) + t.Scripts = br.ReadArray(Witness{}).([]*Witness) // Create the hash of the transaction at decode, so we dont need // to do it anymore. @@ -167,10 +144,7 @@ func (t *Transaction) decodeData(r *io.BinReader) { // EncodeBinary implements Serializable interface. func (t *Transaction) EncodeBinary(bw *io.BinWriter) { t.encodeHashableFields(bw) - bw.WriteVarUint(uint64(len(t.Scripts))) - for _, s := range t.Scripts { - s.EncodeBinary(bw) - } + bw.WriteArray(t.Scripts) } // encodeHashableFields encodes the fields that are not used for @@ -185,22 +159,13 @@ func (t *Transaction) encodeHashableFields(bw *io.BinWriter) { } // Attributes - bw.WriteVarUint(uint64(len(t.Attributes))) - for _, attr := range t.Attributes { - attr.EncodeBinary(bw) - } + bw.WriteArray(t.Attributes) // Inputs - bw.WriteVarUint(uint64(len(t.Inputs))) - for _, in := range t.Inputs { - in.EncodeBinary(bw) - } + bw.WriteArray(t.Inputs) // Outputs - bw.WriteVarUint(uint64(len(t.Outputs))) - for _, out := range t.Outputs { - out.EncodeBinary(bw) - } + bw.WriteArray(t.Outputs) } // createHash creates the hash of the transaction. diff --git a/pkg/io/binaryReader.go b/pkg/io/binaryReader.go index 2a00e20c6..689a80974 100644 --- a/pkg/io/binaryReader.go +++ b/pkg/io/binaryReader.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "io" + "reflect" ) // BinReader is a convenient wrapper around a io.Reader and err object. @@ -33,6 +34,38 @@ func (r *BinReader) ReadLE(v interface{}) { r.Err = binary.Read(r.r, binary.LittleEndian, v) } +// ReadArray reads a slice or an array of pointer to t from r and returns. +func (r *BinReader) ReadArray(t interface{}) interface{} { + elemType := reflect.ValueOf(t).Type() + method, ok := reflect.PtrTo(elemType).MethodByName("DecodeBinary") + if !ok || !isDecodeBinaryMethod(method) { + panic(elemType.String() + " does not have DecodeBinary(*io.BinReader)") + } + + sliceType := reflect.SliceOf(reflect.PtrTo(elemType)) + if r.Err != nil { + return reflect.Zero(sliceType).Interface() + } + + l := int(r.ReadVarUint()) + arr := reflect.MakeSlice(sliceType, l, l) + for i := 0; i < l; i++ { + elem := arr.Index(i) + method := elem.MethodByName("DecodeBinary") + elem.Set(reflect.New(elemType)) + method.Call([]reflect.Value{reflect.ValueOf(r)}) + } + + return arr.Interface() +} + +func isDecodeBinaryMethod(method reflect.Method) bool { + t := method.Type + return t != nil && + t.NumIn() == 2 && t.In(1) == reflect.TypeOf((*BinReader)(nil)) && + t.NumOut() == 0 +} + // ReadBE reads from the underlying io.Reader // into the interface v in big-endian format. func (r *BinReader) ReadBE(v interface{}) { diff --git a/pkg/io/binaryWriter.go b/pkg/io/binaryWriter.go index 912cb3c51..19086e255 100644 --- a/pkg/io/binaryWriter.go +++ b/pkg/io/binaryWriter.go @@ -3,6 +3,7 @@ package io import ( "encoding/binary" "io" + "reflect" ) // BinWriter is a convenient wrapper around a io.Writer and err object. @@ -34,6 +35,37 @@ func (w *BinWriter) WriteBE(v interface{}) { w.Err = binary.Write(w.w, binary.BigEndian, v) } +// WriteArray writes a slice or an array arr into w. +func (w *BinWriter) WriteArray(arr interface{}) { + switch val := reflect.ValueOf(arr); val.Kind() { + case reflect.Slice, reflect.Array: + typ := val.Type().Elem() + method, ok := typ.MethodByName("EncodeBinary") + if !ok || !isEncodeBinaryMethod(method) { + panic(typ.String() + " does not have EncodeBinary(*BinWriter)") + } + + if w.Err != nil { + return + } + + w.WriteVarUint(uint64(val.Len())) + for i := 0; i < val.Len(); i++ { + method := val.Index(i).MethodByName("EncodeBinary") + method.Call([]reflect.Value{reflect.ValueOf(w)}) + } + default: + panic("not an array") + } +} + +func isEncodeBinaryMethod(method reflect.Method) bool { + t := method.Type + return t != nil && + t.NumIn() == 2 && t.In(1) == reflect.TypeOf((*BinWriter)(nil)) && + t.NumOut() == 0 +} + // WriteVarUint writes a uint64 into the underlying writer using variable-length encoding. func (w *BinWriter) WriteVarUint(val uint64) { if w.Err != nil { diff --git a/pkg/io/binaryrw_test.go b/pkg/io/binaryrw_test.go index 1c147f8bf..304b6b651 100644 --- a/pkg/io/binaryrw_test.go +++ b/pkg/io/binaryrw_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // mocks io.Reader and io.Writer, always fails to Write() or Read(). @@ -191,3 +192,83 @@ func TestWriteVarUint100000000000(t *testing.T) { assert.Nil(t, br.Err) assert.Equal(t, val, res) } + +type testSerializable uint16 + +// EncodeBinary implements io.Serializable interface. +func (t testSerializable) EncodeBinary(w *BinWriter) { + w.WriteLE(t) +} + +// DecodeBinary implements io.Serializable interface. +func (t *testSerializable) DecodeBinary(r *BinReader) { + r.ReadLE(t) +} + +func TestBinWriter_WriteArray(t *testing.T) { + var arr [3]testSerializable + for i := range arr { + arr[i] = testSerializable(i) + } + + expected := []byte{3, 0, 0, 1, 0, 2, 0} + + w := NewBufBinWriter() + w.WriteArray(arr) + require.NoError(t, w.Err) + require.Equal(t, expected, w.Bytes()) + + w.Reset() + w.WriteArray(arr[:]) + require.NoError(t, w.Err) + require.Equal(t, expected, w.Bytes()) + + arrS := make([]Serializable, len(arr)) + for i := range arrS { + arrS[i] = &arr[i] + } + + w.Reset() + w.WriteArray(arr) + require.NoError(t, w.Err) + require.Equal(t, expected, w.Bytes()) + + w.Reset() + require.Panics(t, func() { w.WriteArray(1) }) + + w.Reset() + w.Err = errors.New("error") + w.WriteArray(arr[:]) + require.Error(t, w.Err) + require.Equal(t, w.Bytes(), []byte(nil)) + + w.Reset() + w.Err = errors.New("error") + require.Panics(t, func() { w.WriteArray([]int{}) }) + + w.Reset() + w.Err = errors.New("error") + require.Panics(t, func() { w.WriteArray(make(chan testSerializable)) }) +} + +func TestBinReader_ReadArray(t *testing.T) { + data := []byte{3, 0, 0, 1, 0, 2, 0} + r := NewBinReaderFromBuf(data) + result := r.ReadArray(testSerializable(0)) + elems := []testSerializable{0, 1, 2} + require.Equal(t, []*testSerializable{&elems[0], &elems[1], &elems[2]}, result) + + r = NewBinReaderFromBuf(data) + r.Err = errors.New("error") + result = r.ReadArray(testSerializable(0)) + require.Error(t, r.Err) + require.Equal(t, ([]*testSerializable)(nil), result) + + r = NewBinReaderFromBuf([]byte{0}) + result = r.ReadArray(testSerializable(0)) + require.NoError(t, r.Err) + require.Equal(t, []*testSerializable{}, result) + + r = NewBinReaderFromBuf([]byte{0}) + require.Panics(t, func() { r.ReadArray(0) }) +} diff --git a/pkg/network/payload/address.go b/pkg/network/payload/address.go index 47c7b8b5b..2f7728eb2 100644 --- a/pkg/network/payload/address.go +++ b/pkg/network/payload/address.go @@ -67,19 +67,10 @@ func NewAddressList(n int) *AddressList { // DecodeBinary implements Serializable interface. func (p *AddressList) DecodeBinary(br *io.BinReader) { - listLen := br.ReadVarUint() - - p.Addrs = make([]*AddressAndTime, listLen) - for i := 0; i < int(listLen); i++ { - p.Addrs[i] = &AddressAndTime{} - p.Addrs[i].DecodeBinary(br) - } + p.Addrs = br.ReadArray(AddressAndTime{}).([]*AddressAndTime) } // EncodeBinary implements Serializable interface. func (p *AddressList) EncodeBinary(bw *io.BinWriter) { - bw.WriteVarUint(uint64(len(p.Addrs))) - for _, addr := range p.Addrs { - addr.EncodeBinary(bw) - } + bw.WriteArray(p.Addrs) } diff --git a/pkg/network/payload/headers.go b/pkg/network/payload/headers.go index 6ee94c5eb..8635cb1e6 100644 --- a/pkg/network/payload/headers.go +++ b/pkg/network/payload/headers.go @@ -37,9 +37,5 @@ func (p *Headers) DecodeBinary(br *io.BinReader) { // EncodeBinary implements Serializable interface. func (p *Headers) EncodeBinary(bw *io.BinWriter) { - bw.WriteVarUint(uint64(len(p.Hdrs))) - - for _, header := range p.Hdrs { - header.EncodeBinary(bw) - } + bw.WriteArray(p.Hdrs) } From 085ca7b77021561b9e72aa0c837697dd471d8e60 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 8 Nov 2019 18:40:21 +0300 Subject: [PATCH 3/3] network: implement Consensus payloads --- pkg/consensus/cache.go | 60 ++++++++ pkg/consensus/cache_test.go | 60 ++++++++ pkg/consensus/change_view.go | 19 +++ pkg/consensus/commit.go | 22 +++ pkg/consensus/consensus.go | 34 +++++ pkg/consensus/payload.go | 155 +++++++++++++++++++++ pkg/consensus/payload_test.go | 220 ++++++++++++++++++++++++++++++ pkg/consensus/prepare_request.go | 43 ++++++ pkg/consensus/prepare_response.go | 21 +++ pkg/consensus/recovery_message.go | 132 ++++++++++++++++++ pkg/consensus/recovery_request.go | 18 +++ pkg/network/message.go | 4 +- pkg/network/server.go | 21 ++- 13 files changed, 806 insertions(+), 3 deletions(-) create mode 100644 pkg/consensus/cache.go create mode 100644 pkg/consensus/cache_test.go create mode 100644 pkg/consensus/change_view.go create mode 100644 pkg/consensus/commit.go create mode 100644 pkg/consensus/consensus.go create mode 100644 pkg/consensus/payload.go create mode 100644 pkg/consensus/payload_test.go create mode 100644 pkg/consensus/prepare_request.go create mode 100644 pkg/consensus/prepare_response.go create mode 100644 pkg/consensus/recovery_message.go create mode 100644 pkg/consensus/recovery_request.go diff --git a/pkg/consensus/cache.go b/pkg/consensus/cache.go new file mode 100644 index 000000000..39af2db31 --- /dev/null +++ b/pkg/consensus/cache.go @@ -0,0 +1,60 @@ +package consensus + +import ( + "container/list" + "sync" + + "github.com/CityOfZion/neo-go/pkg/util" +) + +// relayCache is a payload cache which is used to store +// last consensus payloads. +type relayCache struct { + *sync.RWMutex + + maxCap int + elems map[util.Uint256]*list.Element + queue *list.List +} + +func newFIFOCache(capacity int) *relayCache { + return &relayCache{ + RWMutex: new(sync.RWMutex), + + maxCap: capacity, + elems: make(map[util.Uint256]*list.Element), + queue: list.New(), + } +} + +// Add adds payload into a cache if it doesn't already exist. +func (c *relayCache) Add(p *Payload) { + c.Lock() + defer c.Unlock() + + h := p.Hash() + if c.elems[h] != nil { + return + } + + if c.queue.Len() >= c.maxCap { + first := c.queue.Front() + c.queue.Remove(first) + delete(c.elems, first.Value.(*Payload).Hash()) + } + + e := c.queue.PushBack(p) + c.elems[h] = e +} + +// Get returns payload with the specified hash from cache. +func (c *relayCache) Get(h util.Uint256) *Payload { + c.RLock() + defer c.RUnlock() + + e, ok := c.elems[h] + if !ok { + return nil + } + return e.Value.(*Payload) +} diff --git a/pkg/consensus/cache_test.go b/pkg/consensus/cache_test.go new file mode 100644 index 000000000..631cc70d9 --- /dev/null +++ b/pkg/consensus/cache_test.go @@ -0,0 +1,60 @@ +package consensus + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRelayCache_Add(t *testing.T) { + const capacity = 3 + payloads := getDifferentPayloads(t, capacity+1) + c := newFIFOCache(capacity) + require.Equal(t, 0, c.queue.Len()) + require.Equal(t, 0, len(c.elems)) + + for i := 1; i < capacity; i++ { + c.Add(&payloads[i]) + require.Equal(t, i, c.queue.Len()) + require.Equal(t, i, len(c.elems)) + } + + // add already existing payload + c.Add(&payloads[1]) + require.Equal(t, capacity-1, c.queue.Len()) + require.Equal(t, capacity-1, len(c.elems)) + + c.Add(&payloads[0]) + require.Equal(t, capacity, c.queue.Len()) + require.Equal(t, capacity, len(c.elems)) + + // capacity does not exceed maximum + c.Add(&payloads[capacity]) + require.Equal(t, capacity, c.queue.Len()) + require.Equal(t, capacity, len(c.elems)) + + // recent payloads are still present in cache + require.Equal(t, &payloads[0], c.Get(payloads[0].Hash())) + for i := 2; i <= capacity; i++ { + require.Equal(t, &payloads[i], c.Get(payloads[i].Hash())) + } + + // oldest payload was removed + require.Equal(t, (*Payload)(nil), c.Get(payloads[1].Hash())) +} + +func getDifferentPayloads(t *testing.T, n int) (payloads []Payload) { + payloads = make([]Payload, n) + for i := range payloads { + var sign [signatureSize]byte + fillRandom(t, sign[:]) + + payloads[i].ValidatorIndex = uint16(i) + payloads[i].Type = commitType + payloads[i].payload = &commit{ + Signature: sign, + } + } + + return +} diff --git a/pkg/consensus/change_view.go b/pkg/consensus/change_view.go new file mode 100644 index 000000000..4274a38d2 --- /dev/null +++ b/pkg/consensus/change_view.go @@ -0,0 +1,19 @@ +package consensus + +import "github.com/CityOfZion/neo-go/pkg/io" + +// changeView represents dBFT ChangeView message. +type changeView struct { + NewViewNumber byte + Timestamp uint32 +} + +// EncodeBinary implements io.Serializable interface. +func (c *changeView) EncodeBinary(w *io.BinWriter) { + w.WriteLE(c.Timestamp) +} + +// DecodeBinary implements io.Serializable interface. +func (c *changeView) DecodeBinary(r *io.BinReader) { + r.ReadLE(&c.Timestamp) +} diff --git a/pkg/consensus/commit.go b/pkg/consensus/commit.go new file mode 100644 index 000000000..2d3acac08 --- /dev/null +++ b/pkg/consensus/commit.go @@ -0,0 +1,22 @@ +package consensus + +import "github.com/CityOfZion/neo-go/pkg/io" + +// commit represents dBFT Commit message. +type commit struct { + Signature [signatureSize]byte +} + +// signatureSize is an rfc6989 signature size in bytes +// without leading byte (0x04, uncompressed) +const signatureSize = 64 + +// EncodeBinary implements io.Serializable interface. +func (c *commit) EncodeBinary(w *io.BinWriter) { + w.WriteBE(c.Signature) +} + +// DecodeBinary implements io.Serializable interface. +func (c *commit) DecodeBinary(r *io.BinReader) { + r.ReadBE(&c.Signature) +} diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go new file mode 100644 index 000000000..b76bd8b91 --- /dev/null +++ b/pkg/consensus/consensus.go @@ -0,0 +1,34 @@ +package consensus + +import "github.com/CityOfZion/neo-go/pkg/util" + +// cacheMaxCapacity is the default cache capacity taken +// from C# implementation https://github.com/neo-project/neo/blob/master/neo/Ledger/Blockchain.cs#L64 +const cacheMaxCapacity = 100 + +// Service represents consensus instance. +type Service interface { + OnPayload(p *Payload) + GetPayload(h util.Uint256) *Payload +} + +type service struct { + cache *relayCache +} + +// NewService returns new consensus.Service instance. +func NewService() Service { + return &service{ + cache: newFIFOCache(cacheMaxCapacity), + } +} + +// OnPayload handles Payload receive. +func (s *service) OnPayload(p *Payload) { + s.cache.Add(p) +} + +// GetPayload returns payload stored in cache. +func (s *service) GetPayload(h util.Uint256) *Payload { + return s.cache.Get(h) +} diff --git a/pkg/consensus/payload.go b/pkg/consensus/payload.go new file mode 100644 index 000000000..5abdef1aa --- /dev/null +++ b/pkg/consensus/payload.go @@ -0,0 +1,155 @@ +package consensus + +import ( + "fmt" + + "github.com/CityOfZion/neo-go/pkg/core/transaction" + "github.com/CityOfZion/neo-go/pkg/crypto/hash" + "github.com/CityOfZion/neo-go/pkg/io" + "github.com/CityOfZion/neo-go/pkg/util" + "github.com/pkg/errors" +) + +type ( + messageType byte + + message struct { + Type messageType + ViewNumber byte + + payload io.Serializable + } + + // Payload is a type for consensus-related messages. + Payload struct { + message + + Version uint32 + ValidatorIndex uint16 + PrevHash util.Uint256 + Height uint32 + Timestamp uint32 + + Witness transaction.Witness + } +) + +const ( + changeViewType messageType = 0x00 + prepareRequestType messageType = 0x20 + prepareResponseType messageType = 0x21 + commitType messageType = 0x30 + recoveryRequestType messageType = 0x40 + recoveryMessageType messageType = 0x41 +) + +// EncodeBinaryUnsigned writes payload to w excluding signature. +func (p *Payload) EncodeBinaryUnsigned(w *io.BinWriter) { + w.WriteLE(p.Version) + w.WriteBE(p.PrevHash[:]) + w.WriteLE(p.Height) + w.WriteLE(p.ValidatorIndex) + w.WriteLE(p.Timestamp) + + ww := io.NewBufBinWriter() + p.message.EncodeBinary(ww.BinWriter) + w.WriteBytes(ww.Bytes()) +} + +// EncodeBinary implements io.Serializable interface. +func (p *Payload) EncodeBinary(w *io.BinWriter) { + p.EncodeBinaryUnsigned(w) + + w.WriteLE(byte(1)) + p.Witness.EncodeBinary(w) +} + +// DecodeBinaryUnsigned reads payload from w excluding signature. +func (p *Payload) DecodeBinaryUnsigned(r *io.BinReader) { + r.ReadLE(&p.Version) + r.ReadBE(p.PrevHash[:]) + r.ReadLE(&p.Height) + r.ReadLE(&p.ValidatorIndex) + r.ReadLE(&p.Timestamp) + + data := r.ReadBytes() + rr := io.NewBinReaderFromBuf(data) + p.message.DecodeBinary(rr) +} + +// Hash returns 32-byte message hash. +func (p *Payload) Hash() util.Uint256 { + w := io.NewBufBinWriter() + p.EncodeBinaryUnsigned(w.BinWriter) + + return hash.DoubleSha256(w.Bytes()) +} + +// DecodeBinary implements io.Serializable interface. +func (p *Payload) DecodeBinary(r *io.BinReader) { + p.DecodeBinaryUnsigned(r) + + var b byte + r.ReadLE(&b) + if b != 1 { + r.Err = errors.New("invalid format") + return + } + + p.Witness.DecodeBinary(r) +} + +// EncodeBinary implements io.Serializable interface. +func (m *message) EncodeBinary(w *io.BinWriter) { + w.WriteLE(byte(m.Type)) + w.WriteLE(m.ViewNumber) + m.payload.EncodeBinary(w) +} + +// DecodeBinary implements io.Serializable interface. +func (m *message) DecodeBinary(r *io.BinReader) { + r.ReadLE((*byte)(&m.Type)) + r.ReadLE(&m.ViewNumber) + + switch m.Type { + case changeViewType: + cv := new(changeView) + // NewViewNumber is not marshaled + cv.NewViewNumber = m.ViewNumber + 1 + m.payload = cv + case prepareRequestType: + m.payload = new(prepareRequest) + case prepareResponseType: + m.payload = new(prepareResponse) + case commitType: + m.payload = new(commit) + case recoveryRequestType: + m.payload = new(recoveryRequest) + case recoveryMessageType: + m.payload = new(recoveryMessage) + default: + r.Err = errors.Errorf("invalid type: 0x%02x", byte(m.Type)) + return + } + m.payload.DecodeBinary(r) +} + +// String implements fmt.Stringer interface. +func (t messageType) String() string { + switch t { + case changeViewType: + return "ChangeView" + case prepareRequestType: + return "PrepareRequest" + case prepareResponseType: + return "PrepareResponse" + case commitType: + return "Commit" + case recoveryRequestType: + return "RecoveryRequest" + case recoveryMessageType: + return "RecoveryMessage" + default: + return fmt.Sprintf("UNKNOWN(0x%02x)", byte(t)) + } +} diff --git a/pkg/consensus/payload_test.go b/pkg/consensus/payload_test.go new file mode 100644 index 000000000..8759ead89 --- /dev/null +++ b/pkg/consensus/payload_test.go @@ -0,0 +1,220 @@ +package consensus + +import ( + "encoding/hex" + gio "io" + "math/rand" + "testing" + "time" + + "github.com/CityOfZion/neo-go/pkg/core/transaction" + "github.com/CityOfZion/neo-go/pkg/io" + "github.com/CityOfZion/neo-go/pkg/util" + "github.com/stretchr/testify/require" +) + +var messageTypes = []messageType{ + changeViewType, + prepareRequestType, + prepareResponseType, + commitType, + recoveryRequestType, + recoveryMessageType, +} + +func TestConsensusPayload_Hash(t *testing.T) { + dataHex := "00000000d8fb8d3b143b5f98468ef701909c976505a110a01e26c5e99be9a90cff979199b6fc33000400000000008d2000184dc95de24018f9ad71f4448a2b438eaca8b4b2ab6b4524b5a69a45d920c35103f3901444320656c390ff39c0062f5e8e138ce446a40c7e4ba1af1f8247ebbdf49295933715d3a67949714ff924f8a28cec5b954c71eca3bfaf0e9d4b1f87b4e21e9ba4ae18f97de71501b5c5d07edc200bd66a46b9b28b1a371f2195c10b0af90000e24018f900000000014140c9faaee59942f58da0e5268bc199632f2a3ad0fcbee68681a4437f140b49512e8d9efc6880eb44d3490782895a5794f35eeccee2923ce0c76fa7a1890f934eac232103c089d7122b840a4935234e82e26ae5efd0c2acb627239dc9f207311337b6f2c1ac" + data, err := hex.DecodeString(dataHex) + require.NoError(t, err) + + r := io.NewBinReaderFromBuf(data) + + var p Payload + p.DecodeBinary(r) + + require.NoError(t, err) + require.Equal(t, p.Hash().String(), "45859759c8491597804f1922773947e0d37bf54484af82f80cd642f7b063aa56") +} + +func TestConsensusPayload_Serializable(t *testing.T) { + for _, mt := range messageTypes { + p := randomPayload(t, mt) + testSerializable(t, p, new(Payload)) + } +} + +func TestCommit_Serializable(t *testing.T) { + c := randomMessage(t, commitType) + testSerializable(t, c, new(commit)) +} + +func TestPrepareResponse_Serializable(t *testing.T) { + resp := randomMessage(t, prepareResponseType) + testSerializable(t, resp, new(prepareResponse)) +} + +func TestPrepareRequest_Serializable(t *testing.T) { + req := randomMessage(t, prepareRequestType) + testSerializable(t, req, new(prepareRequest)) +} + +func TestRecoveryRequest_Serializable(t *testing.T) { + req := randomMessage(t, recoveryRequestType) + testSerializable(t, req, new(recoveryRequest)) +} + +func TestRecoveryMessage_Serializable(t *testing.T) { + msg := randomMessage(t, recoveryMessageType) + testSerializable(t, msg, new(recoveryMessage)) +} + +func randomPayload(t *testing.T, mt messageType) *Payload { + p := &Payload{ + message: message{ + Type: mt, + ViewNumber: byte(rand.Uint32()), + payload: randomMessage(t, mt), + }, + Version: 1, + ValidatorIndex: 13, + Height: rand.Uint32(), + Timestamp: rand.Uint32(), + Witness: transaction.Witness{ + InvocationScript: fillRandom(t, make([]byte, 3)), + VerificationScript: fillRandom(t, make([]byte, 4)), + }, + } + fillRandom(t, p.PrevHash[:]) + + if mt == changeViewType { + p.payload.(*changeView).NewViewNumber = p.ViewNumber + 1 + } + + return p +} + +func randomMessage(t *testing.T, mt messageType) io.Serializable { + switch mt { + case changeViewType: + return &changeView{ + Timestamp: rand.Uint32(), + } + case prepareRequestType: + return randomPrepareRequest(t) + case prepareResponseType: + resp := &prepareResponse{} + fillRandom(t, resp.PreparationHash[:]) + return resp + case commitType: + var c commit + fillRandom(t, c.Signature[:]) + return &c + case recoveryRequestType: + return &recoveryRequest{Timestamp: rand.Uint32()} + case recoveryMessageType: + return randomRecoveryMessage(t) + default: + require.Fail(t, "invalid type") + return nil + } +} + +func randomPrepareRequest(t *testing.T) *prepareRequest { + const txCount = 3 + + req := &prepareRequest{ + Timestamp: rand.Uint32(), + Nonce: rand.Uint64(), + TransactionHashes: make([]util.Uint256, txCount), + MinerTransaction: *newMinerTx(rand.Uint32()), + } + + req.TransactionHashes[0] = req.MinerTransaction.Hash() + for i := 1; i < txCount; i++ { + fillRandom(t, req.TransactionHashes[i][:]) + } + fillRandom(t, req.NextConsensus[:]) + + return req +} + +func randomRecoveryMessage(t *testing.T) *recoveryMessage { + result := randomMessage(t, prepareRequestType) + require.IsType(t, (*prepareRequest)(nil), result) + prepReq := result.(*prepareRequest) + + return &recoveryMessage{ + PreparationPayloads: []*preparationCompact{ + { + ValidatorIndex: 1, + InvocationScript: fillRandom(t, make([]byte, 10)), + }, + }, + CommitPayloads: []*commitCompact{ + { + ViewNumber: 0, + ValidatorIndex: 1, + Signature: fillRandom(t, make([]byte, signatureSize)), + InvocationScript: fillRandom(t, make([]byte, 20)), + }, + { + ViewNumber: 0, + ValidatorIndex: 2, + Signature: fillRandom(t, make([]byte, signatureSize)), + InvocationScript: fillRandom(t, make([]byte, 10)), + }, + }, + ChangeViewPayloads: []*changeViewCompact{ + { + Timestamp: rand.Uint32(), + ValidatorIndex: 3, + OriginalViewNumber: 3, + InvocationScript: fillRandom(t, make([]byte, 4)), + }, + }, + PrepareRequest: prepReq, + } +} + +func TestMessageType_String(t *testing.T) { + require.Equal(t, "ChangeView", changeViewType.String()) + require.Equal(t, "PrepareRequest", prepareRequestType.String()) + require.Equal(t, "PrepareResponse", prepareResponseType.String()) + require.Equal(t, "Commit", commitType.String()) + require.Equal(t, "RecoveryMessage", recoveryMessageType.String()) + require.Equal(t, "RecoveryRequest", recoveryRequestType.String()) + require.Equal(t, "UNKNOWN(0xff)", messageType(0xff).String()) +} + +func testSerializable(t *testing.T, expected, actual io.Serializable) { + w := io.NewBufBinWriter() + expected.EncodeBinary(w.BinWriter) + + r := io.NewBinReaderFromBuf(w.Bytes()) + actual.DecodeBinary(r) + + require.Equal(t, expected, actual) +} + +func fillRandom(t *testing.T, buf []byte) []byte { + r := rand.New(rand.NewSource(time.Now().Unix())) + _, err := gio.ReadFull(r, buf) + require.NoError(t, err) + + return buf +} + +func newMinerTx(nonce uint32) *transaction.Transaction { + return &transaction.Transaction{ + Type: transaction.MinerType, + Version: 0, + Data: &transaction.MinerTX{ + Nonce: rand.Uint32(), + }, + Attributes: []*transaction.Attribute{}, + Inputs: []*transaction.Input{}, + Outputs: []*transaction.Output{}, + Scripts: []*transaction.Witness{}, + Trimmed: false, + } +} diff --git a/pkg/consensus/prepare_request.go b/pkg/consensus/prepare_request.go new file mode 100644 index 000000000..02c374747 --- /dev/null +++ b/pkg/consensus/prepare_request.go @@ -0,0 +1,43 @@ +package consensus + +import ( + "github.com/CityOfZion/neo-go/pkg/core/transaction" + "github.com/CityOfZion/neo-go/pkg/io" + "github.com/CityOfZion/neo-go/pkg/util" +) + +// prepareRequest represents dBFT PrepareRequest message. +type prepareRequest struct { + Timestamp uint32 + Nonce uint64 + TransactionHashes []util.Uint256 + MinerTransaction transaction.Transaction + NextConsensus util.Uint160 +} + +// EncodeBinary implements io.Serializable interface. +func (p *prepareRequest) EncodeBinary(w *io.BinWriter) { + w.WriteLE(p.Timestamp) + w.WriteLE(p.Nonce) + w.WriteBE(p.NextConsensus[:]) + w.WriteVarUint(uint64(len(p.TransactionHashes))) + for i := range p.TransactionHashes { + w.WriteBE(p.TransactionHashes[i][:]) + } + p.MinerTransaction.EncodeBinary(w) +} + +// DecodeBinary implements io.Serializable interface. +func (p *prepareRequest) DecodeBinary(r *io.BinReader) { + r.ReadLE(&p.Timestamp) + r.ReadLE(&p.Nonce) + r.ReadBE(p.NextConsensus[:]) + + lenHashes := r.ReadVarUint() + p.TransactionHashes = make([]util.Uint256, lenHashes) + for i := range p.TransactionHashes { + r.ReadBE(p.TransactionHashes[i][:]) + } + + p.MinerTransaction.DecodeBinary(r) +} diff --git a/pkg/consensus/prepare_response.go b/pkg/consensus/prepare_response.go new file mode 100644 index 000000000..4636b3139 --- /dev/null +++ b/pkg/consensus/prepare_response.go @@ -0,0 +1,21 @@ +package consensus + +import ( + "github.com/CityOfZion/neo-go/pkg/io" + "github.com/CityOfZion/neo-go/pkg/util" +) + +// prepareResponse represents dBFT PrepareResponse message. +type prepareResponse struct { + PreparationHash util.Uint256 +} + +// EncodeBinary implements io.Serializable interface. +func (p *prepareResponse) EncodeBinary(w *io.BinWriter) { + w.WriteBE(p.PreparationHash[:]) +} + +// DecodeBinary implements io.Serializable interface. +func (p *prepareResponse) DecodeBinary(r *io.BinReader) { + r.ReadBE(p.PreparationHash[:]) +} diff --git a/pkg/consensus/recovery_message.go b/pkg/consensus/recovery_message.go new file mode 100644 index 000000000..8b248b271 --- /dev/null +++ b/pkg/consensus/recovery_message.go @@ -0,0 +1,132 @@ +package consensus + +import ( + "github.com/CityOfZion/neo-go/pkg/io" + "github.com/CityOfZion/neo-go/pkg/util" + "github.com/pkg/errors" +) + +type ( + // recoveryMessage represents dBFT Recovery message. + recoveryMessage struct { + PreparationHash *util.Uint256 + PreparationPayloads []*preparationCompact + CommitPayloads []*commitCompact + ChangeViewPayloads []*changeViewCompact + PrepareRequest *prepareRequest + } + + changeViewCompact struct { + ValidatorIndex uint16 + OriginalViewNumber byte + Timestamp uint32 + InvocationScript []byte + } + + commitCompact struct { + ViewNumber byte + ValidatorIndex uint16 + Signature []byte + InvocationScript []byte + } + + preparationCompact struct { + ValidatorIndex uint16 + InvocationScript []byte + } +) + +const uint256size = 32 + +// DecodeBinary implements io.Serializable interface. +func (m *recoveryMessage) DecodeBinary(r *io.BinReader) { + m.ChangeViewPayloads = r.ReadArray(changeViewCompact{}).([]*changeViewCompact) + + var hasReq bool + r.ReadLE(&hasReq) + if hasReq { + m.PrepareRequest = new(prepareRequest) + m.PrepareRequest.DecodeBinary(r) + } else { + l := r.ReadVarUint() + if l != 0 { + if l == uint256size { + m.PreparationHash = new(util.Uint256) + r.ReadBE(m.PreparationHash[:]) + } else { + r.Err = errors.New("invalid data") + } + } else { + m.PreparationHash = nil + } + } + + m.PreparationPayloads = r.ReadArray(preparationCompact{}).([]*preparationCompact) + m.CommitPayloads = r.ReadArray(commitCompact{}).([]*commitCompact) +} + +// EncodeBinary implements io.Serializable interface. +func (m *recoveryMessage) EncodeBinary(w *io.BinWriter) { + w.WriteArray(m.ChangeViewPayloads) + + hasReq := m.PrepareRequest != nil + w.WriteLE(hasReq) + if hasReq { + m.PrepareRequest.EncodeBinary(w) + } else { + if m.PreparationHash == nil { + w.WriteVarUint(0) + } else { + w.WriteVarUint(uint256size) + w.WriteBE(m.PreparationHash[:]) + } + } + w.WriteArray(m.PreparationPayloads) + w.WriteArray(m.CommitPayloads) +} + +// DecodeBinary implements io.Serializable interface. +func (p *changeViewCompact) DecodeBinary(r *io.BinReader) { + r.ReadLE(&p.ValidatorIndex) + r.ReadLE(&p.OriginalViewNumber) + r.ReadLE(&p.Timestamp) + p.InvocationScript = r.ReadBytes() +} + +// EncodeBinary implements io.Serializable interface. +func (p *changeViewCompact) EncodeBinary(w *io.BinWriter) { + w.WriteLE(p.ValidatorIndex) + w.WriteLE(p.OriginalViewNumber) + w.WriteLE(p.Timestamp) + w.WriteBytes(p.InvocationScript) +} + +// DecodeBinary implements io.Serializable interface. +func (p *commitCompact) DecodeBinary(r *io.BinReader) { + r.ReadLE(&p.ViewNumber) + r.ReadLE(&p.ValidatorIndex) + + p.Signature = make([]byte, signatureSize) + r.ReadBE(&p.Signature) + p.InvocationScript = r.ReadBytes() +} + +// EncodeBinary implements io.Serializable interface. +func (p *commitCompact) EncodeBinary(w *io.BinWriter) { + w.WriteLE(p.ViewNumber) + w.WriteLE(p.ValidatorIndex) + w.WriteBE(p.Signature) + w.WriteBytes(p.InvocationScript) +} + +// DecodeBinary implements io.Serializable interface. +func (p *preparationCompact) DecodeBinary(r *io.BinReader) { + r.ReadLE(&p.ValidatorIndex) + p.InvocationScript = r.ReadBytes() +} + +// EncodeBinary implements io.Serializable interface. +func (p *preparationCompact) EncodeBinary(w *io.BinWriter) { + w.WriteLE(p.ValidatorIndex) + w.WriteBytes(p.InvocationScript) +} diff --git a/pkg/consensus/recovery_request.go b/pkg/consensus/recovery_request.go new file mode 100644 index 000000000..72c74b9ee --- /dev/null +++ b/pkg/consensus/recovery_request.go @@ -0,0 +1,18 @@ +package consensus + +import "github.com/CityOfZion/neo-go/pkg/io" + +// recoveryRequest represents dBFT RecoveryRequest message. +type recoveryRequest struct { + Timestamp uint32 +} + +// DecodeBinary implements io.Serializable interface. +func (m *recoveryRequest) DecodeBinary(r *io.BinReader) { + r.ReadLE(&m.Timestamp) +} + +// EncodeBinary implements io.Serializable interface. +func (m *recoveryRequest) EncodeBinary(w *io.BinWriter) { + w.WriteLE(m.Timestamp) +} diff --git a/pkg/network/message.go b/pkg/network/message.go index 8ea89ae4d..2807e22de 100644 --- a/pkg/network/message.go +++ b/pkg/network/message.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/CityOfZion/neo-go/config" + "github.com/CityOfZion/neo-go/pkg/consensus" "github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/crypto/hash" @@ -185,8 +186,7 @@ func (m *Message) decodePayload(br *io.BinReader) error { case CMDBlock: p = &core.Block{} case CMDConsensus: - // Stubbed out for now, see #431. - return nil + p = &consensus.Payload{} case CMDGetBlocks: fallthrough case CMDGetHeaders: diff --git a/pkg/network/server.go b/pkg/network/server.go index be9cb7971..79027a3c1 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/CityOfZion/neo-go/pkg/consensus" "github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/network/payload" @@ -50,6 +51,7 @@ type ( discovery Discoverer chain core.Blockchainer bQueue *blockQueue + consensus consensus.Service lock sync.RWMutex peers map[Peer]bool @@ -78,6 +80,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server { register: make(chan Peer), unregister: make(chan peerDrop), peers: make(map[Peer]bool), + consensus: consensus.NewService(), } if s.MinPeers <= 0 { @@ -366,11 +369,24 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { } } case payload.ConsensusType: - // TODO (#431) + for _, hash := range inv.Hashes { + if cp := s.consensus.GetPayload(hash); cp != nil { + if err := p.WriteMsg(NewMessage(s.Net, CMDConsensus, cp)); err != nil { + return err + } + } + } } return nil } +// handleConsensusCmd processes received consensus payload. +// It never returns an error. +func (s *Server) handleConsensusCmd(cp *consensus.Payload) error { + s.consensus.OnPayload(cp) + return nil +} + // handleAddrCmd will process received addresses. func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error { for _, a := range addrs.Addrs { @@ -459,6 +475,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { case CMDBlock: block := msg.Payload.(*core.Block) return s.handleBlockCmd(peer, block) + case CMDConsensus: + cp := msg.Payload.(*consensus.Payload) + return s.handleConsensusCmd(cp) case CMDVersion, CMDVerack: return fmt.Errorf("received '%s' after the handshake", msg.CommandType()) }