forked from TrueCloudLab/frostfs-s3-gw
0161d2fbd3
- refactoring s3 gate structure - cleanup unused code - rename go module to `github.com/nspcc-dev/neofs-s3-gate` closes #13 Signed-off-by: Evgeniy Kulikov <kim@nspcc.ru>
230 lines
5.4 KiB
Go
230 lines
5.4 KiB
Go
package metrics
|
|
|
|
import (
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
type (
|
|
// HTTPAPIStats holds statistics information about
|
|
// a given API in the requests.
|
|
HTTPAPIStats struct {
|
|
apiStats map[string]int
|
|
sync.RWMutex
|
|
}
|
|
|
|
// HTTPStats holds statistics information about
|
|
// HTTP requests made by all clients
|
|
HTTPStats struct {
|
|
currentS3Requests HTTPAPIStats
|
|
totalS3Requests HTTPAPIStats
|
|
totalS3Errors HTTPAPIStats
|
|
|
|
totalInputBytes uint64
|
|
totalOutputBytes uint64
|
|
}
|
|
|
|
readCounter struct {
|
|
io.ReadCloser
|
|
countBytes uint64
|
|
}
|
|
|
|
writeCounter struct {
|
|
http.ResponseWriter
|
|
countBytes uint64
|
|
}
|
|
|
|
responseWrapper struct {
|
|
http.ResponseWriter
|
|
|
|
statusCode int
|
|
headWritten bool
|
|
startTime time.Time
|
|
}
|
|
)
|
|
|
|
const systemPath = "/system"
|
|
|
|
var (
|
|
httpStatsMetric = new(HTTPStats)
|
|
httpRequestsDuration = prometheus.NewHistogramVec(
|
|
prometheus.HistogramOpts{
|
|
Name: "neofs_s3_request_seconds",
|
|
Help: "Time taken by requests served by current NeoFS S3 Gate instance",
|
|
Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10},
|
|
},
|
|
[]string{"api"},
|
|
)
|
|
)
|
|
|
|
// collects http metrics for NeoFS S3 Gate in Prometheus specific format
|
|
// and sends to given channel
|
|
func collectHTTPMetrics(ch chan<- prometheus.Metric) {
|
|
for api, value := range httpStatsMetric.currentS3Requests.Load() {
|
|
ch <- prometheus.MustNewConstMetric(
|
|
prometheus.NewDesc(
|
|
prometheus.BuildFQName("neofs_s3", "requests", "current"),
|
|
"Total number of running s3 requests in current MinIO server instance",
|
|
[]string{"api"}, nil),
|
|
prometheus.CounterValue,
|
|
float64(value),
|
|
api,
|
|
)
|
|
}
|
|
|
|
for api, value := range httpStatsMetric.totalS3Requests.Load() {
|
|
ch <- prometheus.MustNewConstMetric(
|
|
prometheus.NewDesc(
|
|
prometheus.BuildFQName("neofs_s3", "requests", "total"),
|
|
"Total number of s3 requests in current MinIO server instance",
|
|
[]string{"api"}, nil),
|
|
prometheus.CounterValue,
|
|
float64(value),
|
|
api,
|
|
)
|
|
}
|
|
|
|
for api, value := range httpStatsMetric.totalS3Errors.Load() {
|
|
ch <- prometheus.MustNewConstMetric(
|
|
prometheus.NewDesc(
|
|
prometheus.BuildFQName("neofs_s3", "errors", "total"),
|
|
"Total number of s3 errors in current MinIO server instance",
|
|
[]string{"api"}, nil),
|
|
prometheus.CounterValue,
|
|
float64(value),
|
|
api,
|
|
)
|
|
}
|
|
}
|
|
|
|
func APIStats(api string, f http.HandlerFunc) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
httpStatsMetric.currentS3Requests.Inc(api)
|
|
defer httpStatsMetric.currentS3Requests.Dec(api)
|
|
|
|
in := &readCounter{ReadCloser: r.Body}
|
|
out := &writeCounter{ResponseWriter: w}
|
|
|
|
r.Body = in
|
|
|
|
statsWriter := &responseWrapper{
|
|
ResponseWriter: out,
|
|
startTime: time.Now(),
|
|
}
|
|
|
|
f.ServeHTTP(statsWriter, r)
|
|
|
|
// Time duration in secs since the call started.
|
|
// We don't need to do nanosecond precision in this
|
|
// simply for the fact that it is not human readable.
|
|
durationSecs := time.Since(statsWriter.startTime).Seconds()
|
|
|
|
httpStatsMetric.updateStats(api, statsWriter, r, durationSecs)
|
|
|
|
atomic.AddUint64(&httpStatsMetric.totalInputBytes, in.countBytes)
|
|
atomic.AddUint64(&httpStatsMetric.totalOutputBytes, out.countBytes)
|
|
}
|
|
}
|
|
|
|
// Inc increments the api stats counter.
|
|
func (stats *HTTPAPIStats) Inc(api string) {
|
|
if stats == nil {
|
|
return
|
|
}
|
|
stats.Lock()
|
|
defer stats.Unlock()
|
|
if stats.apiStats == nil {
|
|
stats.apiStats = make(map[string]int)
|
|
}
|
|
stats.apiStats[api]++
|
|
}
|
|
|
|
// Dec increments the api stats counter.
|
|
func (stats *HTTPAPIStats) Dec(api string) {
|
|
if stats == nil {
|
|
return
|
|
}
|
|
stats.Lock()
|
|
defer stats.Unlock()
|
|
if val, ok := stats.apiStats[api]; ok && val > 0 {
|
|
stats.apiStats[api]--
|
|
}
|
|
}
|
|
|
|
// Load returns the recorded stats.
|
|
func (stats *HTTPAPIStats) Load() map[string]int {
|
|
stats.Lock()
|
|
defer stats.Unlock()
|
|
var apiStats = make(map[string]int, len(stats.apiStats))
|
|
for k, v := range stats.apiStats {
|
|
apiStats[k] = v
|
|
}
|
|
return apiStats
|
|
}
|
|
|
|
func (st *HTTPStats) getInputBytes() uint64 {
|
|
return atomic.LoadUint64(&st.totalInputBytes)
|
|
}
|
|
|
|
func (st *HTTPStats) getOutputBytes() uint64 {
|
|
return atomic.LoadUint64(&st.totalOutputBytes)
|
|
}
|
|
|
|
// Update statistics from http request and response data
|
|
func (st *HTTPStats) updateStats(api string, w http.ResponseWriter, r *http.Request, durationSecs float64) {
|
|
var code int
|
|
|
|
if res, ok := w.(*responseWrapper); ok {
|
|
code = res.statusCode
|
|
}
|
|
|
|
// A successful request has a 2xx response code
|
|
successReq := code >= http.StatusOK && code < http.StatusMultipleChoices
|
|
|
|
if !strings.HasSuffix(r.URL.Path, systemPath) {
|
|
st.totalS3Requests.Inc(api)
|
|
if !successReq && code != 0 {
|
|
st.totalS3Errors.Inc(api)
|
|
}
|
|
}
|
|
|
|
if r.Method == http.MethodGet {
|
|
// Increment the prometheus http request response histogram with appropriate label
|
|
httpRequestsDuration.With(prometheus.Labels{"api": api}).Observe(durationSecs)
|
|
}
|
|
}
|
|
|
|
// WriteHeader - writes http status code
|
|
func (w *responseWrapper) WriteHeader(code int) {
|
|
if !w.headWritten {
|
|
w.statusCode = code
|
|
w.headWritten = true
|
|
|
|
w.ResponseWriter.WriteHeader(code)
|
|
}
|
|
}
|
|
|
|
// Flush - Calls the underlying Flush.
|
|
func (w *responseWrapper) Flush() {
|
|
if f, ok := w.ResponseWriter.(http.Flusher); ok {
|
|
f.Flush()
|
|
}
|
|
}
|
|
|
|
func (w *writeCounter) Write(p []byte) (int, error) {
|
|
n, err := w.ResponseWriter.Write(p)
|
|
atomic.AddUint64(&w.countBytes, uint64(n))
|
|
return n, err
|
|
}
|
|
|
|
func (r *readCounter) Read(p []byte) (int, error) {
|
|
n, err := r.ReadCloser.Read(p)
|
|
atomic.AddUint64(&r.countBytes, uint64(n))
|
|
return n, err
|
|
}
|