parent
3ef9f6f016
commit
8ec57d145e
267 changed files with 15430 additions and 10511 deletions
39
vendor/google.golang.org/grpc/stream.go
generated
vendored
39
vendor/google.golang.org/grpc/stream.go
generated
vendored
|
@ -151,23 +151,24 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||
}
|
||||
}()
|
||||
}
|
||||
if stats.On() {
|
||||
ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
|
||||
sh := cc.dopts.copts.StatsHandler
|
||||
if sh != nil {
|
||||
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
|
||||
begin := &stats.Begin{
|
||||
Client: true,
|
||||
BeginTime: time.Now(),
|
||||
FailFast: c.failFast,
|
||||
}
|
||||
stats.HandleRPC(ctx, begin)
|
||||
sh.HandleRPC(ctx, begin)
|
||||
}
|
||||
defer func() {
|
||||
if err != nil && stats.On() {
|
||||
if err != nil && sh != nil {
|
||||
// Only handle end stats if err != nil.
|
||||
end := &stats.End{
|
||||
Client: true,
|
||||
Error: err,
|
||||
}
|
||||
stats.HandleRPC(ctx, end)
|
||||
sh.HandleRPC(ctx, end)
|
||||
}
|
||||
}()
|
||||
gopts := BalancerGetOptions{
|
||||
|
@ -223,7 +224,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||
tracing: EnableTracing,
|
||||
trInfo: trInfo,
|
||||
|
||||
statsCtx: ctx,
|
||||
statsCtx: ctx,
|
||||
statsHandler: cc.dopts.copts.StatsHandler,
|
||||
}
|
||||
if cc.dopts.cp != nil {
|
||||
cs.cbuf = new(bytes.Buffer)
|
||||
|
@ -281,7 +283,8 @@ type clientStream struct {
|
|||
// statsCtx keeps the user context for stats handling.
|
||||
// All stats collection should use the statsCtx (instead of the stream context)
|
||||
// so that all the generated stats for a particular RPC can be associated in the processing phase.
|
||||
statsCtx context.Context
|
||||
statsCtx context.Context
|
||||
statsHandler stats.Handler
|
||||
}
|
||||
|
||||
func (cs *clientStream) Context() context.Context {
|
||||
|
@ -335,7 +338,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
|||
err = toRPCErr(err)
|
||||
}()
|
||||
var outPayload *stats.OutPayload
|
||||
if stats.On() {
|
||||
if cs.statsHandler != nil {
|
||||
outPayload = &stats.OutPayload{
|
||||
Client: true,
|
||||
}
|
||||
|
@ -352,14 +355,14 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
|||
err = cs.t.Write(cs.s, out, &transport.Options{Last: false})
|
||||
if err == nil && outPayload != nil {
|
||||
outPayload.SentTime = time.Now()
|
||||
stats.HandleRPC(cs.statsCtx, outPayload)
|
||||
cs.statsHandler.HandleRPC(cs.statsCtx, outPayload)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
|
||||
defer func() {
|
||||
if err != nil && stats.On() {
|
||||
if err != nil && cs.statsHandler != nil {
|
||||
// Only generate End if err != nil.
|
||||
// If err == nil, it's not the last RecvMsg.
|
||||
// The last RecvMsg gets either an RPC error or io.EOF.
|
||||
|
@ -370,11 +373,11 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
|
|||
if err != io.EOF {
|
||||
end.Error = toRPCErr(err)
|
||||
}
|
||||
stats.HandleRPC(cs.statsCtx, end)
|
||||
cs.statsHandler.HandleRPC(cs.statsCtx, end)
|
||||
}
|
||||
}()
|
||||
var inPayload *stats.InPayload
|
||||
if stats.On() {
|
||||
if cs.statsHandler != nil {
|
||||
inPayload = &stats.InPayload{
|
||||
Client: true,
|
||||
}
|
||||
|
@ -395,7 +398,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
|
|||
cs.mu.Unlock()
|
||||
}
|
||||
if inPayload != nil {
|
||||
stats.HandleRPC(cs.statsCtx, inPayload)
|
||||
cs.statsHandler.HandleRPC(cs.statsCtx, inPayload)
|
||||
}
|
||||
if !cs.desc.ClientStreams || cs.desc.ServerStreams {
|
||||
return
|
||||
|
@ -520,6 +523,8 @@ type serverStream struct {
|
|||
statusDesc string
|
||||
trInfo *traceInfo
|
||||
|
||||
statsHandler stats.Handler
|
||||
|
||||
mu sync.Mutex // protects trInfo.tr after the service handler runs.
|
||||
}
|
||||
|
||||
|
@ -562,7 +567,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
|
|||
}
|
||||
}()
|
||||
var outPayload *stats.OutPayload
|
||||
if stats.On() {
|
||||
if ss.statsHandler != nil {
|
||||
outPayload = &stats.OutPayload{}
|
||||
}
|
||||
out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload)
|
||||
|
@ -580,7 +585,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
|
|||
}
|
||||
if outPayload != nil {
|
||||
outPayload.SentTime = time.Now()
|
||||
stats.HandleRPC(ss.s.Context(), outPayload)
|
||||
ss.statsHandler.HandleRPC(ss.s.Context(), outPayload)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -601,7 +606,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
|
|||
}
|
||||
}()
|
||||
var inPayload *stats.InPayload
|
||||
if stats.On() {
|
||||
if ss.statsHandler != nil {
|
||||
inPayload = &stats.InPayload{}
|
||||
}
|
||||
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inPayload); err != nil {
|
||||
|
@ -614,7 +619,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
|
|||
return toRPCErr(err)
|
||||
}
|
||||
if inPayload != nil {
|
||||
stats.HandleRPC(ss.s.Context(), inPayload)
|
||||
ss.statsHandler.HandleRPC(ss.s.Context(), inPayload)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue