// This impl uses channels to simulate the queue handler with the actor model.
// A suitable number k ,should be set for channel size, because if #numOfMsg > k,
// we lose determinism. k chosen should be large enough that when filled, it shall indicate that
// the peer has stopped responding, since we do not have a pingMSG, we will need another way to shut down
// peers

package peer

import (
	"errors"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/CityOfZion/neo-go/pkg/wire/command"

	"github.com/CityOfZion/neo-go/pkg/peer/stall"
	"github.com/CityOfZion/neo-go/pkg/wire"
	"github.com/CityOfZion/neo-go/pkg/wire/payload"
	"github.com/CityOfZion/neo-go/pkg/wire/protocol"
	"github.com/CityOfZion/neo-go/pkg/wire/util"
)

const (
	maxOutboundConnections = 100
	protocolVer            = protocol.DefaultVersion
	handshakeTimeout       = 30 * time.Second
	idleTimeout            = 5 * time.Minute // If no message received after idleTimeout, then peer disconnects

	// nodes will have `responseTime` seconds to reply with a response
	responseTime = 120 * time.Second

	// the stall detector will check every `tickerInterval` to see if messages
	// are overdue. Should be less than `responseTime`
	tickerInterval = 30 * time.Second

	// The input buffer size is the amount of mesages that
	// can be buffered into the channel to receive at once before
	// blocking, and before determinism is broken
	inputBufferSize = 100

	// The output buffer size is the amount of messages that
	// can be buffered into the channel to send at once before
	// blocking, and before determinism is broken.
	outputBufferSize = 100

	// pingInterval = 20 * time.Second //Not implemented in neo clients
)

var (
	errHandShakeTimeout = errors.New("Handshake timed out, peers have " + string(handshakeTimeout) + " Seconds to Complete the handshake")
)

// Peer represents a peer on the neo network
type Peer struct {
	config LocalConfig
	conn   net.Conn

	startHeight uint32

	// atomic vals
	disconnected int32

	//unchangeable state: concurrent safe
	addr      string
	protoVer  protocol.Version
	port      uint16
	inbound   bool
	userAgent string
	services  protocol.ServiceFlag
	createdAt time.Time
	relay     bool

	statemutex     sync.Mutex
	verackReceived bool
	versionKnown   bool

	*stall.Detector

	inch   chan func() // will handle all incoming connections from peer
	outch  chan func() // will handle all outcoming connections from peer
	quitch chan struct{}
}

// NewPeer returns a new NEO peer
func NewPeer(con net.Conn, inbound bool, cfg LocalConfig) *Peer {
	return &Peer{
		inch:        make(chan func(), inputBufferSize),
		outch:       make(chan func(), outputBufferSize),
		quitch:      make(chan struct{}, 1),
		inbound:     inbound,
		config:      cfg,
		conn:        con,
		createdAt:   time.Now(),
		startHeight: 0,
		addr:        con.RemoteAddr().String(),
		Detector:    stall.NewDetector(responseTime, tickerInterval),
	}
}

// Write to a peer
func (p *Peer) Write(msg wire.Messager) error {
	return wire.WriteMessage(p.conn, p.config.Net, msg)
}

// Read to a peer
func (p *Peer) Read() (wire.Messager, error) {
	return wire.ReadMessage(p.conn, p.config.Net)
}

// Disconnect disconnects a peer and closes the connection
func (p *Peer) Disconnect() {

	// return if already disconnected
	if atomic.LoadInt32(&p.disconnected) != 0 {
		return
	}

	atomic.AddInt32(&p.disconnected, 1)

	p.Detector.Quit()
	close(p.quitch)
	p.conn.Close()

	fmt.Println("Disconnected Peer with address", p.RemoteAddr().String())
}

// Port returns the peers port
func (p *Peer) Port() uint16 {
	return p.port
}

// CreatedAt returns the time at which the connection was made
func (p *Peer) CreatedAt() time.Time {
	return p.createdAt
}

// Height returns the latest recorded height of this peer
func (p *Peer) Height() uint32 {
	return p.startHeight
}

// CanRelay returns true, if the peer can relay information
func (p *Peer) CanRelay() bool {
	return p.relay
}

// LocalAddr returns this node's local address
func (p *Peer) LocalAddr() net.Addr {
	return p.conn.LocalAddr()
}

// RemoteAddr returns the remote address of the connected peer
func (p *Peer) RemoteAddr() net.Addr {
	return p.conn.RemoteAddr()
}

// Services returns the services offered by the peer
func (p *Peer) Services() protocol.ServiceFlag {
	return p.config.Services
}

//Inbound returns true whether this peer is an inbound peer
func (p *Peer) Inbound() bool {
	return p.inbound
}

// IsVerackReceived returns true, if this node has
// received a verack from this peer
func (p *Peer) IsVerackReceived() bool {
	return p.verackReceived
}

//NotifyDisconnect returns once the peer has disconnected
// Blocking
func (p *Peer) NotifyDisconnect() {
	<-p.quitch
	fmt.Println("Peer has just disconnected")
}

//End of Exposed API functions//

// PingLoop not impl. in neo yet, adding it now
// will cause this client to disconnect from all other implementations
func (p *Peer) PingLoop() { /*not implemented in other neo clients*/ }

// Run is used to start communicating with the peer
// completes the handshake and starts observing
// for messages coming in
func (p *Peer) Run() error {

	err := p.Handshake()
	if err != nil {
		return err
	}
	go p.StartProtocol()
	go p.ReadLoop()
	go p.WriteLoop()

	//go p.PingLoop() // since it is not implemented. It will disconnect all other impls.
	return nil
}

// StartProtocol run as a go-routine, will act as our queue for messages
// should be ran after handshake
func (p *Peer) StartProtocol() {
loop:
	for atomic.LoadInt32(&p.disconnected) == 0 {
		select {
		case f := <-p.inch:
			f()
		case <-p.quitch:
			break loop
		case <-p.Detector.Quitch:
			fmt.Println("Peer stalled, disconnecting")
			break loop
		}
	}
	p.Disconnect()
}

// ReadLoop Will block on the read until a message is read
// Should only be called after handshake is complete
// on a seperate go-routine.
func (p *Peer) ReadLoop() {

	idleTimer := time.AfterFunc(idleTimeout, func() {
		fmt.Println("Timing out peer")
		p.Disconnect()
	})

loop:
	for atomic.LoadInt32(&p.disconnected) == 0 {

		idleTimer.Reset(idleTimeout) // reset timer on each loop

		readmsg, err := p.Read()

		// Message read; stop Timer
		idleTimer.Stop()

		if err != nil {
			fmt.Println("Err on read", err) // This will also happen if Peer is disconnected
			break loop
		}

		// Remove message as pending from the stall detector
		p.Detector.RemoveMessage(readmsg.Command())

		switch msg := readmsg.(type) {

		case *payload.VersionMessage:
			fmt.Println("Already received a Version, disconnecting. " + p.RemoteAddr().String())
			break loop // We have already done the handshake, break loop and disconnect
		case *payload.VerackMessage:
			if p.verackReceived {
				fmt.Println("Already received a Verack, disconnecting. " + p.RemoteAddr().String())
				break loop
			}
			p.statemutex.Lock() // This should not happen, however if it does, then we should set it.
			p.verackReceived = true
			p.statemutex.Unlock()
		case *payload.AddrMessage:
			p.OnAddr(msg)
		case *payload.GetAddrMessage:
			p.OnGetAddr(msg)
		case *payload.GetBlocksMessage:
			p.OnGetBlocks(msg)
		case *payload.BlockMessage:
			p.OnBlocks(msg)
		case *payload.HeadersMessage:
			p.OnHeaders(msg)
		case *payload.GetHeadersMessage:
			p.OnGetHeaders(msg)
		case *payload.InvMessage:
			p.OnInv(msg)
		case *payload.GetDataMessage:
			p.OnGetData(msg)
		case *payload.TXMessage:
			p.OnTX(msg)
		default:
			fmt.Println("Cannot recognise message", msg.Command()) //Do not disconnect peer, just Log Message
		}
	}

	idleTimer.Stop()
	p.Disconnect()
}

// WriteLoop will Queue all messages to be written to the peer.
func (p *Peer) WriteLoop() {
	for atomic.LoadInt32(&p.disconnected) == 0 {
		select {
		case f := <-p.outch:
			f()
		case <-p.Detector.Quitch: // if the detector quits, disconnect peer
			p.Disconnect()
		}
	}
}

// Outgoing Requests

// RequestHeaders will write a getheaders to this peer
func (p *Peer) RequestHeaders(hash util.Uint256) error {
	c := make(chan error, 0)
	p.outch <- func() {
		getHeaders, err := payload.NewGetHeadersMessage([]util.Uint256{hash}, util.Uint256{})
		err = p.Write(getHeaders)
		if err != nil {
			p.Detector.AddMessage(command.GetHeaders)
		}
		c <- err
	}
	return <-c
}

// RequestBlocks will ask this peer for a set of blocks
func (p *Peer) RequestBlocks(hashes []util.Uint256) error {
	c := make(chan error, 0)

	p.outch <- func() {
		getdata, err := payload.NewGetDataMessage(payload.InvTypeBlock)
		err = getdata.AddHashes(hashes)
		if err != nil {
			c <- err
			return
		}

		err = p.Write(getdata)
		if err != nil {
			p.Detector.AddMessage(command.GetData)
		}

		c <- err
	}
	return <-c
}