Merge pull request #529 from nspcc-dev/peer-communication-fixes

A set of fixes to make neo-go privnet more usable.
This commit is contained in:
Roman Khimov 2019-11-29 16:29:28 +03:00 committed by GitHub
commit 65332f5e7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 127 additions and 68 deletions

Binary file not shown.

View file

@ -24,9 +24,9 @@ services:
neo_go_network: neo_go_network:
ipv4_address: 172.200.0.1 ipv4_address: 172.200.0.1
ports: ports:
- 20331:20331 - 20333:20333
- 20341:20341 - 30333:30333
- 20351:20351 - 20001:20001
node_two: node_two:
container_name: neo_go_node_two container_name: neo_go_node_two
image: env_neo_go_image image: env_neo_go_image
@ -38,9 +38,9 @@ services:
neo_go_network: neo_go_network:
ipv4_address: 172.200.0.2 ipv4_address: 172.200.0.2
ports: ports:
- 20332:20332 - 20334:20334
- 20342:20342 - 30334:30334
- 20352:20352 - 20002:20002
node_three: node_three:
container_name: neo_go_node_three container_name: neo_go_node_three
image: env_neo_go_image image: env_neo_go_image
@ -52,9 +52,9 @@ services:
neo_go_network: neo_go_network:
ipv4_address: 172.200.0.3 ipv4_address: 172.200.0.3
ports: ports:
- 20333:20333 - 20335:20335
- 20343:20343 - 30335:30335
- 20353:20353 - 20003:20003
node_four: node_four:
container_name: neo_go_node_four container_name: neo_go_node_four
image: env_neo_go_image image: env_neo_go_image
@ -66,6 +66,6 @@ services:
neo_go_network: neo_go_network:
ipv4_address: 172.200.0.4 ipv4_address: 172.200.0.4
ports: ports:
- 20334:20334 - 20336:20336
- 20344:20344 - 30336:30336
- 20354:20354 - 20004:20004

6
.docker/privnet-entrypoint.sh Executable file
View file

@ -0,0 +1,6 @@
#!/bin/sh
if test -f /6000-privnet-blocks.acc.gz; then
gunzip /6000-privnet-blocks.acc.gz
/usr/bin/neo-go db restore -i /6000-privnet-blocks.acc
fi
/usr/bin/neo-go "$@"

View file

@ -22,7 +22,7 @@ RUN set -x \
&& go build -v -mod=vendor -ldflags "${LDFLAGS}" -o /go/bin/neo-go ./cli && go build -v -mod=vendor -ldflags "${LDFLAGS}" -o /go/bin/neo-go ./cli
# Executable image # Executable image
FROM scratch FROM alpine
ARG VERSION ARG VERSION
LABEL version=$VERSION LABEL version=$VERSION
@ -30,9 +30,11 @@ LABEL version=$VERSION
WORKDIR / WORKDIR /
COPY --from=builder /neo-go/config /config COPY --from=builder /neo-go/config /config
COPY --from=builder /neo-go/.docker/6000-privnet-blocks.acc.gz /
COPY --from=builder /neo-go/.docker/privnet-entrypoint.sh /usr/bin/privnet-entrypoint.sh
COPY --from=builder /go/bin/neo-go /usr/bin/neo-go COPY --from=builder /go/bin/neo-go /usr/bin/neo-go
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
ENTRYPOINT ["/usr/bin/neo-go"] ENTRYPOINT ["/usr/bin/privnet-entrypoint.sh"]
CMD ["node", "--config-path", "/config", "--testnet"] CMD ["node", "--config-path", "/config", "--privnet"]

View file

@ -108,10 +108,13 @@ env_up:
@docker-compose -f $(DC_FILE) up -d node_one node_two node_three node_four @docker-compose -f $(DC_FILE) up -d node_one node_two node_three node_four
env_down: env_down:
@echo "=> Stop and cleanup environment" @echo "=> Stop environment"
@docker-compose -f $(DC_FILE) down @docker-compose -f $(DC_FILE) down
env_restart: env_restart:
@echo "=> Stop and cleanup environment" @echo "=> Stop and start environment"
@docker-compose -f $(DC_FILE) restart @docker-compose -f $(DC_FILE) restart
env_clean: env_down
@echo "=> Cleanup environment"
@docker volume rm docker_volume_chain

View file

@ -9,10 +9,10 @@ ProtocolConfiguration:
- 03d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee699 - 03d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee699
- 02a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd62 - 02a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd62
SeedList: SeedList:
- 172.200.0.1:20331 - 172.200.0.1:20333
- 172.200.0.2:20332 - 172.200.0.2:20334
- 172.200.0.3:20333 - 172.200.0.3:20335
- 172.200.0.4:20334 - 172.200.0.4:20336
SystemFee: SystemFee:
EnrollmentTransaction: 1000 EnrollmentTransaction: 1000
IssueTransaction: 500 IssueTransaction: 500
@ -37,7 +37,7 @@ ApplicationConfiguration:
# FilePath: "./chains/privnet.bolt" # FilePath: "./chains/privnet.bolt"
# Uncomment in order to set up custom address for node. # Uncomment in order to set up custom address for node.
# Address: 127.0.0.1 # Address: 127.0.0.1
NodePort: 20334 NodePort: 20336
Relay: true Relay: true
DialTimeout: 3 DialTimeout: 3
ProtoTickInterval: 2 ProtoTickInterval: 2
@ -47,10 +47,10 @@ ApplicationConfiguration:
RPC: RPC:
Enabled: true Enabled: true
EnableCORSWorkaround: false EnableCORSWorkaround: false
Port: 20344 Port: 30336
Monitoring: Monitoring:
Enabled: true Enabled: true
Port: 20354 Port: 20004
UnlockWallet: UnlockWallet:
Path: "6PYRXVwHSqFSukL3CuXxdQ75VmsKpjeLgQLEjt83FrtHf1gCVphHzdD4nc" Path: "6PYRXVwHSqFSukL3CuXxdQ75VmsKpjeLgQLEjt83FrtHf1gCVphHzdD4nc"
Password: "four" Password: "four"

View file

@ -9,10 +9,10 @@ ProtocolConfiguration:
- 03d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee699 - 03d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee699
- 02a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd62 - 02a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd62
SeedList: SeedList:
- 172.200.0.1:20331 - 172.200.0.1:20333
- 172.200.0.2:20332 - 172.200.0.2:20334
- 172.200.0.3:20333 - 172.200.0.3:20335
- 172.200.0.4:20334 - 172.200.0.4:20336
SystemFee: SystemFee:
EnrollmentTransaction: 1000 EnrollmentTransaction: 1000
IssueTransaction: 500 IssueTransaction: 500
@ -37,7 +37,7 @@ ApplicationConfiguration:
# FilePath: "./chains/privnet.bolt" # FilePath: "./chains/privnet.bolt"
# Uncomment in order to set up custom address for node. # Uncomment in order to set up custom address for node.
# Address: 127.0.0.1 # Address: 127.0.0.1
NodePort: 20331 NodePort: 20333
Relay: true Relay: true
DialTimeout: 3 DialTimeout: 3
ProtoTickInterval: 2 ProtoTickInterval: 2
@ -47,10 +47,10 @@ ApplicationConfiguration:
RPC: RPC:
Enabled: true Enabled: true
EnableCORSWorkaround: false EnableCORSWorkaround: false
Port: 20341 Port: 30333
Monitoring: Monitoring:
Enabled: true Enabled: true
Port: 20351 Port: 20001
UnlockWallet: UnlockWallet:
Path: "6PYLmjBYJ4wQTCEfqvnznGJwZeW9pfUcV5m5oreHxqryUgqKpTRAFt9L8Y" Path: "6PYLmjBYJ4wQTCEfqvnznGJwZeW9pfUcV5m5oreHxqryUgqKpTRAFt9L8Y"
Password: "one" Password: "one"

View file

@ -9,10 +9,10 @@ ProtocolConfiguration:
- 03d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee699 - 03d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee699
- 02a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd62 - 02a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd62
SeedList: SeedList:
- 172.200.0.1:20331 - 172.200.0.1:20333
- 172.200.0.2:20332 - 172.200.0.2:20334
- 172.200.0.3:20333 - 172.200.0.3:20335
- 172.200.0.4:20334 - 172.200.0.4:20336
SystemFee: SystemFee:
EnrollmentTransaction: 1000 EnrollmentTransaction: 1000
IssueTransaction: 500 IssueTransaction: 500
@ -37,7 +37,7 @@ ApplicationConfiguration:
# FilePath: "./chains/privnet.bolt" # FilePath: "./chains/privnet.bolt"
# Uncomment in order to set up custom address for node. # Uncomment in order to set up custom address for node.
# Address: 127.0.0.1 # Address: 127.0.0.1
NodePort: 20333 NodePort: 20335
Relay: true Relay: true
DialTimeout: 3 DialTimeout: 3
ProtoTickInterval: 2 ProtoTickInterval: 2
@ -47,10 +47,10 @@ ApplicationConfiguration:
RPC: RPC:
Enabled: true Enabled: true
EnableCORSWorkaround: false EnableCORSWorkaround: false
Port: 20343 Port: 30335
Monitoring: Monitoring:
Enabled: true Enabled: true
Port: 20353 Port: 20003
UnlockWallet: UnlockWallet:
Path: "6PYX86vYiHfUbpD95hfN1xgnvcSxy5skxfWYKu3ztjecxk6ikYs2kcWbeh" Path: "6PYX86vYiHfUbpD95hfN1xgnvcSxy5skxfWYKu3ztjecxk6ikYs2kcWbeh"
Password: "three" Password: "three"

View file

@ -9,10 +9,10 @@ ProtocolConfiguration:
- 03d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee699 - 03d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee699
- 02a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd62 - 02a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd62
SeedList: SeedList:
- 172.200.0.1:20331 - 172.200.0.1:20333
- 172.200.0.2:20332 - 172.200.0.2:20334
- 172.200.0.3:20333 - 172.200.0.3:20335
- 172.200.0.4:20334 - 172.200.0.4:20336
SystemFee: SystemFee:
EnrollmentTransaction: 1000 EnrollmentTransaction: 1000
IssueTransaction: 500 IssueTransaction: 500
@ -37,7 +37,7 @@ ApplicationConfiguration:
# FilePath: "./chains/privnet.bolt" # FilePath: "./chains/privnet.bolt"
# Uncomment in order to set up custom address for node. # Uncomment in order to set up custom address for node.
# Address: 127.0.0.1 # Address: 127.0.0.1
NodePort: 20332 NodePort: 20334
Relay: true Relay: true
DialTimeout: 3 DialTimeout: 3
ProtoTickInterval: 2 ProtoTickInterval: 2
@ -47,10 +47,10 @@ ApplicationConfiguration:
RPC: RPC:
Enabled: true Enabled: true
EnableCORSWorkaround: false EnableCORSWorkaround: false
Port: 20342 Port: 30334
Monitoring: Monitoring:
Enabled: true Enabled: true
Port: 20352 Port: 20002
UnlockWallet: UnlockWallet:
Path: "6PYXHjPaNvW8YknSXaKsTWjf9FRxo1s4naV2jdmSQEgzaqKGX368rndN3L" Path: "6PYXHjPaNvW8YknSXaKsTWjf9FRxo1s4naV2jdmSQEgzaqKGX368rndN3L"
Password: "two" Password: "two"

View file

@ -62,6 +62,9 @@ type Config struct {
// Broadcast is a callback which is called to notify server // Broadcast is a callback which is called to notify server
// about new consensus payload to sent. // about new consensus payload to sent.
Broadcast func(p *Payload) Broadcast func(p *Payload)
// RelayBlock is a callback that is called to notify server
// about the new block that needs to be broadcasted.
RelayBlock func(b *core.Block)
// Chain is a core.Blockchainer instance. // Chain is a core.Blockchainer instance.
Chain core.Blockchainer Chain core.Blockchainer
// RequestTx is a callback to which will be called // RequestTx is a callback to which will be called
@ -252,6 +255,8 @@ func (s *service) processBlock(b block.Block) {
if err := s.Chain.AddBlock(bb); err != nil { if err := s.Chain.AddBlock(bb); err != nil {
s.log.Warnf("error on add block: %v", err) s.log.Warnf("error on add block: %v", err)
} else {
s.Config.RelayBlock(bb)
} }
} }

View file

@ -150,7 +150,7 @@ func (bc *Blockchain) init() error {
// There is a high chance that the Node is stopped before the next // There is a high chance that the Node is stopped before the next
// batch of 2000 headers was stored. Via the currentHeaders stored we can sync // batch of 2000 headers was stored. Via the currentHeaders stored we can sync
// that with stored blocks. // that with stored blocks.
if currHeaderHeight > bc.storedHeaderCount { if currHeaderHeight >= bc.storedHeaderCount {
hash := currHeaderHash hash := currHeaderHash
var targetHash util.Uint256 var targetHash util.Uint256
if bc.headerList.Len() > 0 { if bc.headerList.Len() > 0 {

View file

@ -13,7 +13,7 @@ type Headers struct {
// Users can at most request 2k header. // Users can at most request 2k header.
const ( const (
maxHeadersAllowed = 2000 MaxHeadersAllowed = 2000
) )
// DecodeBinary implements Serializable interface. // DecodeBinary implements Serializable interface.
@ -21,9 +21,9 @@ func (p *Headers) DecodeBinary(br *io.BinReader) {
lenHeaders := br.ReadVarUint() lenHeaders := br.ReadVarUint()
// C# node does it silently // C# node does it silently
if lenHeaders > maxHeadersAllowed { if lenHeaders > MaxHeadersAllowed {
log.Warnf("received %d headers, capping to %d", lenHeaders, maxHeadersAllowed) log.Warnf("received %d headers, capping to %d", lenHeaders, MaxHeadersAllowed)
lenHeaders = maxHeadersAllowed lenHeaders = MaxHeadersAllowed
} }
p.Hdrs = make([]*core.Header, lenHeaders) p.Hdrs = make([]*core.Header, lenHeaders)

View file

@ -37,6 +37,7 @@ var (
errMaxPeers = errors.New("max peers reached") errMaxPeers = errors.New("max peers reached")
errServerShutdown = errors.New("server shutdown") errServerShutdown = errors.New("server shutdown")
errInvalidInvType = errors.New("invalid inventory type") errInvalidInvType = errors.New("invalid inventory type")
errInvalidHashStart = errors.New("invalid requested HashStart")
) )
type ( type (
@ -94,10 +95,11 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server {
} }
srv, err := consensus.NewService(consensus.Config{ srv, err := consensus.NewService(consensus.Config{
Broadcast: s.handleNewPayload, Broadcast: s.handleNewPayload,
Chain: chain, RelayBlock: s.relayBlock,
RequestTx: s.requestTx, Chain: chain,
Wallet: config.Wallet, RequestTx: s.requestTx,
Wallet: config.Wallet,
}) })
if err != nil { if err != nil {
return nil return nil
@ -200,12 +202,6 @@ func (s *Server) run() {
} }
return return
case p := <-s.register: case p := <-s.register:
// When a new peer is connected we send out our version immediately.
if err := s.sendVersion(p); err != nil {
log.WithFields(log.Fields{
"addr": p.RemoteAddr(),
}).Error(err)
}
s.lock.Lock() s.lock.Lock()
s.peers[p] = true s.peers[p] = true
s.lock.Unlock() s.lock.Unlock()
@ -289,6 +285,8 @@ func (s *Server) PeerCount() int {
// startProtocol starts a long running background loop that interacts // startProtocol starts a long running background loop that interacts
// every ProtoTickInterval with the peer. // every ProtoTickInterval with the peer.
func (s *Server) startProtocol(p Peer) { func (s *Server) startProtocol(p Peer) {
var err error
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"addr": p.RemoteAddr(), "addr": p.RemoteAddr(),
"userAgent": string(p.Version().UserAgent), "userAgent": string(p.Version().UserAgent),
@ -297,10 +295,12 @@ func (s *Server) startProtocol(p Peer) {
}).Info("started protocol") }).Info("started protocol")
s.discovery.RegisterGoodAddr(p.PeerAddr().String()) s.discovery.RegisterGoodAddr(p.PeerAddr().String())
err := s.requestHeaders(p) if s.chain.HeaderHeight() < p.Version().StartHeight {
if err != nil { err = s.requestHeaders(p)
p.Disconnect(err) if err != nil {
return p.Disconnect(err)
return
}
} }
timer := time.NewTimer(s.ProtoTickInterval) timer := time.NewTimer(s.ProtoTickInterval)
@ -427,6 +427,35 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
return nil return nil
} }
// handleGetHeadersCmd processes the getheaders request.
func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlocks) error {
if len(gh.HashStart) < 1 {
return errInvalidHashStart
}
startHash := gh.HashStart[0]
start, err := s.chain.GetHeader(startHash)
if err != nil {
return err
}
resp := payload.Headers{}
resp.Hdrs = make([]*core.Header, 0, payload.MaxHeadersAllowed)
for i := start.Index + 1; i < start.Index+1+payload.MaxHeadersAllowed; i++ {
hash := s.chain.GetHeaderHash(int(i))
if hash.Equals(util.Uint256{}) || hash.Equals(gh.HashStop) {
break
}
header, err := s.chain.GetHeader(hash)
if err != nil {
break
}
resp.Hdrs = append(resp.Hdrs, header)
}
if len(resp.Hdrs) == 0 {
return nil
}
return p.WriteMsg(NewMessage(s.Net, CMDHeaders, &resp))
}
// handleConsensusCmd processes received consensus payload. // handleConsensusCmd processes received consensus payload.
// It never returns an error. // It never returns an error.
func (s *Server) handleConsensusCmd(cp *consensus.Payload) error { func (s *Server) handleConsensusCmd(cp *consensus.Payload) error {
@ -438,6 +467,9 @@ func (s *Server) handleConsensusCmd(cp *consensus.Payload) error {
// It never returns an error. // It never returns an error.
func (s *Server) handleTxCmd(tx *transaction.Transaction) error { func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
s.consensus.OnTransaction(tx) s.consensus.OnTransaction(tx)
// It's OK for it to fail for various reasons like tx already existing
// in the pool.
_ = s.RelayTxn(tx)
return nil return nil
} }
@ -520,6 +552,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
case CMDGetData: case CMDGetData:
inv := msg.Payload.(*payload.Inventory) inv := msg.Payload.(*payload.Inventory)
return s.handleGetDataCmd(peer, inv) return s.handleGetDataCmd(peer, inv)
case CMDGetHeaders:
gh := msg.Payload.(*payload.GetBlocks)
return s.handleGetHeadersCmd(peer, gh)
case CMDHeaders: case CMDHeaders:
headers := msg.Payload.(*payload.Headers) headers := msg.Payload.(*payload.Headers)
go s.handleHeadersCmd(peer, headers) go s.handleHeadersCmd(peer, headers)
@ -580,6 +615,11 @@ func (s *Server) relayInventory(t payload.InventoryType, hashes ...util.Uint256)
} }
} }
// relayBlock tells all the other connected nodes about the given block.
func (s *Server) relayBlock(b *core.Block) {
s.relayInventory(payload.BlockType, b.Hash())
}
// RelayTxn a new transaction to the local node and the connected peers. // RelayTxn a new transaction to the local node and the connected peers.
// Reference: the method OnRelay in C#: https://github.com/neo-project/neo/blob/master/neo/Network/P2P/LocalNode.cs#L159 // Reference: the method OnRelay in C#: https://github.com/neo-project/neo/blob/master/neo/Network/P2P/LocalNode.cs#L159
func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason { func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {
@ -599,10 +639,7 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {
return RelayOutOfMemory return RelayOutOfMemory
} }
for p := range s.Peers() { s.relayInventory(payload.TXType, t.Hash())
payload := payload.NewInventory(payload.TXType, []util.Uint256{t.Hash()})
s.RelayDirectly(p, payload)
}
return RelaySucceed return RelaySucceed
} }

View file

@ -78,6 +78,12 @@ func (t *TCPTransport) handleConn(conn net.Conn) {
t.server.register <- p t.server.register <- p
// When a new peer is connected we send out our version immediately.
if err := t.server.sendVersion(p); err != nil {
log.WithFields(log.Fields{
"addr": p.RemoteAddr(),
}).Error(err)
}
r := io.NewBinReaderFromIO(p.conn) r := io.NewBinReaderFromIO(p.conn)
for { for {
msg := &Message{} msg := &Message{}