From b2021c126ecdd9a5a3cae87d17a81fae6ac1701a Mon Sep 17 00:00:00 2001 From: Anthony De Meulemeester Date: Mon, 9 Apr 2018 18:58:09 +0200 Subject: [PATCH] Tweaks for network and storage (#66) * Made Encode/Decode message public. * Added Redis storage driver and made some optimizations for the initialising the blockchain * removed log lines in tcp_peer * Added missing comments on exported methods. * bumped version --- Gopkg.lock | 37 +++++------- Gopkg.toml | 4 ++ VERSION | 2 +- pkg/core/blockchain.go | 44 +++++++------- pkg/core/storage/helpers.go | 78 +++++++++++++++++++++++++ pkg/core/storage/redis_store.go | 89 +++++++++++++++++++++++++++++ pkg/core/storage/store.go | 2 +- pkg/core/transaction/transaction.go | 2 + pkg/core/util.go | 36 ------------ pkg/network/message.go | 8 +-- pkg/network/server.go | 7 ++- pkg/network/tcp_peer.go | 4 +- 12 files changed, 220 insertions(+), 93 deletions(-) create mode 100644 pkg/core/storage/helpers.go create mode 100644 pkg/core/storage/redis_store.go diff --git a/Gopkg.lock b/Gopkg.lock index 392aec2e9..4d8d5751e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -20,22 +20,19 @@ version = "v1.1.0" [[projects]] - name = "github.com/go-kit/kit" - packages = ["log"] - revision = "4dc7be5d2d12881735283bcab7352178e190fc71" - version = "v0.6.0" - -[[projects]] - name = "github.com/go-logfmt/logfmt" - packages = ["."] - revision = "390ab7935ee28ec6b286364bba9b4dd6410cb3d5" - version = "v0.3.0" - -[[projects]] - name = "github.com/go-stack/stack" - packages = ["."] - revision = "259ab82a6cad3992b4e21ff5cac294ccb06474bc" - version = "v1.7.0" + name = "github.com/go-redis/redis" + packages = [ + ".", + "internal", + "internal/consistenthash", + "internal/hashtag", + "internal/pool", + "internal/proto", + "internal/singleflight", + "internal/util" + ] + revision = "877867d2845fbaf86798befe410b6ceb6f5c29a3" + version = "v6.10.2" [[projects]] name = "github.com/go-yaml/yaml" @@ -49,12 +46,6 @@ packages = ["."] revision = "553a641470496b2327abcac10b36396bd98e45c9" -[[projects]] - branch = "master" - name = "github.com/kr/logfmt" - packages = ["."] - revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0" - [[projects]] name = "github.com/pkg/errors" packages = ["."] @@ -150,6 +141,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "d4338e14e8103a6626ecf662f3d0c08e972a39e667a6c76f31cc8938f59f2cba" + inputs-digest = "c0527327199b5752699bd5cd0959e1f2cc45dd7c0341adc2a8327eaca246eef8" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 0c6af0f5e..b8faa8f09 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -63,3 +63,7 @@ [[constraint]] name = "github.com/go-yaml/yaml" version = "2.1.1" + +[[constraint]] + name = "github.com/go-redis/redis" + version = "6.10.2" diff --git a/VERSION b/VERSION index d2e2400ee..e06193879 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.39.1 +0.39.2 diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index f91dde66c..1b61cf5d1 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -85,29 +85,29 @@ func (bc *Blockchain) init() error { } bc.headerList = NewHeaderHashList(genesisBlock.Hash()) - // Look in the storage for a version. If we could not the version key - // there is nothing stored. - if version, err := bc.Get(storage.SYSVersion.Bytes()); err != nil { - bc.Put(storage.SYSVersion.Bytes(), []byte(version)) - if err := bc.persistBlock(genesisBlock); err != nil { - return err - } - - return nil + // If we could not find the version in the Store, we know that there is nothing stored. + ver, err := storage.Version(bc.Store) + if err != nil { + log.Infof("no storage version found! creating genesis block") + storage.PutVersion(bc.Store, version) + return bc.persistBlock(genesisBlock) + } + if ver != version { + return fmt.Errorf("storage version mismatch betweeen %s and %s", version, ver) } // At this point there was no version found in the storage which // implies a creating fresh storage with the version specified // and the genesis block as first block. - log.Infof("restoring blockchain with storage version: %s", version) + log.Infof("restoring blockchain with version: %s", version) - currBlockBytes, err := bc.Get(storage.SYSCurrentBlock.Bytes()) + bHeight, err := storage.CurrentBlockHeight(bc.Store) if err != nil { return err } + bc.blockHeight = bHeight - bc.blockHeight = binary.LittleEndian.Uint32(currBlockBytes[32:36]) - hashes, err := readStoredHeaderHashes(bc.Store) + hashes, err := storage.HeaderHashes(bc.Store) if err != nil { return err } @@ -119,23 +119,18 @@ func (bc *Blockchain) init() error { } } - currHeaderBytes, err := bc.Get(storage.SYSCurrentHeader.Bytes()) - if err != nil { - return err - } - currHeaderHeight := binary.LittleEndian.Uint32(currHeaderBytes[32:36]) - currHeaderHash, err := util.Uint256DecodeBytes(currHeaderBytes[:32]) + currHeaderHeight, currHeaderHash, err := storage.CurrentHeaderHeight(bc.Store) if err != nil { return err } - // Their is a high chance that the Node is stopped before the next + // There is a high chance that the Node is stopped before the next // batch of 2000 headers was stored. Via the currentHeaders stored we can sync // that with stored blocks. if currHeaderHeight > bc.storedHeaderCount { hash := currHeaderHash targetHash := bc.headerList.Get(bc.headerList.Len() - 1) - headers := []*Header{} + headers := make([]*Header, 0) for hash != targetHash { header, err := bc.getHeader(hash) @@ -392,9 +387,10 @@ func (bc *Blockchain) persist() (err error) { if persisted > 0 { log.WithFields(log.Fields{ - "persisted": persisted, - "blockHeight": bc.BlockHeight(), - "took": time.Since(start), + "persisted": persisted, + "headerHeight": bc.HeaderHeight(), + "blockHeight": bc.BlockHeight(), + "took": time.Since(start), }).Info("blockchain persist completed") } diff --git a/pkg/core/storage/helpers.go b/pkg/core/storage/helpers.go new file mode 100644 index 000000000..5ae271262 --- /dev/null +++ b/pkg/core/storage/helpers.go @@ -0,0 +1,78 @@ +package storage + +import ( + "encoding/binary" + "sort" + + "github.com/CityOfZion/neo-go/pkg/util" +) + +// Version will attempt to get the current version stored in the +// underlying Store. +func Version(s Store) (string, error) { + version, err := s.Get(SYSVersion.Bytes()) + return string(version), err +} + +// PutVersion will store the given version in the underlying Store. +func PutVersion(s Store, v string) error { + return s.Put(SYSVersion.Bytes(), []byte(v)) +} + +// CurrentBlockHeight returns the current block height found in the +// underlying Store. +func CurrentBlockHeight(s Store) (uint32, error) { + b, err := s.Get(SYSCurrentBlock.Bytes()) + if err != nil { + return 0, err + } + return binary.LittleEndian.Uint32(b[32:36]), nil +} + +// CurrentHeaderHeight returns the current header height and hash from +// the underlying Store. +func CurrentHeaderHeight(s Store) (i uint32, h util.Uint256, err error) { + var b []byte + b, err = s.Get(SYSCurrentHeader.Bytes()) + if err != nil { + return + } + i = binary.LittleEndian.Uint32(b[32:36]) + h, err = util.Uint256DecodeBytes(b[:32]) + return +} + +// HeaderHashes returns a sorted list of header hashes retrieved from +// the given underlying Store. +func HeaderHashes(s Store) ([]util.Uint256, error) { + hashMap := make(map[uint32][]util.Uint256) + s.Seek(IXHeaderHashList.Bytes(), func(k, v []byte) { + storedCount := binary.LittleEndian.Uint32(k[1:]) + hashes, err := util.Read2000Uint256Hashes(v) + if err != nil { + panic(err) + } + hashMap[storedCount] = hashes + }) + + var ( + i = 0 + sortedKeys = make([]int, len(hashMap)) + ) + + for k, _ := range hashMap { + sortedKeys[i] = int(k) + i++ + } + sort.Ints(sortedKeys) + + hashes := []util.Uint256{} + for _, key := range sortedKeys { + values := hashMap[uint32(key)] + for _, hash := range values { + hashes = append(hashes, hash) + } + } + + return hashes, nil +} diff --git a/pkg/core/storage/redis_store.go b/pkg/core/storage/redis_store.go new file mode 100644 index 000000000..0822bd063 --- /dev/null +++ b/pkg/core/storage/redis_store.go @@ -0,0 +1,89 @@ +package storage + +import ( + "fmt" + + "github.com/go-redis/redis" +) + +// RedisStore holds the client and maybe later some more metadata. +type RedisStore struct { + client *redis.Client +} + +// RedisBatch simple batch implementation to satisfy the Store interface. +type RedisBatch struct { + mem map[string]string +} + +// Len implements the Batch interface. +func (b *RedisBatch) Len() int { + return len(b.mem) +} + +// Put implements the Batch interface. +func (b *RedisBatch) Put(k, v []byte) { + b.mem[string(k)] = string(v) +} + +// NewRedisBatch returns a new ready to use RedisBatch. +func NewRedisBatch() *RedisBatch { + return &RedisBatch{ + mem: make(map[string]string), + } +} + +// NewRedisStore returns an new initialized - ready to use RedisStore object +func NewRedisStore() (*RedisStore, error) { + c := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }) + if _, err := c.Ping().Result(); err != nil { + return nil, err + } + return &RedisStore{ + client: c, + }, nil +} + +// Batch implements the Store interface. +func (s *RedisStore) Batch() Batch { + return NewRedisBatch() +} + +// Get implements the Store interface. +func (s *RedisStore) Get(k []byte) ([]byte, error) { + val, err := s.client.Get(string(k)).Result() + if err != nil { + return nil, err + } + return []byte(val), nil +} + +// Put implements the Store interface. +func (s *RedisStore) Put(k, v []byte) error { + s.client.Set(string(k), string(v), 0) + return nil +} + +// PutBatch implements the Store interface. +func (s *RedisStore) PutBatch(b Batch) error { + pipe := s.client.Pipeline() + for k, v := range b.(*RedisBatch).mem { + pipe.Set(k, v, 0) + } + _, err := pipe.Exec() + return err +} + +// Seek implements the Store interface. +func (s *RedisStore) Seek(k []byte, f func(k, v []byte)) { + iter := s.client.Scan(0, fmt.Sprintf("%s*", k), 0).Iterator() + for iter.Next() { + key := iter.Val() + val, _ := s.client.Get(key).Result() + f([]byte(key), []byte(val)) + } +} diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 2d244fd9d..3d63fbb47 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -34,7 +34,7 @@ type ( Batch() Batch Get([]byte) ([]byte, error) Put(k, v []byte) error - PutBatch(batch Batch) error + PutBatch(Batch) error Seek(k []byte, f func(k, v []byte)) } diff --git a/pkg/core/transaction/transaction.go b/pkg/core/transaction/transaction.go index aa4c18ca7..5f3fe2145 100644 --- a/pkg/core/transaction/transaction.go +++ b/pkg/core/transaction/transaction.go @@ -88,6 +88,8 @@ func (t *Transaction) DecodeBinary(r io.Reader) error { for i := 0; i < int(lenAttrs); i++ { t.Attributes[i] = &Attribute{} if err := t.Attributes[i].DecodeBinary(r); err != nil { + // @TODO: remove this when TX attribute decode bug is solved. + log.Warnf("failed to decode TX %s", t.hash) return err } } diff --git a/pkg/core/util.go b/pkg/core/util.go index 66753c40c..bdba1a5de 100644 --- a/pkg/core/util.go +++ b/pkg/core/util.go @@ -3,7 +3,6 @@ package core import ( "bytes" "encoding/binary" - "sort" "time" "github.com/CityOfZion/neo-go/config" @@ -228,38 +227,3 @@ func storeAsTransaction(batch storage.Batch, tx *transaction.Transaction, index return nil } - -// readStoredHeaderHashes returns a sorted list of header hashes -// retrieved from the given Store. -func readStoredHeaderHashes(store storage.Store) ([]util.Uint256, error) { - hashMap := make(map[uint32][]util.Uint256) - store.Seek(storage.IXHeaderHashList.Bytes(), func(k, v []byte) { - storedCount := binary.LittleEndian.Uint32(k[1:]) - hashes, err := util.Read2000Uint256Hashes(v) - if err != nil { - panic(err) - } - hashMap[storedCount] = hashes - }) - - var ( - i = 0 - sortedKeys = make([]int, len(hashMap)) - ) - - for k, _ := range hashMap { - sortedKeys[i] = int(k) - i++ - } - sort.Ints(sortedKeys) - - hashes := []util.Uint256{} - for _, key := range sortedKeys { - values := hashMap[uint32(key)] - for _, hash := range values { - hashes = append(hashes, hash) - } - } - - return hashes, nil -} diff --git a/pkg/network/message.go b/pkg/network/message.go index fc1ed497e..6c5b7902a 100644 --- a/pkg/network/message.go +++ b/pkg/network/message.go @@ -124,8 +124,8 @@ func (m *Message) CommandType() CommandType { } } -// decode a Message from the given reader. -func (m *Message) decode(r io.Reader) error { +// Decode a Message from the given reader. +func (m *Message) Decode(r io.Reader) error { if err := binary.Read(r, binary.LittleEndian, &m.Magic); err != nil { return err } @@ -205,8 +205,8 @@ func (m *Message) decodePayload(r io.Reader) error { return nil } -// encode a Message to any given io.Writer. -func (m *Message) encode(w io.Writer) error { +// Encode a Message to any given io.Writer. +func (m *Message) Encode(w io.Writer) error { if err := binary.Write(w, binary.LittleEndian, m.Magic); err != nil { return err } diff --git a/pkg/network/server.go b/pkg/network/server.go index 47ce3cf86..c92f50f84 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -109,10 +109,13 @@ func (s *Server) Shutdown() { close(s.quit) } +// UnconnectedPeers returns a list of peers that are in the discovery peer list +// but are not connected to the server. func (s *Server) UnconnectedPeers() []string { return s.discovery.UnconnectedPeers() } +// BadPeers returns a list of peers the are flagged as "bad" peers. func (s *Server) BadPeers() []string { return s.discovery.BadPeers() } @@ -340,8 +343,8 @@ func (s *Server) processProto(proto protoTuple) error { getHeaders := msg.Payload.(*payload.GetBlocks) s.handleGetHeadersCmd(peer, getHeaders) case CMDVerack: - // Make sure this peer has sended his version before we start the - // protocol. + // Make sure this peer has send his version before we start the + // protocol with that peer. if peer.Version() == nil { return errInvalidHandshake } diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 7b10657fb..de80778dc 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -38,7 +38,7 @@ func NewTCPPeer(conn net.Conn, proto chan protoTuple) *TCPPeer { // Send implements the Peer interface. This will encode the message // to the underlying connection. func (p *TCPPeer) Send(msg *Message) { - if err := msg.encode(p.conn); err != nil { + if err := msg.Encode(p.conn); err != nil { select { case p.disc <- err: case <-p.closed: @@ -71,7 +71,7 @@ func (p *TCPPeer) readLoop(proto chan protoTuple, readErr chan error) { return default: msg := &Message{} - if err := msg.decode(p.conn); err != nil { + if err := msg.Decode(p.conn); err != nil { readErr <- err return }