From b9cc4acb99e158514ff7ecc1cf73a09b0182727a Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Thu, 16 Jul 2020 15:42:06 +0300 Subject: [PATCH] NFSSVC-24 Migrate and refactoring metrics - Request time duration - Collect API/Network stats --- cmd/gate/app.go | 18 ++- neofs/metrics/api.go | 239 +++++++++++++++++++++++++++++++++++++ neofs/metrics/collector.go | 70 +++++++++++ 3 files changed, 326 insertions(+), 1 deletion(-) create mode 100644 neofs/metrics/api.go create mode 100644 neofs/metrics/collector.go diff --git a/cmd/gate/app.go b/cmd/gate/app.go index a5c4021..151ab1c 100644 --- a/cmd/gate/app.go +++ b/cmd/gate/app.go @@ -10,6 +10,7 @@ import ( minio "github.com/minio/minio/legacy" "github.com/minio/minio/legacy/config" "github.com/minio/minio/neofs/layer" + "github.com/minio/minio/neofs/metrics" "github.com/minio/minio/neofs/pool" "github.com/minio/minio/pkg/auth" "github.com/nspcc-dev/neofs-api-go/refs" @@ -193,8 +194,23 @@ func (a *App) Server(ctx context.Context) { attachMetrics(router, a.cfg, a.log) attachProfiler(router, a.cfg, a.log) + { // Example for metrics.Middleware and metrics.APIStats + r := router.PathPrefix("/test-metrics").Subrouter() + r.Handle("/foo", metrics.APIStats("foo", func(w http.ResponseWriter, r *http.Request) { + // do something + })) + + m := r.PathPrefix("/bar").Subrouter() + m.Use(metrics.Middleware) + m.Handle("", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // do something + })) + } + // Attach S3 API: - minio.AttachS3API(router, a.obj, a.log) + r := router.PathPrefix(minio.SlashSeparator).Subrouter() + r.Use(metrics.Middleware) + minio.AttachS3API(r, a.obj, a.log) // Use mux.Router as http.Handler srv.Handler = router diff --git a/neofs/metrics/api.go b/neofs/metrics/api.go new file mode 100644 index 0000000..7e96771 --- /dev/null +++ b/neofs/metrics/api.go @@ -0,0 +1,239 @@ +package metrics + +import ( + "io" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type ( + // HTTPAPIStats holds statistics information about + // a given API in the requests. + HTTPAPIStats struct { + apiStats map[string]int + sync.RWMutex + } + + // HTTPStats holds statistics information about + // HTTP requests made by all clients + HTTPStats struct { + currentS3Requests HTTPAPIStats + totalS3Requests HTTPAPIStats + totalS3Errors HTTPAPIStats + + totalInputBytes uint64 + totalOutputBytes uint64 + } + + readCounter struct { + io.ReadCloser + countBytes uint64 + } + + writeCounter struct { + http.ResponseWriter + countBytes uint64 + } + + responseWrapper struct { + http.ResponseWriter + + statusCode int + headWritten bool + startTime time.Time + } +) + +const ( + // TODO: should be imported from routing + systemPath = "/system" +) + +var ( + httpStatsMetric = new(HTTPStats) + httpRequestsDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "neofs_s3_request_seconds", + Help: "Time taken by requests served by current NeoFS S3 Gate instance", + Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10}, + }, + []string{"api"}, + ) +) + +// collects http metrics for NeoFS S3 Gate in Prometheus specific format +// and sends to given channel +func collectHTTPMetrics(ch chan<- prometheus.Metric) { + for api, value := range httpStatsMetric.currentS3Requests.Load() { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("neofs_s3", "requests", "current"), + "Total number of running s3 requests in current MinIO server instance", + []string{"api"}, nil), + prometheus.CounterValue, + float64(value), + api, + ) + } + + for api, value := range httpStatsMetric.totalS3Requests.Load() { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("neofs_s3", "requests", "total"), + "Total number of s3 requests in current MinIO server instance", + []string{"api"}, nil), + prometheus.CounterValue, + float64(value), + api, + ) + } + + for api, value := range httpStatsMetric.totalS3Errors.Load() { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("neofs_s3", "errors", "total"), + "Total number of s3 errors in current MinIO server instance", + []string{"api"}, nil), + prometheus.CounterValue, + float64(value), + api, + ) + } +} + +func Middleware(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + APIStats(r.RequestURI, h.ServeHTTP).ServeHTTP(w, r) + }) +} + +func APIStats(api string, f http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + httpStatsMetric.currentS3Requests.Inc(api) + defer httpStatsMetric.currentS3Requests.Dec(api) + + in := &readCounter{ReadCloser: r.Body} + out := &writeCounter{ResponseWriter: w} + + r.Body = in + + statsWriter := &responseWrapper{ + ResponseWriter: out, + startTime: time.Now(), + } + + f.ServeHTTP(statsWriter, r) + + // Time duration in secs since the call started. + // We don't need to do nanosecond precision in this + // simply for the fact that it is not human readable. + durationSecs := time.Since(statsWriter.startTime).Seconds() + + httpStatsMetric.updateStats(api, statsWriter, r, durationSecs) + + atomic.AddUint64(&httpStatsMetric.totalInputBytes, in.countBytes) + atomic.AddUint64(&httpStatsMetric.totalOutputBytes, out.countBytes) + } +} + +// Inc increments the api stats counter. +func (stats *HTTPAPIStats) Inc(api string) { + if stats == nil { + return + } + stats.Lock() + defer stats.Unlock() + if stats.apiStats == nil { + stats.apiStats = make(map[string]int) + } + stats.apiStats[api]++ +} + +// Dec increments the api stats counter. +func (stats *HTTPAPIStats) Dec(api string) { + if stats == nil { + return + } + stats.Lock() + defer stats.Unlock() + if val, ok := stats.apiStats[api]; ok && val > 0 { + stats.apiStats[api]-- + } +} + +// Load returns the recorded stats. +func (stats *HTTPAPIStats) Load() map[string]int { + stats.Lock() + defer stats.Unlock() + var apiStats = make(map[string]int, len(stats.apiStats)) + for k, v := range stats.apiStats { + apiStats[k] = v + } + return apiStats +} + +func (st *HTTPStats) getInputBytes() uint64 { + return atomic.LoadUint64(&st.totalInputBytes) +} + +func (st *HTTPStats) getOutputBytes() uint64 { + return atomic.LoadUint64(&st.totalOutputBytes) +} + +// Update statistics from http request and response data +func (st *HTTPStats) updateStats(api string, w http.ResponseWriter, r *http.Request, durationSecs float64) { + var code int + + if res, ok := w.(*responseWrapper); ok { + code = res.statusCode + } + + // A successful request has a 2xx response code + successReq := code >= http.StatusOK && code < http.StatusMultipleChoices + + if !strings.HasSuffix(r.URL.Path, systemPath) { + st.totalS3Requests.Inc(api) + if !successReq && code != 0 { + st.totalS3Errors.Inc(api) + } + } + + if r.Method == http.MethodGet { + // Increment the prometheus http request response histogram with appropriate label + httpRequestsDuration.With(prometheus.Labels{"api": api}).Observe(durationSecs) + } +} + +// WriteHeader - writes http status code +func (w *responseWrapper) WriteHeader(code int) { + if !w.headWritten { + w.statusCode = code + w.headWritten = true + + w.ResponseWriter.WriteHeader(code) + } +} + +// Flush - Calls the underlying Flush. +func (w *responseWrapper) Flush() { + if f, ok := w.ResponseWriter.(http.Flusher); ok { + f.Flush() + } +} + +func (w *writeCounter) Write(p []byte) (int, error) { + n, err := w.ResponseWriter.Write(p) + atomic.AddUint64(&w.countBytes, uint64(n)) + return n, err +} + +func (r *readCounter) Read(p []byte) (int, error) { + n, err := r.ReadCloser.Read(p) + atomic.AddUint64(&r.countBytes, uint64(n)) + return n, err +} diff --git a/neofs/metrics/collector.go b/neofs/metrics/collector.go new file mode 100644 index 0000000..ec45412 --- /dev/null +++ b/neofs/metrics/collector.go @@ -0,0 +1,70 @@ +package metrics + +import ( + "github.com/minio/minio/misc" + "github.com/prometheus/client_golang/prometheus" +) + +type stats struct { + desc *prometheus.Desc +} + +var ( + versionInfo = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "neofs_s3", + Name: "version_info", + Help: "Version of current NeoFS S3 Gate instance", + }, + []string{ + // current version + "version", + // build time of the current version + "build_time", + }, + ) + + statsMetrics = &stats{ + desc: prometheus.NewDesc("neofs_s3_stats", "Statistics exposed by MinIO server", nil, nil), + } +) + +func init() { + prometheus.MustRegister(versionInfo) + prometheus.MustRegister(statsMetrics) + prometheus.MustRegister(httpRequestsDuration) +} + +func collectNetworkMetrics(ch chan<- prometheus.Metric) { + // Network Sent/Received Bytes (Outbound) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("neofs_s3", "tx", "bytes_total"), + "Total number of bytes sent by current NeoFS S3 Gate instance", + nil, nil), + prometheus.CounterValue, + float64(httpStatsMetric.getInputBytes()), + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("neofs_s3", "rx", "bytes_total"), + "Total number of bytes received by current NeoFS S3 Gate instance", + nil, nil), + prometheus.CounterValue, + float64(httpStatsMetric.getOutputBytes()), + ) +} + +func (s *stats) Describe(ch chan<- *prometheus.Desc) { + ch <- s.desc +} + +func (s *stats) Collect(ch chan<- prometheus.Metric) { + // Expose current version information + versionInfo.WithLabelValues(misc.Version, misc.Build).Set(1.0) + + // connect collectors + collectHTTPMetrics(ch) + collectNetworkMetrics(ch) +}