From 62c6bbd8758fce7653026f9da746ae9ca6ce8804 Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Tue, 25 Feb 2020 18:35:46 +0300 Subject: [PATCH] refactoring pool, replace zap.Duration with zap.Stringer --- main.go | 16 +++++++++++++++- pool.go | 26 +++++++++++++++++--------- receive.go | 21 +++++++++++++++------ 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/main.go b/main.go index 1e33ea5..887425d 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,9 @@ type router struct { func main() { var ( + err error + pool *Pool + v = settings() l = newLogger(v) g = newGracefulContext(l) @@ -32,10 +35,21 @@ func main() { grpclog.SetLoggerV2(gRPCLogger(l)) } + switch pool, err = newPool(g, l, v); { + case err == nil: + // ignore + case errors.Is(err, context.Canceled): + l.Info("close application") + return + default: + l.Error("could get connection", zap.Error(err)) + return + } + r := &router{ log: l, + pool: pool, key: fetchKey(l, v), - pool: newPool(g, l, v), timeout: v.GetDuration("request_timeout"), } diff --git a/pool.go b/pool.go index 0395cd4..979fd93 100644 --- a/pool.go +++ b/pool.go @@ -54,7 +54,7 @@ var ( errNoHealthyConnections = errors.New("no active connections") ) -func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool { +func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) (*Pool, error) { p := &Pool{ log: l, Mutex: new(sync.Mutex), @@ -128,11 +128,9 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool { p.reBalance(ctx) - if _, err := p.getConnection(ctx); err != nil { - l.Panic("could get connection", zap.Error(err)) - } + _, err := p.getConnection(ctx) - return p + return p, err } func (p *Pool) close() { @@ -168,6 +166,12 @@ func (p *Pool) reBalance(ctx context.Context) { weight = p.nodes[i].weight ) + if ctx.Err() != nil { + p.log.Warn("something went wrong", zap.Error(ctx.Err())) + + return + } + if conn == nil { p.log.Warn("empty connection, try to connect", zap.String("address", p.nodes[i].address)) @@ -182,7 +186,7 @@ func (p *Pool) reBalance(ctx context.Context) { if err != nil || conn == nil { p.log.Warn("skip, could not connect to node", zap.String("address", p.nodes[i].address), - zap.Duration("elapsed", time.Since(start)), + zap.Stringer("elapsed", time.Since(start)), zap.Error(err)) continue } @@ -206,7 +210,7 @@ func (p *Pool) reBalance(ctx context.Context) { if err = p.isAlive(ctx, conn); err != nil || usedAt > p.ttl { p.log.Warn("connection not alive", zap.String("address", p.nodes[i].address), - zap.Duration("used_at", usedAt), + zap.Stringer("used_at", usedAt), zap.Error(err)) if exists { @@ -217,7 +221,7 @@ func (p *Pool) reBalance(ctx context.Context) { if err = conn.Close(); err != nil { p.log.Warn("could not close bad connection", zap.String("address", p.nodes[i].address), - zap.Duration("used_at", usedAt), + zap.Stringer("used_at", usedAt), zap.Error(err)) } @@ -231,7 +235,7 @@ func (p *Pool) reBalance(ctx context.Context) { p.log.Info("connection alive", zap.String("address", p.nodes[i].address), - zap.Duration("used_at", usedAt)) + zap.Stringer("used_at", usedAt)) if !exists { p.conns[weight] = append(p.conns[weight], p.nodes[i]) @@ -281,6 +285,10 @@ func (p *Pool) getConnection(ctx context.Context) (*grpc.ClientConn, error) { p.currentConn = nil p.currentIdx.Store(-1) + if ctx.Err() != nil { + return nil, ctx.Err() + } + return nil, errNoHealthyConnections } diff --git a/receive.go b/receive.go index 7607d00..453f23e 100644 --- a/receive.go +++ b/receive.go @@ -28,6 +28,7 @@ func (r *router) receiveFile(c echo.Context) error { start = time.Now() con *grpc.ClientConn ctx = c.Request().Context() + cli object.Service_GetClient download = c.QueryParam("download") != "" ) @@ -36,14 +37,14 @@ func (r *router) receiveFile(c echo.Context) error { zap.String("cid", c.Param("cid")), zap.String("oid", c.Param("oid"))) - if err := cid.Parse(c.Param("cid")); err != nil { + if err = cid.Parse(c.Param("cid")); err != nil { log.Error("wrong container id", zap.Error(err)) return echo.NewHTTPError( http.StatusBadRequest, errors.Wrap(err, "wrong container id").Error(), ) - } else if err := oid.Parse(c.Param("oid")); err != nil { + } else if err = oid.Parse(c.Param("oid")); err != nil { log.Error("wrong object id", zap.Error(err)) return echo.NewHTTPError( @@ -57,6 +58,7 @@ func (r *router) receiveFile(c echo.Context) error { defer cancel() if con, err = r.pool.getConnection(ctx); err != nil { + log.Error("getConnection timeout", zap.Error(err)) return echo.NewHTTPError(http.StatusBadRequest, err.Error()) } } @@ -66,18 +68,25 @@ func (r *router) receiveFile(c echo.Context) error { log = log.With(zap.String("node", con.Target())) + defer func() { + if err != nil { + return + } + + log.Error("object sent to client", zap.Stringer("elapsed", time.Since(start))) + }() + req := &object.GetRequest{Address: refs.Address{ObjectID: oid, CID: cid}} req.SetTTL(service.SingleForwardingTTL) - if err := service.SignRequestHeader(r.key, req); err != nil { + if err = service.SignRequestHeader(r.key, req); err != nil { log.Error("could not sign request", zap.Error(err)) return echo.NewHTTPError( http.StatusBadRequest, errors.Wrap(err, "could not sign request").Error()) } - cli, err := object.NewServiceClient(con).Get(ctx, req) - if err != nil { + if cli, err = object.NewServiceClient(con).Get(ctx, req); err != nil { log.Error("could not prepare connection", zap.Error(err)) return echo.NewHTTPError( @@ -86,7 +95,7 @@ func (r *router) receiveFile(c echo.Context) error { ) } else if obj, err = receiveObject(cli); err != nil { log.Error("could not receive object", - zap.Duration("elapsed", time.Since(start)), + zap.Stringer("elapsed", time.Since(start)), zap.Error(err)) switch {