consensus: added partial message decoding

closes #849
This commit is contained in:
Anna Shaleva 2020-04-15 18:56:45 +03:00
parent 1e3c36433f
commit 9dd5ab5e2b
7 changed files with 63 additions and 17 deletions

View file

@ -52,6 +52,7 @@ func getDifferentPayloads(t *testing.T, n int) (payloads []Payload) {
var sign [signatureSize]byte var sign [signatureSize]byte
random.Fill(sign[:]) random.Fill(sign[:])
payloads[i].message = &message{}
payloads[i].SetValidatorIndex(uint16(i)) payloads[i].SetValidatorIndex(uint16(i))
payloads[i].SetType(payload.MessageType(commitType)) payloads[i].SetType(payload.MessageType(commitType))
payloads[i].payload = &commit{ payloads[i].payload = &commit{

View file

@ -140,7 +140,7 @@ func NewService(cfg Config) (Service, error) {
dbft.WithGetValidators(srv.getValidators), dbft.WithGetValidators(srv.getValidators),
dbft.WithGetConsensusAddress(srv.getConsensusAddress), dbft.WithGetConsensusAddress(srv.getConsensusAddress),
dbft.WithNewConsensusPayload(func() payload.ConsensusPayload { return new(Payload) }), dbft.WithNewConsensusPayload(func() payload.ConsensusPayload { p := new(Payload); p.message = &message{}; return p }),
dbft.WithNewPrepareRequest(func() payload.PrepareRequest { return new(prepareRequest) }), dbft.WithNewPrepareRequest(func() payload.PrepareRequest { return new(prepareRequest) }),
dbft.WithNewPrepareResponse(func() payload.PrepareResponse { return new(prepareResponse) }), dbft.WithNewPrepareResponse(func() payload.PrepareResponse { return new(prepareResponse) }),
dbft.WithNewChangeView(func() payload.ChangeView { return new(changeView) }), dbft.WithNewChangeView(func() payload.ChangeView { return new(changeView) }),
@ -245,7 +245,7 @@ func (s *service) getKeyPair(pubs []crypto.PublicKey) (int, crypto.PrivateKey, c
// OnPayload handles Payload receive. // OnPayload handles Payload receive.
func (s *service) OnPayload(cp *Payload) { func (s *service) OnPayload(cp *Payload) {
log := s.log.With(zap.Stringer("hash", cp.Hash()), zap.Stringer("type", cp.Type())) log := s.log.With(zap.Stringer("hash", cp.Hash()))
if !s.validatePayload(cp) { if !s.validatePayload(cp) {
log.Debug("can't validate payload") log.Debug("can't validate payload")
return return
@ -262,6 +262,14 @@ func (s *service) OnPayload(cp *Payload) {
return return
} }
// decode payload data into message
if cp.message == nil {
if err := cp.decodeData(); err != nil {
log.Debug("can't decode payload data")
return
}
}
// we use switch here because other payloads could be possibly added in future // we use switch here because other payloads could be possibly added in future
switch cp.Type() { switch cp.Type() {
case payload.PrepareRequestType: case payload.PrepareRequestType:

View file

@ -41,6 +41,7 @@ func TestService_GetVerified(t *testing.T) {
hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()} hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()}
p := new(Payload) p := new(Payload)
p.message = &message{}
p.SetType(payload.PrepareRequestType) p.SetType(payload.PrepareRequestType)
p.SetPayload(&prepareRequest{transactionHashes: hashes, minerTx: *transaction.NewMinerTXWithNonce(999)}) p.SetPayload(&prepareRequest{transactionHashes: hashes, minerTx: *transaction.NewMinerTXWithNonce(999)})
p.SetValidatorIndex(1) p.SetValidatorIndex(1)
@ -76,6 +77,7 @@ func TestService_ValidatePayload(t *testing.T) {
srv := newTestService(t) srv := newTestService(t)
priv, _ := getTestValidator(1) priv, _ := getTestValidator(1)
p := new(Payload) p := new(Payload)
p.message = &message{}
p.SetPayload(&prepareRequest{}) p.SetPayload(&prepareRequest{})
@ -138,6 +140,7 @@ func TestService_OnPayload(t *testing.T) {
priv, _ := getTestValidator(1) priv, _ := getTestValidator(1)
p := new(Payload) p := new(Payload)
p.message = &message{}
p.SetValidatorIndex(1) p.SetValidatorIndex(1)
p.SetPayload(&prepareRequest{}) p.SetPayload(&prepareRequest{})

View file

@ -27,8 +27,9 @@ type (
// Payload is a type for consensus-related messages. // Payload is a type for consensus-related messages.
Payload struct { Payload struct {
message *message
data []byte
version uint32 version uint32
validatorIndex uint16 validatorIndex uint16
prevHash util.Uint256 prevHash util.Uint256
@ -168,9 +169,12 @@ func (p *Payload) EncodeBinaryUnsigned(w *io.BinWriter) {
w.WriteU16LE(p.validatorIndex) w.WriteU16LE(p.validatorIndex)
w.WriteU32LE(p.timestamp) w.WriteU32LE(p.timestamp)
if p.message != nil {
ww := io.NewBufBinWriter() ww := io.NewBufBinWriter()
p.message.EncodeBinary(ww.BinWriter) p.message.EncodeBinary(ww.BinWriter)
w.WriteVarBytes(ww.Bytes()) p.data = ww.Bytes()
}
w.WriteVarBytes(p.data)
} }
// EncodeBinary implements io.Serializable interface. // EncodeBinary implements io.Serializable interface.
@ -227,14 +231,10 @@ func (p *Payload) DecodeBinaryUnsigned(r *io.BinReader) {
p.validatorIndex = r.ReadU16LE() p.validatorIndex = r.ReadU16LE()
p.timestamp = r.ReadU32LE() p.timestamp = r.ReadU32LE()
data := r.ReadVarBytes() p.data = r.ReadVarBytes()
if r.Err != nil { if r.Err != nil {
return return
} }
rr := io.NewBinReaderFromBuf(data)
p.message.DecodeBinary(rr)
r.Err = rr.Err
} }
// Hash implements payload.ConsensusPayload interface. // Hash implements payload.ConsensusPayload interface.
@ -318,3 +318,15 @@ func (t messageType) String() string {
return fmt.Sprintf("UNKNOWN(0x%02x)", byte(t)) return fmt.Sprintf("UNKNOWN(0x%02x)", byte(t))
} }
} }
// decode data of payload into it's message
func (p *Payload) decodeData() error {
m := new(message)
br := io.NewBinReaderFromBuf(p.data)
m.DecodeBinary(br)
if br.Err != nil {
return errors.Wrap(br.Err, "cannot decode data into message")
}
p.message = m
return nil
}

View file

@ -28,6 +28,7 @@ var messageTypes = []messageType{
func TestConsensusPayload_Setters(t *testing.T) { func TestConsensusPayload_Setters(t *testing.T) {
var p Payload var p Payload
p.message = &message{}
p.SetVersion(1) p.SetVersion(1)
assert.EqualValues(t, 1, p.Version()) assert.EqualValues(t, 1, p.Version())
@ -86,11 +87,20 @@ func TestConsensusPayload_Hash(t *testing.T) {
func TestConsensusPayload_Serializable(t *testing.T) { func TestConsensusPayload_Serializable(t *testing.T) {
for _, mt := range messageTypes { for _, mt := range messageTypes {
p := randomPayload(t, mt) p := randomPayload(t, mt)
testserdes.EncodeDecodeBinary(t, p, new(Payload)) actual := new(Payload)
data, err := testserdes.EncodeBinary(p)
require.NoError(t, err)
require.NoError(t, testserdes.DecodeBinary(data, actual))
// message is nil after decoding as we didn't yet call decodeData
require.Nil(t, actual.message)
// message should now be decoded from actual.data byte array
assert.NoError(t, actual.decodeData())
require.Equal(t, p, actual)
data := p.MarshalUnsigned() data = p.MarshalUnsigned()
pu := new(Payload) pu := new(Payload)
require.NoError(t, pu.UnmarshalUnsigned(data)) require.NoError(t, pu.UnmarshalUnsigned(data))
assert.NoError(t, pu.decodeData())
p.Witness = transaction.Witness{} p.Witness = transaction.Witness{}
require.Equal(t, p, pu) require.Equal(t, p, pu)
@ -115,7 +125,7 @@ func TestConsensusPayload_DecodeBinaryInvalid(t *testing.T) {
buf := make([]byte, 46+1+34+1+2) buf := make([]byte, 46+1+34+1+2)
expected := &Payload{ expected := &Payload{
message: message{ message: &message{
Type: prepareResponseType, Type: prepareResponseType,
payload: &prepareResponse{}, payload: &prepareResponse{},
}, },
@ -124,6 +134,8 @@ func TestConsensusPayload_DecodeBinaryInvalid(t *testing.T) {
VerificationScript: []byte{}, VerificationScript: []byte{},
}, },
} }
// fill `data` for next check
_ = expected.Hash()
// valid payload // valid payload
buf[delimeterIndex] = 1 buf[delimeterIndex] = 1
@ -131,11 +143,15 @@ func TestConsensusPayload_DecodeBinaryInvalid(t *testing.T) {
buf[typeIndex] = byte(prepareResponseType) buf[typeIndex] = byte(prepareResponseType)
p := new(Payload) p := new(Payload)
require.NoError(t, testserdes.DecodeBinary(buf, p)) require.NoError(t, testserdes.DecodeBinary(buf, p))
// decode `data` into `message`
assert.NoError(t, p.decodeData())
require.Equal(t, expected, p) require.Equal(t, expected, p)
// invalid type // invalid type
buf[typeIndex] = 0xFF buf[typeIndex] = 0xFF
require.Error(t, testserdes.DecodeBinary(buf, new(Payload))) actual := new(Payload)
require.NoError(t, testserdes.DecodeBinary(buf, actual))
require.Error(t, actual.decodeData())
// invalid format // invalid format
buf[delimeterIndex] = 0 buf[delimeterIndex] = 0
@ -176,7 +192,7 @@ func TestRecoveryMessage_Serializable(t *testing.T) {
func randomPayload(t *testing.T, mt messageType) *Payload { func randomPayload(t *testing.T, mt messageType) *Payload {
p := &Payload{ p := &Payload{
message: message{ message: &message{
Type: mt, Type: mt,
ViewNumber: byte(rand.Uint32()), ViewNumber: byte(rand.Uint32()),
payload: randomMessage(t, mt), payload: randomMessage(t, mt),

View file

@ -285,7 +285,7 @@ func getVerificationScript(i uint16, validators []crypto.PublicKey) []byte {
func fromPayload(t messageType, recovery *Payload, p io.Serializable) *Payload { func fromPayload(t messageType, recovery *Payload, p io.Serializable) *Payload {
return &Payload{ return &Payload{
message: message{ message: &message{
Type: t, Type: t,
ViewNumber: recovery.message.ViewNumber, ViewNumber: recovery.message.ViewNumber,
payload: p, payload: p,

View file

@ -25,6 +25,7 @@ func TestRecoveryMessage_Setters(t *testing.T) {
r := &recoveryMessage{} r := &recoveryMessage{}
p := new(Payload) p := new(Payload)
p.message = &message{}
p.SetType(payload.RecoveryMessageType) p.SetType(payload.RecoveryMessageType)
p.SetPayload(r) p.SetPayload(r)
// sign payload to have verification script // sign payload to have verification script
@ -38,6 +39,7 @@ func TestRecoveryMessage_Setters(t *testing.T) {
nextConsensus: util.Uint160{1, 2}, nextConsensus: util.Uint160{1, 2},
} }
p1 := new(Payload) p1 := new(Payload)
p1.message = &message{}
p1.SetType(payload.PrepareRequestType) p1.SetType(payload.PrepareRequestType)
p1.SetPayload(req) p1.SetPayload(req)
p1.SetValidatorIndex(0) p1.SetValidatorIndex(0)
@ -45,6 +47,7 @@ func TestRecoveryMessage_Setters(t *testing.T) {
t.Run("prepare response is added", func(t *testing.T) { t.Run("prepare response is added", func(t *testing.T) {
p2 := new(Payload) p2 := new(Payload)
p2.message = &message{}
p2.SetType(payload.PrepareResponseType) p2.SetType(payload.PrepareResponseType)
p2.SetPayload(&prepareResponse{ p2.SetPayload(&prepareResponse{
preparationHash: p1.Hash(), preparationHash: p1.Hash(),
@ -70,6 +73,7 @@ func TestRecoveryMessage_Setters(t *testing.T) {
r.AddPayload(p1) r.AddPayload(p1)
pr = r.GetPrepareRequest(p, pubs, p1.ValidatorIndex()) pr = r.GetPrepareRequest(p, pubs, p1.ValidatorIndex())
require.NotNil(t, pr) require.NotNil(t, pr)
require.Equal(t, p1.Hash(), pr.Hash())
require.Equal(t, p1, pr) require.Equal(t, p1, pr)
pl := pr.(*Payload) pl := pr.(*Payload)
@ -78,6 +82,7 @@ func TestRecoveryMessage_Setters(t *testing.T) {
t.Run("change view is added", func(t *testing.T) { t.Run("change view is added", func(t *testing.T) {
p3 := new(Payload) p3 := new(Payload)
p3.message = &message{}
p3.SetType(payload.ChangeViewType) p3.SetType(payload.ChangeViewType)
p3.SetPayload(&changeView{ p3.SetPayload(&changeView{
newViewNumber: 1, newViewNumber: 1,
@ -98,6 +103,7 @@ func TestRecoveryMessage_Setters(t *testing.T) {
t.Run("commit is added", func(t *testing.T) { t.Run("commit is added", func(t *testing.T) {
p4 := new(Payload) p4 := new(Payload)
p4.message = &message{}
p4.SetType(payload.CommitType) p4.SetType(payload.CommitType)
p4.SetPayload(randomMessage(t, commitType)) p4.SetPayload(randomMessage(t, commitType))
p4.SetValidatorIndex(4) p4.SetValidatorIndex(4)