Update apache/thrift to 0.11.0 and remove pinning (#1317)

The `apache/thrift` recently released a new version of `0.11.0`
several days ago. This release is compatible with other packages
and as such, there is no need to pinning the `apache/thrift`
to `master` anymore in Gopkg.toml.

This fix removes the pinning of `apache/thrift` in Gopkg.toml,
and updates all dependencies of coredns.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
This commit is contained in:
Yong Tang 2017-12-18 11:50:56 -06:00 committed by GitHub
parent ba4e77672c
commit 4dd40a292c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6992 changed files with 30842 additions and 1995023 deletions

View file

@ -32,11 +32,14 @@ import (
"sync"
"time"
"io/ioutil"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/keepalive"
@ -96,6 +99,11 @@ type Server struct {
cv *sync.Cond
m map[string]*service // service name -> service info
events trace.EventLog
quit chan struct{}
done chan struct{}
quitOnce sync.Once
doneOnce sync.Once
}
type options struct {
@ -118,11 +126,13 @@ type options struct {
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
connectionTimeout time.Duration
}
var defaultServerOptions = options{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
}
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
@ -181,14 +191,24 @@ func CustomCodec(codec Codec) ServerOption {
}
}
// RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCCompressor(cp Compressor) ServerOption {
return func(o *options) {
o.cp = cp
}
}
// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages. It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCDecompressor(dc Decompressor) ServerOption {
return func(o *options) {
o.dc = dc
@ -291,6 +311,18 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
}
}
// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections. If this is not set, the default is 120 seconds. A zero or
// negative value will result in an immediate timeout.
//
// This API is EXPERIMENTAL.
func ConnectionTimeout(d time.Duration) ServerOption {
return func(o *options) {
o.connectionTimeout = d
}
}
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@ -307,6 +339,8 @@ func NewServer(opt ...ServerOption) *Server {
opts: opts,
conns: make(map[io.Closer]bool),
m: make(map[string]*service),
quit: make(chan struct{}),
done: make(chan struct{}),
}
s.cv = sync.NewCond(&s.mu)
s.ctx, s.cancel = context.WithCancel(context.Background())
@ -418,11 +452,9 @@ func (s *Server) GetServiceInfo() map[string]ServiceInfo {
return ret
}
var (
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
ErrServerStopped = errors.New("grpc: the server has been stopped")
)
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
if s.opts.creds == nil {
@ -436,7 +468,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve always returns non-nil error.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
@ -487,6 +519,14 @@ func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
// If Stop or GracefulStop is called, block until they are done and return nil
select {
case <-s.quit:
<-s.done
return nil
default:
}
return err
}
tempDelay = 0
@ -499,16 +539,18 @@ func (s *Server) Serve(lis net.Listener) error {
// handleRawConn is run in its own goroutine and handles a just-accepted
// connection that has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
// If serverHandShake returns ErrConnDispatched, keep rawConn open.
// If serverHandshake returns ErrConnDispatched, keep rawConn open.
if err != credentials.ErrConnDispatched {
rawConn.Close()
}
rawConn.SetDeadline(time.Time{})
return
}
@ -521,18 +563,21 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
s.mu.Unlock()
if s.opts.useHandlerImpl {
rawConn.SetDeadline(time.Time{})
s.serveUsingHandler(conn)
} else {
s.serveHTTP2Transport(conn, authInfo)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}
rawConn.SetDeadline(time.Time{})
s.serveStreams(st)
}
}
// serveHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go) and
// serves streams on it.
// This is run in its own goroutine (it does network I/O in
// transport.NewServerTransport).
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
config := &transport.ServerConfig{
MaxStreams: s.opts.maxConcurrentStreams,
AuthInfo: authInfo,
@ -552,13 +597,13 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
s.mu.Unlock()
c.Close()
grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
return
return nil
}
if !s.addConn(st) {
st.Close()
return
return nil
}
s.serveStreams(st)
return st
}
func (s *Server) serveStreams(st transport.ServerTransport) {
@ -686,18 +731,14 @@ func (s *Server) removeConn(c io.Closer) {
}
}
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
var (
cbuf *bytes.Buffer
outPayload *stats.OutPayload
)
if cp != nil {
cbuf = new(bytes.Buffer)
}
if s.opts.statsHandler != nil {
outPayload = &stats.OutPayload{}
}
hdr, data, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
hdr, data, err := encode(s.opts.codec, msg, cp, outPayload, comp)
if err != nil {
grpclog.Errorln("grpc: server failed to encode response: ", err)
return err
@ -741,10 +782,43 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
}()
}
if s.opts.cp != nil {
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
stream.SetSendCompress(s.opts.cp.Type())
// comp and cp are used for compression. decomp and dc are used for
// decompression. If comp and decomp are both set, they are the same;
// however they are kept separate to ensure that at most one of the
// compressor/decompressor variable pairs are set for use later.
var comp, decomp encoding.Compressor
var cp Compressor
var dc Decompressor
// If dc is set and matches the stream's compression, use it. Otherwise, try
// to find a matching registered compressor for decomp.
if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
dc = s.opts.dc
} else if rc != "" && rc != encoding.Identity {
decomp = encoding.GetCompressor(rc)
if decomp == nil {
st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
t.WriteStatus(stream, st)
return st.Err()
}
}
// If cp is set, use it. Otherwise, attempt to compress the response using
// the incoming message compression method.
//
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
cp = s.opts.cp
stream.SetSendCompress(cp.Type())
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
comp = encoding.GetCompressor(rc)
if comp != nil {
stream.SetSendCompress(rc)
}
}
p := &parser{r: stream}
pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
if err == io.EOF {
@ -773,19 +847,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
return err
}
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
if st, ok := status.FromError(err); ok {
if e := t.WriteStatus(stream, st); e != nil {
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
return err
}
if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil {
if e := t.WriteStatus(stream, st); e != nil {
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
// TODO checkRecvPayload always return RPC error. Add a return here if necessary.
return st.Err()
}
var inPayload *stats.InPayload
if sh != nil {
@ -799,9 +865,17 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
if pf == compressionMade {
var err error
req, err = s.opts.dc.Do(bytes.NewReader(req))
if err != nil {
return Errorf(codes.Internal, err.Error())
if dc != nil {
req, err = dc.Do(bytes.NewReader(req))
if err != nil {
return Errorf(codes.Internal, err.Error())
}
} else {
tmp, _ := decomp.Decompress(bytes.NewReader(req))
req, err = ioutil.ReadAll(tmp)
if err != nil {
return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
}
}
if len(req) > s.opts.maxReceiveMessageSize {
@ -847,7 +921,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Last: true,
Delay: false,
}
if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
@ -896,21 +971,45 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
sh.HandleRPC(stream.Context(), end)
}()
}
if s.opts.cp != nil {
stream.SetSendCompress(s.opts.cp.Type())
}
ss := &serverStream{
t: t,
s: stream,
p: &parser{r: stream},
codec: s.opts.codec,
cp: s.opts.cp,
dc: s.opts.dc,
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
trInfo: trInfo,
statsHandler: sh,
}
// If dc is set and matches the stream's compression, use it. Otherwise, try
// to find a matching registered compressor for decomp.
if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
ss.dc = s.opts.dc
} else if rc != "" && rc != encoding.Identity {
ss.decomp = encoding.GetCompressor(rc)
if ss.decomp == nil {
st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
t.WriteStatus(ss.s, st)
return st.Err()
}
}
// If cp is set, use it. Otherwise, attempt to compress the response using
// the incoming message compression method.
//
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
ss.cp = s.opts.cp
stream.SetSendCompress(s.opts.cp.Type())
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
ss.comp = encoding.GetCompressor(rc)
if ss.comp != nil {
stream.SetSendCompress(rc)
}
}
if trInfo != nil {
trInfo.tr.LazyLog(&trInfo.firstLine, false)
defer func() {
@ -1054,6 +1153,16 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
s.quitOnce.Do(func() {
close(s.quit)
})
defer func() {
s.doneOnce.Do(func() {
close(s.done)
})
}()
s.mu.Lock()
listeners := s.lis
s.lis = nil
@ -1083,6 +1192,16 @@ func (s *Server) Stop() {
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
s.quitOnce.Do(func() {
close(s.quit)
})
defer func() {
s.doneOnce.Do(func() {
close(s.done)
})
}()
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
@ -1110,25 +1229,11 @@ func (s *Server) GracefulStop() {
}
func init() {
internal.TestingCloseConns = func(arg interface{}) {
arg.(*Server).testingCloseConns()
}
internal.TestingUseHandlerImpl = func(arg interface{}) {
arg.(*Server).opts.useHandlerImpl = true
}
}
// testingCloseConns closes all existing transports but keeps s.lis
// accepting new connections.
func (s *Server) testingCloseConns() {
s.mu.Lock()
for c := range s.conns {
c.Close()
delete(s.conns, c)
}
s.mu.Unlock()
}
// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens: