Merged in NFSSVC-24 (pull request #5)
NFSSVC-24 Metric isolation It now collects: * Info on invoked routes * Durations of requests * Tx/Rx bytes
This commit is contained in:
commit
e0a8e48672
4 changed files with 326 additions and 4 deletions
|
@ -10,6 +10,7 @@ import (
|
|||
minio "github.com/minio/minio/legacy"
|
||||
"github.com/minio/minio/legacy/config"
|
||||
"github.com/minio/minio/neofs/layer"
|
||||
"github.com/minio/minio/neofs/metrics"
|
||||
"github.com/minio/minio/neofs/pool"
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||
|
@ -193,8 +194,23 @@ func (a *App) Server(ctx context.Context) {
|
|||
attachMetrics(router, a.cfg, a.log)
|
||||
attachProfiler(router, a.cfg, a.log)
|
||||
|
||||
{ // Example for metrics.Middleware and metrics.APIStats
|
||||
r := router.PathPrefix("/test-metrics").Subrouter()
|
||||
r.Handle("/foo", metrics.APIStats("foo", func(w http.ResponseWriter, r *http.Request) {
|
||||
// do something
|
||||
}))
|
||||
|
||||
m := r.PathPrefix("/bar").Subrouter()
|
||||
m.Use(metrics.Middleware)
|
||||
m.Handle("", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// do something
|
||||
}))
|
||||
}
|
||||
|
||||
// Attach S3 API:
|
||||
minio.AttachS3API(router, a.obj, a.log)
|
||||
r := router.PathPrefix(minio.SlashSeparator).Subrouter()
|
||||
r.Use(metrics.Middleware)
|
||||
minio.AttachS3API(r, a.obj, a.log)
|
||||
|
||||
// Use mux.Router as http.Handler
|
||||
srv.Handler = router
|
||||
|
|
|
@ -43,9 +43,6 @@ func AttachS3API(r *mux.Router, obj ObjectLayer, l *zap.Logger) {
|
|||
// Add healthcheck router
|
||||
registerHealthCheckRouter(r)
|
||||
|
||||
// Add server metrics router
|
||||
registerMetricsRouter(r)
|
||||
|
||||
// Add API router.
|
||||
registerAPIRouter(r, true, true)
|
||||
|
||||
|
|
239
neofs/metrics/api.go
Normal file
239
neofs/metrics/api.go
Normal file
|
@ -0,0 +1,239 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type (
|
||||
// HTTPAPIStats holds statistics information about
|
||||
// a given API in the requests.
|
||||
HTTPAPIStats struct {
|
||||
apiStats map[string]int
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
// HTTPStats holds statistics information about
|
||||
// HTTP requests made by all clients
|
||||
HTTPStats struct {
|
||||
currentS3Requests HTTPAPIStats
|
||||
totalS3Requests HTTPAPIStats
|
||||
totalS3Errors HTTPAPIStats
|
||||
|
||||
totalInputBytes uint64
|
||||
totalOutputBytes uint64
|
||||
}
|
||||
|
||||
readCounter struct {
|
||||
io.ReadCloser
|
||||
countBytes uint64
|
||||
}
|
||||
|
||||
writeCounter struct {
|
||||
http.ResponseWriter
|
||||
countBytes uint64
|
||||
}
|
||||
|
||||
responseWrapper struct {
|
||||
http.ResponseWriter
|
||||
|
||||
statusCode int
|
||||
headWritten bool
|
||||
startTime time.Time
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
// TODO: should be imported from routing
|
||||
systemPath = "/system"
|
||||
)
|
||||
|
||||
var (
|
||||
httpStatsMetric = new(HTTPStats)
|
||||
httpRequestsDuration = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "neofs_s3_request_seconds",
|
||||
Help: "Time taken by requests served by current NeoFS S3 Gate instance",
|
||||
Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10},
|
||||
},
|
||||
[]string{"api"},
|
||||
)
|
||||
)
|
||||
|
||||
// collects http metrics for NeoFS S3 Gate in Prometheus specific format
|
||||
// and sends to given channel
|
||||
func collectHTTPMetrics(ch chan<- prometheus.Metric) {
|
||||
for api, value := range httpStatsMetric.currentS3Requests.Load() {
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
prometheus.NewDesc(
|
||||
prometheus.BuildFQName("neofs_s3", "requests", "current"),
|
||||
"Total number of running s3 requests in current MinIO server instance",
|
||||
[]string{"api"}, nil),
|
||||
prometheus.CounterValue,
|
||||
float64(value),
|
||||
api,
|
||||
)
|
||||
}
|
||||
|
||||
for api, value := range httpStatsMetric.totalS3Requests.Load() {
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
prometheus.NewDesc(
|
||||
prometheus.BuildFQName("neofs_s3", "requests", "total"),
|
||||
"Total number of s3 requests in current MinIO server instance",
|
||||
[]string{"api"}, nil),
|
||||
prometheus.CounterValue,
|
||||
float64(value),
|
||||
api,
|
||||
)
|
||||
}
|
||||
|
||||
for api, value := range httpStatsMetric.totalS3Errors.Load() {
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
prometheus.NewDesc(
|
||||
prometheus.BuildFQName("neofs_s3", "errors", "total"),
|
||||
"Total number of s3 errors in current MinIO server instance",
|
||||
[]string{"api"}, nil),
|
||||
prometheus.CounterValue,
|
||||
float64(value),
|
||||
api,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func Middleware(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
APIStats(r.RequestURI, h.ServeHTTP).ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func APIStats(api string, f http.HandlerFunc) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
httpStatsMetric.currentS3Requests.Inc(api)
|
||||
defer httpStatsMetric.currentS3Requests.Dec(api)
|
||||
|
||||
in := &readCounter{ReadCloser: r.Body}
|
||||
out := &writeCounter{ResponseWriter: w}
|
||||
|
||||
r.Body = in
|
||||
|
||||
statsWriter := &responseWrapper{
|
||||
ResponseWriter: out,
|
||||
startTime: time.Now(),
|
||||
}
|
||||
|
||||
f.ServeHTTP(statsWriter, r)
|
||||
|
||||
// Time duration in secs since the call started.
|
||||
// We don't need to do nanosecond precision in this
|
||||
// simply for the fact that it is not human readable.
|
||||
durationSecs := time.Since(statsWriter.startTime).Seconds()
|
||||
|
||||
httpStatsMetric.updateStats(api, statsWriter, r, durationSecs)
|
||||
|
||||
atomic.AddUint64(&httpStatsMetric.totalInputBytes, in.countBytes)
|
||||
atomic.AddUint64(&httpStatsMetric.totalOutputBytes, out.countBytes)
|
||||
}
|
||||
}
|
||||
|
||||
// Inc increments the api stats counter.
|
||||
func (stats *HTTPAPIStats) Inc(api string) {
|
||||
if stats == nil {
|
||||
return
|
||||
}
|
||||
stats.Lock()
|
||||
defer stats.Unlock()
|
||||
if stats.apiStats == nil {
|
||||
stats.apiStats = make(map[string]int)
|
||||
}
|
||||
stats.apiStats[api]++
|
||||
}
|
||||
|
||||
// Dec increments the api stats counter.
|
||||
func (stats *HTTPAPIStats) Dec(api string) {
|
||||
if stats == nil {
|
||||
return
|
||||
}
|
||||
stats.Lock()
|
||||
defer stats.Unlock()
|
||||
if val, ok := stats.apiStats[api]; ok && val > 0 {
|
||||
stats.apiStats[api]--
|
||||
}
|
||||
}
|
||||
|
||||
// Load returns the recorded stats.
|
||||
func (stats *HTTPAPIStats) Load() map[string]int {
|
||||
stats.Lock()
|
||||
defer stats.Unlock()
|
||||
var apiStats = make(map[string]int, len(stats.apiStats))
|
||||
for k, v := range stats.apiStats {
|
||||
apiStats[k] = v
|
||||
}
|
||||
return apiStats
|
||||
}
|
||||
|
||||
func (st *HTTPStats) getInputBytes() uint64 {
|
||||
return atomic.LoadUint64(&st.totalInputBytes)
|
||||
}
|
||||
|
||||
func (st *HTTPStats) getOutputBytes() uint64 {
|
||||
return atomic.LoadUint64(&st.totalOutputBytes)
|
||||
}
|
||||
|
||||
// Update statistics from http request and response data
|
||||
func (st *HTTPStats) updateStats(api string, w http.ResponseWriter, r *http.Request, durationSecs float64) {
|
||||
var code int
|
||||
|
||||
if res, ok := w.(*responseWrapper); ok {
|
||||
code = res.statusCode
|
||||
}
|
||||
|
||||
// A successful request has a 2xx response code
|
||||
successReq := code >= http.StatusOK && code < http.StatusMultipleChoices
|
||||
|
||||
if !strings.HasSuffix(r.URL.Path, systemPath) {
|
||||
st.totalS3Requests.Inc(api)
|
||||
if !successReq && code != 0 {
|
||||
st.totalS3Errors.Inc(api)
|
||||
}
|
||||
}
|
||||
|
||||
if r.Method == http.MethodGet {
|
||||
// Increment the prometheus http request response histogram with appropriate label
|
||||
httpRequestsDuration.With(prometheus.Labels{"api": api}).Observe(durationSecs)
|
||||
}
|
||||
}
|
||||
|
||||
// WriteHeader - writes http status code
|
||||
func (w *responseWrapper) WriteHeader(code int) {
|
||||
if !w.headWritten {
|
||||
w.statusCode = code
|
||||
w.headWritten = true
|
||||
|
||||
w.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
}
|
||||
|
||||
// Flush - Calls the underlying Flush.
|
||||
func (w *responseWrapper) Flush() {
|
||||
if f, ok := w.ResponseWriter.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writeCounter) Write(p []byte) (int, error) {
|
||||
n, err := w.ResponseWriter.Write(p)
|
||||
atomic.AddUint64(&w.countBytes, uint64(n))
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (r *readCounter) Read(p []byte) (int, error) {
|
||||
n, err := r.ReadCloser.Read(p)
|
||||
atomic.AddUint64(&r.countBytes, uint64(n))
|
||||
return n, err
|
||||
}
|
70
neofs/metrics/collector.go
Normal file
70
neofs/metrics/collector.go
Normal file
|
@ -0,0 +1,70 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"github.com/minio/minio/misc"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type stats struct {
|
||||
desc *prometheus.Desc
|
||||
}
|
||||
|
||||
var (
|
||||
versionInfo = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: "neofs_s3",
|
||||
Name: "version_info",
|
||||
Help: "Version of current NeoFS S3 Gate instance",
|
||||
},
|
||||
[]string{
|
||||
// current version
|
||||
"version",
|
||||
// build time of the current version
|
||||
"build_time",
|
||||
},
|
||||
)
|
||||
|
||||
statsMetrics = &stats{
|
||||
desc: prometheus.NewDesc("neofs_s3_stats", "Statistics exposed by MinIO server", nil, nil),
|
||||
}
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(versionInfo)
|
||||
prometheus.MustRegister(statsMetrics)
|
||||
prometheus.MustRegister(httpRequestsDuration)
|
||||
}
|
||||
|
||||
func collectNetworkMetrics(ch chan<- prometheus.Metric) {
|
||||
// Network Sent/Received Bytes (Outbound)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
prometheus.NewDesc(
|
||||
prometheus.BuildFQName("neofs_s3", "tx", "bytes_total"),
|
||||
"Total number of bytes sent by current NeoFS S3 Gate instance",
|
||||
nil, nil),
|
||||
prometheus.CounterValue,
|
||||
float64(httpStatsMetric.getInputBytes()),
|
||||
)
|
||||
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
prometheus.NewDesc(
|
||||
prometheus.BuildFQName("neofs_s3", "rx", "bytes_total"),
|
||||
"Total number of bytes received by current NeoFS S3 Gate instance",
|
||||
nil, nil),
|
||||
prometheus.CounterValue,
|
||||
float64(httpStatsMetric.getOutputBytes()),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *stats) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- s.desc
|
||||
}
|
||||
|
||||
func (s *stats) Collect(ch chan<- prometheus.Metric) {
|
||||
// Expose current version information
|
||||
versionInfo.WithLabelValues(misc.Version, misc.Build).Set(1.0)
|
||||
|
||||
// connect collectors
|
||||
collectHTTPMetrics(ch)
|
||||
collectNetworkMetrics(ch)
|
||||
}
|
Loading…
Reference in a new issue