[#80] Refactor metrics, support dump descriptions

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-04-07 17:28:21 +03:00
parent 70ec5a0a5b
commit 9e72fe1662
14 changed files with 760 additions and 485 deletions

View file

@ -7,6 +7,8 @@ GO_VERSION ?= 1.19
LINT_VERSION ?= 1.49.0
BINDIR = bin
METRICS_DUMP_OUT ?= ./metrics-dump.json
# Binaries to build
CMDS = $(addprefix frostfs-, $(notdir $(wildcard cmd/*)))
BINS = $(addprefix $(BINDIR)/, $(CMDS))
@ -153,4 +155,9 @@ debpackage:
debclean:
dh clean
# Dump metrics (use METRICS_DUMP_OUT variable to override default out file './metrics-dump.json')
.PHONY: dump-metrics
dump-metrics:
@go test ./metrics -run TestDescribeAll --tags=dump_metrics --out=$(abspath $(METRICS_DUMP_OUT))
include help.mk

View file

@ -1,305 +0,0 @@
package api
import (
"context"
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"github.com/prometheus/client_golang/prometheus"
)
// RequestType classifies an S3 API call into a coarse operation group.
type RequestType int

// Operation groups. Values follow declaration order, starting at zero for
// UNKNOWNRequest and ending at five for DELETERequest.
const (
	UNKNOWNRequest RequestType = iota
	HEADRequest
	PUTRequest
	LISTRequest
	GETRequest
	DELETERequest
)

// String returns the upper-case operation name. UNKNOWNRequest and every
// value outside the declared range yield "Unknown".
func (t RequestType) String() string {
	switch t {
	case HEADRequest:
		return "HEAD"
	case PUTRequest:
		return "PUT"
	case LISTRequest:
		return "LIST"
	case GETRequest:
		return "GET"
	case DELETERequest:
		return "DELETE"
	}
	return "Unknown"
}
// RequestTypeFromAPI maps an S3 API operation name to its coarse request
// group. Names that are not listed map to UNKNOWNRequest.
//
// The legal-hold getter is accepted both in its historical lower-case
// spelling and in the canonical "GetObjectLegalHold" form, so the operation
// is classified as a GET regardless of which spelling the caller passes.
// NOTE(review): which spelling the router actually registers is not visible
// here — confirm.
func RequestTypeFromAPI(api string) RequestType {
	switch api {
	case "Options", "HeadObject", "HeadBucket":
		return HEADRequest
	case "CreateMultipartUpload", "UploadPartCopy", "UploadPart", "CompleteMultipartUpload",
		"PutObjectACL", "PutObjectTagging", "CopyObject", "PutObjectRetention", "PutObjectLegalHold",
		"PutObject", "PutBucketCors", "PutBucketACL", "PutBucketLifecycle", "PutBucketEncryption",
		"PutBucketPolicy", "PutBucketObjectLockConfig", "PutBucketTagging", "PutBucketVersioning",
		"PutBucketNotification", "CreateBucket", "PostObject":
		return PUTRequest
	case "ListObjectParts", "ListMultipartUploads", "ListObjectsV2M", "ListObjectsV2", "ListBucketVersions",
		"ListObjectsV1", "ListBuckets":
		return LISTRequest
	case "GetObjectACL", "GetObjectTagging", "SelectObjectContent", "GetObjectRetention",
		"getobjectlegalhold", "GetObjectLegalHold",
		"GetObjectAttributes", "GetObject", "GetBucketLocation", "GetBucketPolicy",
		"GetBucketLifecycle", "GetBucketEncryption", "GetBucketCors", "GetBucketACL",
		"GetBucketWebsite", "GetBucketAccelerate", "GetBucketRequestPayment", "GetBucketLogging",
		"GetBucketReplication", "GetBucketTagging", "GetBucketObjectLockConfig",
		"GetBucketVersioning", "GetBucketNotification", "ListenBucketNotification":
		return GETRequest
	case "AbortMultipartUpload", "DeleteObjectTagging", "DeleteObject", "DeleteBucketCors",
		"DeleteBucketWebsite", "DeleteBucketTagging", "DeleteMultipleObjects", "DeleteBucketPolicy",
		"DeleteBucketLifecycle", "DeleteBucketEncryption", "DeleteBucket":
		return DELETERequest
	default:
		return UNKNOWNRequest
	}
}
type (
// HTTPAPIStats holds statistics information about
// the API given in the requests.
// The counters are keyed by API name; the embedded RWMutex guards the map.
HTTPAPIStats struct {
apiStats map[string]int
sync.RWMutex
}
// UsersStat receives per-request usage: who issued the request, which
// bucket/container it addressed, its operation group, and how many bytes
// were counted on the request body (in) and on the response (out).
UsersStat interface {
Update(user, bucket, cnrID string, reqType RequestType, in, out uint64)
}
// HTTPStats holds statistics information about
// HTTP requests made by all clients.
// The byte totals are updated and read with sync/atomic.
HTTPStats struct {
currentS3Requests HTTPAPIStats
totalS3Requests HTTPAPIStats
totalS3Errors HTTPAPIStats
totalInputBytes uint64
totalOutputBytes uint64
}
// readCounter wraps a request body and counts the bytes read through it.
readCounter struct {
io.ReadCloser
countBytes uint64
}
// writeCounter wraps a response writer and counts the bytes written through it.
writeCounter struct {
http.ResponseWriter
countBytes uint64
}
// responseWrapper remembers the status code passed to WriteHeader and the
// time the request started; the embedded Once makes WriteHeader take
// effect a single time.
responseWrapper struct {
sync.Once
http.ResponseWriter
statusCode int
startTime time.Time
}
)
const systemPath = "/system"
var (
httpStatsMetric = new(HTTPStats)
httpRequestsDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "frostfs_s3_request_seconds",
Help: "Time taken by requests served by current FrostFS S3 Gate instance",
Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10},
},
[]string{"api"},
)
)
// collectHTTPMetrics sends the per-API request statistics held in
// httpStatsMetric to ch as Prometheus const metrics: requests currently in
// flight, total requests, and total errors, each labelled with the API name.
//
// The in-flight value is decremented when a request finishes, so it is
// exported as a gauge; a Prometheus counter must never decrease. The two
// totals only grow and remain counters. Each descriptor is built once per
// series rather than once per label value.
func collectHTTPMetrics(ch chan<- prometheus.Metric) {
	apiLabel := []string{"api"}

	// emit sends one const metric per API name found in values.
	emit := func(desc *prometheus.Desc, valueType prometheus.ValueType, values map[string]int) {
		for api, value := range values {
			ch <- prometheus.MustNewConstMetric(desc, valueType, float64(value), api)
		}
	}

	emit(
		prometheus.NewDesc(
			prometheus.BuildFQName("frostfs_s3", "requests", "current"),
			"Total number of running s3 requests in current FrostFS S3 Gate instance",
			apiLabel, nil),
		prometheus.GaugeValue,
		httpStatsMetric.currentS3Requests.Load(),
	)

	emit(
		prometheus.NewDesc(
			prometheus.BuildFQName("frostfs_s3", "requests", "total"),
			"Total number of s3 requests in current FrostFS S3 Gate instance",
			apiLabel, nil),
		prometheus.CounterValue,
		httpStatsMetric.totalS3Requests.Load(),
	)

	emit(
		prometheus.NewDesc(
			prometheus.BuildFQName("frostfs_s3", "errors", "total"),
			"Total number of s3 errors in current FrostFS S3 Gate instance",
			apiLabel, nil),
		prometheus.CounterValue,
		httpStatsMetric.totalS3Errors.Load(),
	)
}
// CIDResolveFunc is a func to resolve CID in Stats handler.
type CIDResolveFunc func(ctx context.Context, reqInfo *ReqInfo) (cnrID string)
// Stats wraps f so that every request served through it is accounted in the
// package-level HTTP statistics and reported to usersStat.
//
// The request body and the response writer are replaced with byte-counting
// wrappers before f runs. After f returns the handler reports per-user usage,
// bumps the total/error counters for every path that does not end in
// systemPath, records the latency histogram for GET requests only, and adds
// the counted bytes to the global totals.
func Stats(f http.HandlerFunc, resolveCID CIDResolveFunc, usersStat UsersStat) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		reqInfo := GetReqInfo(r.Context())

		httpStatsMetric.currentS3Requests.Inc(reqInfo.API)
		defer httpStatsMetric.currentS3Requests.Dec(reqInfo.API)

		bodyCounter := &readCounter{ReadCloser: r.Body}
		respCounter := &writeCounter{ResponseWriter: w}
		r.Body = bodyCounter

		wrapped := &responseWrapper{
			ResponseWriter: respCounter,
			startTime:      time.Now(),
		}

		f(wrapped, r)

		// Seconds are precise enough here; finer units are not human-readable.
		elapsed := time.Since(wrapped.startTime).Seconds()

		user := resolveUser(r.Context())
		cnrID := resolveCID(r.Context(), reqInfo)
		usersStat.Update(user, reqInfo.BucketName, cnrID, RequestTypeFromAPI(reqInfo.API), bodyCounter.countBytes, respCounter.countBytes)

		// A status of 0 means WriteHeader was never called on the wrapper;
		// such a request is not counted as an error. Anything else outside
		// the 2xx range is.
		status := wrapped.statusCode
		failed := status != 0 && (status < http.StatusOK || status >= http.StatusMultipleChoices)

		if !strings.HasSuffix(r.URL.Path, systemPath) {
			httpStatsMetric.totalS3Requests.Inc(reqInfo.API)
			if failed {
				httpStatsMetric.totalS3Errors.Inc(reqInfo.API)
			}
		}

		if r.Method == http.MethodGet {
			// Feed the request-duration histogram, labelled by API name.
			httpRequestsDuration.With(prometheus.Labels{"api": reqInfo.API}).Observe(elapsed)
		}

		atomic.AddUint64(&httpStatsMetric.totalInputBytes, bodyCounter.countBytes)
		atomic.AddUint64(&httpStatsMetric.totalOutputBytes, respCounter.countBytes)
	}
}
// resolveUser returns the issuer of the bearer token stored in the access
// box found in ctx under BoxData. It returns "anon" when the context carries
// no box, or the box has no gate or no bearer token.
func resolveUser(ctx context.Context) string {
	box, ok := ctx.Value(BoxData).(*accessbox.Box)
	if !ok || box == nil || box.Gate == nil || box.Gate.BearerToken == nil {
		return "anon"
	}
	return bearer.ResolveIssuer(*box.Gate.BearerToken).String()
}
// Inc adds one to the counter kept for api, creating the underlying map on
// first use. Calling it on a nil receiver is a no-op.
func (stats *HTTPAPIStats) Inc(api string) {
	if stats == nil {
		return
	}

	stats.Lock()
	defer stats.Unlock()

	if stats.apiStats == nil {
		stats.apiStats = map[string]int{}
	}
	stats.apiStats[api] += 1
}
// Dec decrements the api stats counter.
// It is a no-op on a nil receiver, for an API name that has no entry,
// and for an entry that is already zero, so a counter never goes negative.
func (stats *HTTPAPIStats) Dec(api string) {
if stats == nil {
return
}
stats.Lock()
defer stats.Unlock()
if val, ok := stats.apiStats[api]; ok && val > 0 {
stats.apiStats[api]--
}
}
// Load returns a copy of the recorded per-API counters, so the caller can
// iterate over it without holding the lock.
//
// Only the read lock is taken because the map is not modified here. A nil
// receiver yields an empty map, matching the nil tolerance of Inc and Dec.
func (stats *HTTPAPIStats) Load() map[string]int {
	if stats == nil {
		return map[string]int{}
	}

	stats.RLock()
	defer stats.RUnlock()

	snapshot := make(map[string]int, len(stats.apiStats))
	for api, count := range stats.apiStats {
		snapshot[api] = count
	}
	return snapshot
}
// getInputBytes atomically reads the running total of input bytes.
func (st *HTTPStats) getInputBytes() uint64 {
return atomic.LoadUint64(&st.totalInputBytes)
}
// getOutputBytes atomically reads the running total of output bytes.
func (st *HTTPStats) getOutputBytes() uint64 {
return atomic.LoadUint64(&st.totalOutputBytes)
}
// WriteHeader records code and forwards it to the wrapped writer. Only the
// first call has any effect; later calls are dropped by the embedded Once.
func (w *responseWrapper) WriteHeader(code int) {
	recordAndForward := func() {
		w.statusCode = code
		w.ResponseWriter.WriteHeader(code)
	}
	w.Do(recordAndForward)
}
// Flush forwards to the wrapped writer when it implements http.Flusher and
// does nothing otherwise.
func (w *responseWrapper) Flush() {
	flusher, ok := w.ResponseWriter.(http.Flusher)
	if !ok {
		return
	}
	flusher.Flush()
}
// Flush forwards to the wrapped writer when it implements http.Flusher and
// does nothing otherwise.
func (w *writeCounter) Flush() {
	flusher, ok := w.ResponseWriter.(http.Flusher)
	if !ok {
		return
	}
	flusher.Flush()
}
// Write passes p to the wrapped writer and adds the number of bytes it
// reports as written to the running count, even when it also returns an error.
func (w *writeCounter) Write(p []byte) (int, error) {
	written, err := w.ResponseWriter.Write(p)
	atomic.AddUint64(&w.countBytes, uint64(written))
	return written, err
}
// Read reads from the wrapped body into p and adds the number of bytes it
// reports as read to the running count, even when it also returns an error.
func (r *readCounter) Read(p []byte) (int, error) {
	got, err := r.ReadCloser.Read(p)
	atomic.AddUint64(&r.countBytes, uint64(got))
	return got, err
}

View file

@ -1,68 +0,0 @@
package api
import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
"github.com/prometheus/client_golang/prometheus"
)
type stats struct {
desc *prometheus.Desc
}
var (
versionInfo = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "frostfs_s3",
Name: "version_info",
Help: "Version of current FrostFS S3 Gate instance",
},
[]string{
// current version
"version",
},
)
statsMetrics = &stats{
desc: prometheus.NewDesc("frostfs_s3_stats", "Statistics exposed by FrostFS S3 Gate instance", nil, nil),
}
)
// init registers the package collectors with the default Prometheus
// registry: the version gauge, the stats collector and the request-duration
// histogram, in that order. Registration panics on failure.
func init() {
	prometheus.MustRegister(
		versionInfo,
		statsMetrics,
		httpRequestsDuration,
	)
}
// collectNetworkMetrics sends the gateway-wide traffic totals to ch.
//
// The input counter accumulates bytes read from request bodies, i.e. bytes
// the gateway received, and the output counter accumulates bytes written to
// responses, i.e. bytes the gateway sent. The tx metric therefore reports
// the output total and the rx metric the input total.
func collectNetworkMetrics(ch chan<- prometheus.Metric) {
	// Bytes sent to clients (response side).
	ch <- prometheus.MustNewConstMetric(
		prometheus.NewDesc(
			prometheus.BuildFQName("frostfs_s3", "tx", "bytes_total"),
			"Total number of bytes sent by current FrostFS S3 Gate instance",
			nil, nil),
		prometheus.CounterValue,
		float64(httpStatsMetric.getOutputBytes()),
	)

	// Bytes received from clients (request side).
	ch <- prometheus.MustNewConstMetric(
		prometheus.NewDesc(
			prometheus.BuildFQName("frostfs_s3", "rx", "bytes_total"),
			"Total number of bytes received by current FrostFS S3 Gate instance",
			nil, nil),
		prometheus.CounterValue,
		float64(httpStatsMetric.getInputBytes()),
	)
}
// Describe sends the single descriptor held by the collector to ch.
// NOTE(review): Collect emits metrics built from other descriptors than this
// one — confirm that the mismatch is intended.
func (s *stats) Describe(ch chan<- *prometheus.Desc) {
ch <- s.desc
}
// Collect refreshes the version gauge and then sends the HTTP and network
// metrics to ch.
func (s *stats) Collect(ch chan<- prometheus.Metric) {
// Expose current version information
versionInfo.WithLabelValues(version.Version).Set(1.0)
// connect collectors
collectHTTPMetrics(ch)
collectNetworkMetrics(ch)
}

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
"github.com/google/uuid"
"github.com/gorilla/mux"
"go.uber.org/zap"
@ -157,9 +158,9 @@ func appendCORS(handler Handler) mux.MiddlewareFunc {
type BucketResolveFunc func(ctx context.Context, bucket string) (*data.BucketInfo, error)
// metricsMiddleware wraps http handler for api with basic statistics collection.
func metricsMiddleware(log *zap.Logger, resolveBucket BucketResolveFunc, usersStat UsersStat) mux.MiddlewareFunc {
func metricsMiddleware(log *zap.Logger, resolveBucket BucketResolveFunc, appMetrics *metrics.AppMetrics) mux.MiddlewareFunc {
return func(h http.Handler) http.Handler {
return Stats(h.ServeHTTP, resolveCID(log, resolveBucket), usersStat)
return Stats(h.ServeHTTP, resolveCID(log, resolveBucket), appMetrics)
}
}
@ -229,10 +230,10 @@ func setErrorAPI(apiName string, h http.Handler) http.Handler {
}
// attachErrorHandler set NotFoundHandler and MethodNotAllowedHandler for mux.Router.
func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth.Center, usersStat UsersStat) {
func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth.Center, appMetrics *metrics.AppMetrics) {
middlewares := []mux.MiddlewareFunc{
AuthMiddleware(log, center),
metricsMiddleware(log, h.ResolveBucket, usersStat),
metricsMiddleware(log, h.ResolveBucket, appMetrics),
}
var errorHandler http.Handler = http.HandlerFunc(errorResponseHandler)
@ -247,7 +248,7 @@ func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth
// Attach adds S3 API handlers from h to r for domains with m client limit using
// center authentication and log logger.
func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center auth.Center, log *zap.Logger, usersStat UsersStat) {
func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center auth.Center, log *zap.Logger, appMetrics *metrics.AppMetrics) {
api := r.PathPrefix(SlashSeparator).Subrouter()
api.Use(
@ -257,13 +258,13 @@ func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center aut
// Attach user authentication for all S3 routes.
AuthMiddleware(log, center),
metricsMiddleware(log, h.ResolveBucket, usersStat),
metricsMiddleware(log, h.ResolveBucket, appMetrics),
// -- logging error requests
logSuccessResponse(log),
)
attachErrorHandler(api, log, h, center, usersStat)
attachErrorHandler(api, log, h, center, appMetrics)
buckets := make([]*mux.Router, 0, len(domains)+1)
buckets = append(buckets, api.PathPrefix("/{bucket}").Subrouter())

165
api/stats.go Normal file
View file

@ -0,0 +1,165 @@
package api
import (
"context"
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
)
// RequestTypeFromAPI maps an S3 API operation name to its coarse request
// group. Names that are not listed map to metrics.UNKNOWNRequest.
//
// The legal-hold getter is accepted both in its historical lower-case
// spelling and in the canonical "GetObjectLegalHold" form, so the operation
// is classified as a GET regardless of which spelling the caller passes.
// NOTE(review): which spelling the router actually registers is not visible
// here — confirm.
func RequestTypeFromAPI(api string) metrics.RequestType {
	switch api {
	case "Options", "HeadObject", "HeadBucket":
		return metrics.HEADRequest
	case "CreateMultipartUpload", "UploadPartCopy", "UploadPart", "CompleteMultipartUpload",
		"PutObjectACL", "PutObjectTagging", "CopyObject", "PutObjectRetention", "PutObjectLegalHold",
		"PutObject", "PutBucketCors", "PutBucketACL", "PutBucketLifecycle", "PutBucketEncryption",
		"PutBucketPolicy", "PutBucketObjectLockConfig", "PutBucketTagging", "PutBucketVersioning",
		"PutBucketNotification", "CreateBucket", "PostObject":
		return metrics.PUTRequest
	case "ListObjectParts", "ListMultipartUploads", "ListObjectsV2M", "ListObjectsV2", "ListBucketVersions",
		"ListObjectsV1", "ListBuckets":
		return metrics.LISTRequest
	case "GetObjectACL", "GetObjectTagging", "SelectObjectContent", "GetObjectRetention",
		"getobjectlegalhold", "GetObjectLegalHold",
		"GetObjectAttributes", "GetObject", "GetBucketLocation", "GetBucketPolicy",
		"GetBucketLifecycle", "GetBucketEncryption", "GetBucketCors", "GetBucketACL",
		"GetBucketWebsite", "GetBucketAccelerate", "GetBucketRequestPayment", "GetBucketLogging",
		"GetBucketReplication", "GetBucketTagging", "GetBucketObjectLockConfig",
		"GetBucketVersioning", "GetBucketNotification", "ListenBucketNotification":
		return metrics.GETRequest
	case "AbortMultipartUpload", "DeleteObjectTagging", "DeleteObject", "DeleteBucketCors",
		"DeleteBucketWebsite", "DeleteBucketTagging", "DeleteMultipleObjects", "DeleteBucketPolicy",
		"DeleteBucketLifecycle", "DeleteBucketEncryption", "DeleteBucket":
		return metrics.DELETERequest
	default:
		return metrics.UNKNOWNRequest
	}
}
type (
// UsersStat receives per-request usage: who issued the request, which
// bucket/container it addressed, its operation group, and how many bytes
// were counted on the request body (in) and on the response (out).
// NOTE(review): reqType is a plain int here, while the Update method the
// Stats handler calls on *metrics.AppMetrics takes a metrics.RequestType —
// confirm whether this interface is still needed and, if so, align it.
UsersStat interface {
Update(user, bucket, cnrID string, reqType int, in, out uint64)
}
// readCounter wraps a request body and counts the bytes read through it.
readCounter struct {
io.ReadCloser
countBytes uint64
}
// writeCounter wraps a response writer and counts the bytes written through it.
writeCounter struct {
http.ResponseWriter
countBytes uint64
}
// responseWrapper remembers the status code passed to WriteHeader and the
// time the request started; the embedded Once makes WriteHeader take
// effect a single time.
responseWrapper struct {
sync.Once
http.ResponseWriter
statusCode int
startTime time.Time
}
)
const systemPath = "/system"
//var apiStatMetrics = metrics.newApiStatMetrics()
// CIDResolveFunc is a func to resolve CID in Stats handler.
type CIDResolveFunc func(ctx context.Context, reqInfo *ReqInfo) (cnrID string)
// Stats wraps f so that every request served through it is reported to
// appMetrics.
//
// The request body and the response writer are replaced with byte-counting
// wrappers before f runs. After f returns the handler reports per-user usage,
// bumps the total/error counters for every path that does not end in
// systemPath, records the request duration for GET requests only, and adds
// the counted bytes to the input/output totals.
func Stats(f http.HandlerFunc, resolveCID CIDResolveFunc, appMetrics *metrics.AppMetrics) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		reqInfo := GetReqInfo(r.Context())

		appMetrics.Statistic().CurrentS3RequestsInc(reqInfo.API)
		defer appMetrics.Statistic().CurrentS3RequestsDec(reqInfo.API)

		bodyCounter := &readCounter{ReadCloser: r.Body}
		respCounter := &writeCounter{ResponseWriter: w}
		r.Body = bodyCounter

		wrapped := &responseWrapper{
			ResponseWriter: respCounter,
			startTime:      time.Now(),
		}

		f(wrapped, r)

		// Seconds are precise enough here; finer units are not human-readable.
		elapsed := time.Since(wrapped.startTime).Seconds()

		user := resolveUser(r.Context())
		cnrID := resolveCID(r.Context(), reqInfo)
		appMetrics.Update(user, reqInfo.BucketName, cnrID, RequestTypeFromAPI(reqInfo.API), bodyCounter.countBytes, respCounter.countBytes)

		// A status of 0 means WriteHeader was never called on the wrapper;
		// such a request is not counted as an error. Anything else outside
		// the 2xx range is.
		status := wrapped.statusCode
		failed := status != 0 && (status < http.StatusOK || status >= http.StatusMultipleChoices)

		if !strings.HasSuffix(r.URL.Path, systemPath) {
			appMetrics.Statistic().TotalS3RequestsInc(reqInfo.API)
			if failed {
				appMetrics.Statistic().TotalS3ErrorsInc(reqInfo.API)
			}
		}

		if r.Method == http.MethodGet {
			// Feed the request-duration metric, labelled by API name.
			appMetrics.Statistic().RequestDurationsUpdate(reqInfo.API, elapsed)
		}

		appMetrics.Statistic().TotalInputBytesAdd(bodyCounter.countBytes)
		appMetrics.Statistic().TotalOutputBytesAdd(respCounter.countBytes)
	}
}
// resolveUser returns the issuer of the bearer token stored in the access
// box found in ctx under BoxData. It returns "anon" when the context carries
// no box, or the box has no gate or no bearer token.
func resolveUser(ctx context.Context) string {
	box, ok := ctx.Value(BoxData).(*accessbox.Box)
	if !ok || box == nil || box.Gate == nil || box.Gate.BearerToken == nil {
		return "anon"
	}
	return bearer.ResolveIssuer(*box.Gate.BearerToken).String()
}
// WriteHeader records code and forwards it to the wrapped writer. Only the
// first call has any effect; later calls are dropped by the embedded Once.
func (w *responseWrapper) WriteHeader(code int) {
	recordAndForward := func() {
		w.statusCode = code
		w.ResponseWriter.WriteHeader(code)
	}
	w.Do(recordAndForward)
}
// Flush forwards to the wrapped writer when it implements http.Flusher and
// does nothing otherwise.
func (w *responseWrapper) Flush() {
	flusher, ok := w.ResponseWriter.(http.Flusher)
	if !ok {
		return
	}
	flusher.Flush()
}
// Flush forwards to the wrapped writer when it implements http.Flusher and
// does nothing otherwise.
func (w *writeCounter) Flush() {
	flusher, ok := w.ResponseWriter.(http.Flusher)
	if !ok {
		return
	}
	flusher.Flush()
}
// Write passes p to the wrapped writer and adds the number of bytes it
// reports as written to the running count, even when it also returns an error.
func (w *writeCounter) Write(p []byte) (int, error) {
	written, err := w.ResponseWriter.Write(p)
	atomic.AddUint64(&w.countBytes, uint64(written))
	return written, err
}
// Read reads from the wrapped body into p and adds the number of bytes it
// reports as read to the running count, even when it also returns an error.
func (r *readCounter) Read(p []byte) (int, error) {
	got, err := r.ReadCloser.Read(p)
	atomic.AddUint64(&r.countBytes, uint64(got))
	return got, err
}

View file

@ -182,7 +182,7 @@ func (a *App) initAPI(ctx context.Context) {
func (a *App) initMetrics() {
a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
a.metrics.SetHealth(metrics.HealthStatusStarting)
a.metrics.State().SetHealth(metrics.HealthStatusStarting)
}
func (a *App) initResolver() {
@ -362,6 +362,7 @@ func (a *App) Wait() {
zap.String("version", version.Version),
)
a.metrics.State().SetVersion(version.Version)
a.setHealthStatus()
<-a.webDone // wait for web-server to be stopped
@ -370,7 +371,7 @@ func (a *App) Wait() {
}
func (a *App) setHealthStatus() {
a.metrics.SetHealth(metrics.HealthStatusReady)
a.metrics.State().SetHealth(metrics.HealthStatusReady)
}
// Serve runs HTTP server to handle S3 API requests.

View file

@ -4,8 +4,6 @@ import (
"net/http"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
"go.uber.org/zap"
)
@ -16,7 +14,7 @@ type AppMetrics struct {
enabled bool
}
func NewAppMetrics(logger *zap.Logger, poolStatistics *frostfs.PoolStatistic, enabled bool) *AppMetrics {
func NewAppMetrics(logger *zap.Logger, poolStatistics StatisticScraper, enabled bool) *AppMetrics {
if !enabled {
logger.Warn("metrics are disabled")
}
@ -37,12 +35,12 @@ func (m *AppMetrics) SetEnabled(enabled bool) {
m.mu.Unlock()
}
func (m *AppMetrics) SetHealth(status HealthStatus) {
func (m *AppMetrics) State() *StateMetrics {
if !m.isEnabled() {
return
return nil
}
m.gate.State.SetHealth(status)
return m.gate.State
}
func (m *AppMetrics) Shutdown() {
@ -65,10 +63,18 @@ func (m *AppMetrics) Handler() http.Handler {
return m.gate.Handler()
}
func (m *AppMetrics) Update(user, bucket, cnrID string, reqType api.RequestType, in, out uint64) {
func (m *AppMetrics) Update(user, bucket, cnrID string, reqType RequestType, in, out uint64) {
if !m.isEnabled() {
return
}
m.gate.Billing.apiStat.Update(user, bucket, cnrID, reqType, in, out)
}
func (m *AppMetrics) Statistic() *APIStatMetrics {
if !m.isEnabled() {
return nil
}
return m.gate.Stats
}

View file

@ -3,12 +3,44 @@ package metrics
import (
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"github.com/prometheus/client_golang/prometheus"
)
const billingSubsystem = "billing"
const (
userRequestsMetric = "user_requests"
userTrafficMetric = "user_traffic"
)
// RequestType classifies an S3 API call into a coarse operation group.
type RequestType int

// Operation groups. Values follow declaration order, starting at zero for
// UNKNOWNRequest and ending at five for DELETERequest.
const (
	UNKNOWNRequest RequestType = iota
	HEADRequest
	PUTRequest
	LISTRequest
	GETRequest
	DELETERequest
)

// String returns the upper-case operation name. UNKNOWNRequest and every
// value outside the declared range yield "Unknown".
func (t RequestType) String() string {
	switch t {
	case HEADRequest:
		return "HEAD"
	case PUTRequest:
		return "PUT"
	case LISTRequest:
		return "LIST"
	case GETRequest:
		return "GET"
	case DELETERequest:
		return "DELETE"
	}
	return "Unknown"
}
type TrafficType int
const (
@ -60,7 +92,7 @@ type (
UserMetricsInfo struct {
UserBucketInfo
Operation api.RequestType
Operation RequestType
Requests int
}
@ -76,7 +108,7 @@ type (
}
)
func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType api.RequestType, in, out uint64) {
func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType RequestType, in, out uint64) {
u.Lock()
defer u.Unlock()
@ -141,7 +173,7 @@ func (u *UsersAPIStats) DumpMetrics() UserMetrics {
if val != 0 {
result.Requests = append(result.Requests, UserMetricsInfo{
UserBucketInfo: userBktInfo,
Operation: api.RequestType(op),
Operation: RequestType(op),
Requests: val,
})
}
@ -157,15 +189,17 @@ func (u *UsersAPIStats) DumpMetrics() UserMetrics {
type billingMetrics struct {
registry *prometheus.Registry
desc *prometheus.Desc
apiStat UsersAPIStats
userRequestsDesc *prometheus.Desc
userTrafficDesc *prometheus.Desc
apiStat UsersAPIStats
}
func newBillingMetrics() *billingMetrics {
return &billingMetrics{
registry: prometheus.NewRegistry(),
desc: prometheus.NewDesc("frostfs_s3_billing", "Billing statistics exposed by FrostFS S3 Gate instance", nil, nil),
apiStat: UsersAPIStats{},
registry: prometheus.NewRegistry(),
userRequestsDesc: NewDesc(AppMetricsDesc[billingSubsystem][userRequestsMetric]),
userTrafficDesc: NewDesc(AppMetricsDesc[billingSubsystem][userTrafficMetric]),
apiStat: UsersAPIStats{},
}
}
@ -178,7 +212,8 @@ func (b *billingMetrics) unregister() {
}
func (b *billingMetrics) Describe(ch chan<- *prometheus.Desc) {
ch <- b.desc
ch <- b.userRequestsDesc
ch <- b.userTrafficDesc
}
func (b *billingMetrics) Collect(ch chan<- prometheus.Metric) {
@ -186,10 +221,7 @@ func (b *billingMetrics) Collect(ch chan<- prometheus.Metric) {
for _, value := range userMetrics.Requests {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, billingSubsystem, "user_requests"),
"",
[]string{"user", "bucket", "cid", "operation"}, nil),
b.userRequestsDesc,
prometheus.CounterValue,
float64(value.Requests),
value.User,
@ -201,10 +233,7 @@ func (b *billingMetrics) Collect(ch chan<- prometheus.Metric) {
for _, value := range userMetrics.Traffic {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, billingSubsystem, "user_traffic"),
"",
[]string{"user", "bucket", "cid", "direction"}, nil),
b.userTrafficDesc,
prometheus.CounterValue,
float64(value.Value),
value.User,

202
metrics/desc.go Normal file
View file

@ -0,0 +1,202 @@
package metrics
import (
	"encoding/json"
	"sort"

	"github.com/prometheus/client_golang/prometheus"
)
var AppMetricsDesc = map[string]map[string]Description{
poolSubsystem: {
overallErrorsMetric: Description{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: overallErrorsMetric,
Help: "Total number of errors in pool",
},
overallNodeErrorsMetric: Description{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: overallNodeErrorsMetric,
Help: "Total number of errors for connection in pool",
VariableLabels: []string{"node"},
},
overallNodeRequestsMetric: Description{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: overallNodeRequestsMetric,
Help: "Total number of requests to specific node in pool",
VariableLabels: []string{"node"},
},
currentErrorMetric: Description{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: currentErrorMetric,
Help: "Number of errors on current connections that will be reset after the threshold",
VariableLabels: []string{"node"},
},
avgRequestDurationMetric: Description{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: avgRequestDurationMetric,
Help: "Average request duration (in milliseconds) for specific method on node in pool",
VariableLabels: []string{"node", "method"},
},
},
billingSubsystem: {
userRequestsMetric: Description{
Namespace: namespace,
Subsystem: billingSubsystem,
Name: userRequestsMetric,
Help: "Accumulated user requests",
VariableLabels: []string{"user", "bucket", "cid", "operation"},
},
userTrafficMetric: Description{
Namespace: namespace,
Subsystem: billingSubsystem,
Name: userTrafficMetric,
Help: "Accumulated user traffic",
VariableLabels: []string{"user", "bucket", "cid", "direction"},
},
},
stateSubsystem: {
healthMetric: Description{
Namespace: namespace,
Subsystem: stateSubsystem,
Name: healthMetric,
Help: "Current S3 gateway state",
},
versionInfoMetric: Description{
Namespace: namespace,
Subsystem: stateSubsystem,
Name: versionInfoMetric,
Help: "Version of current FrostFS S3 Gate instance",
VariableLabels: []string{"version"},
},
},
statisticSubsystem: {
requestsSecondsMetric: Description{
Namespace: namespace,
Subsystem: statisticSubsystem,
Name: requestsSecondsMetric,
Help: "Time taken by requests served by current FrostFS S3 Gate instance",
VariableLabels: []string{"api"},
},
requestsCurrentMetric: Description{
Namespace: namespace,
Subsystem: statisticSubsystem,
Name: requestsCurrentMetric,
Help: "Total number of running s3 requests in current FrostFS S3 Gate instance",
VariableLabels: []string{"api"},
},
requestsTotalMetric: Description{
Namespace: namespace,
Subsystem: statisticSubsystem,
Name: requestsTotalMetric,
Help: "Total number of s3 requests in current FrostFS S3 Gate instance",
VariableLabels: []string{"api"},
},
errorsTotalMetric: Description{
Namespace: namespace,
Subsystem: statisticSubsystem,
Name: errorsTotalMetric,
Help: "Total number of s3 errors in current FrostFS S3 Gate instance",
VariableLabels: []string{"api"},
},
txBytesTotalMetric: Description{
Namespace: namespace,
Subsystem: statisticSubsystem,
Name: txBytesTotalMetric,
Help: "Total number of bytes sent by current FrostFS S3 Gate instance",
},
rxBytesTotalMetric: Description{
Namespace: namespace,
Subsystem: statisticSubsystem,
Name: rxBytesTotalMetric,
Help: "Total number of bytes received by current FrostFS S3 Gate instance",
},
},
}
// Description holds the static definition of one metric: the three name
// parts that are joined into the fully-qualified name, the help text, and
// the constant and variable labels.
type Description struct {
Namespace string
Subsystem string
Name string
Help string
ConstantLabels []KeyValue
VariableLabels []string
}
// KeyValue is a single constant label: its name and its fixed value.
type KeyValue struct {
Key string `json:"key"`
Value string `json:"value"`
}
// MarshalJSON encodes the description as an object with the fully-qualified
// metric name under "name", followed by "help", "constant_labels" and
// "variable_labels".
//
// The method has a value receiver so that encoding/json also uses it for
// Description values that are not addressable, such as a value passed
// directly to json.Marshal or the values of a map. With a pointer receiver
// those would silently fall back to the default struct encoding.
func (d Description) MarshalJSON() ([]byte, error) {
	return json.Marshal(&struct {
		FQName         string     `json:"name"`
		Help           string     `json:"help"`
		ConstantLabels []KeyValue `json:"constant_labels"`
		VariableLabels []string   `json:"variable_labels"`
	}{
		FQName:         d.BuildFQName(),
		Help:           d.Help,
		ConstantLabels: d.ConstantLabels,
		VariableLabels: d.VariableLabels,
	})
}

// BuildFQName joins namespace, subsystem and name into the fully-qualified
// Prometheus metric name.
func (d Description) BuildFQName() string {
	return prometheus.BuildFQName(d.Namespace, d.Subsystem, d.Name)
}

// ConstLabelsMap converts the constant labels into a map keyed by label
// name. The result is never nil; a later duplicate key overwrites an
// earlier one.
func (d Description) ConstLabelsMap() map[string]string {
	labels := make(map[string]string, len(d.ConstantLabels))
	for _, kv := range d.ConstantLabels {
		labels[kv.Key] = kv.Value
	}
	return labels
}
// DescribeAll returns the descriptions of every metric in AppMetricsDesc.
//
// Map iteration order is random in Go, so the list is sorted by namespace,
// subsystem and name before it is returned. This keeps a dump generated
// from it identical between runs.
func DescribeAll() []Description {
	var list []Description
	for _, subsystem := range AppMetricsDesc {
		for _, description := range subsystem {
			list = append(list, description)
		}
	}

	sort.Slice(list, func(i, j int) bool {
		a, b := &list[i], &list[j]
		if a.Namespace != b.Namespace {
			return a.Namespace < b.Namespace
		}
		if a.Subsystem != b.Subsystem {
			return a.Subsystem < b.Subsystem
		}
		return a.Name < b.Name
	})

	return list
}
// NewOpts builds prometheus.Opts from description: the three name parts and
// the help text are copied, and the constant labels are converted to a map.
func NewOpts(description Description) prometheus.Opts {
	var opts prometheus.Opts
	opts.Namespace = description.Namespace
	opts.Subsystem = description.Subsystem
	opts.Name = description.Name
	opts.Help = description.Help
	opts.ConstLabels = description.ConstLabelsMap()
	return opts
}
// NewDesc builds a prometheus.Desc from description using its
// fully-qualified name, help text, variable labels and constant labels.
func NewDesc(description Description) *prometheus.Desc {
	fqName := description.BuildFQName()
	constLabels := description.ConstLabelsMap()
	return prometheus.NewDesc(fqName, description.Help, description.VariableLabels, constLabels)
}
// NewGauge creates an unlabelled gauge whose options come from description.
func NewGauge(description Description) prometheus.Gauge {
	opts := prometheus.GaugeOpts(NewOpts(description))
	return prometheus.NewGauge(opts)
}
// NewGaugeVec creates a gauge vector whose options come from description and
// whose label names are the description's variable labels.
func NewGaugeVec(description Description) *prometheus.GaugeVec {
	opts := prometheus.GaugeOpts(NewOpts(description))
	return prometheus.NewGaugeVec(opts, description.VariableLabels)
}

26
metrics/desc_test.go Normal file
View file

@ -0,0 +1,26 @@
//go:build dump_metrics
package metrics
import (
"encoding/json"
"flag"
"os"
"testing"
"github.com/stretchr/testify/require"
)
var metricsPath = flag.String("out", "", "File to export s3 gateway metrics to.")
// TestDescribeAll writes the JSON encoding of DescribeAll() to the file
// named by the "out" flag. It fails when the flag is empty, when encoding
// fails, or when the file cannot be written.
func TestDescribeAll(t *testing.T) {
	flag.Parse()

	require.NotEmpty(t, metricsPath, "flag 'out' must be provided to dump metrics description")

	dump, marshalErr := json.Marshal(DescribeAll())
	require.NoError(t, marshalErr)

	writeErr := os.WriteFile(*metricsPath, dump, 0644)
	require.NoError(t, writeErr)
}

View file

@ -15,9 +15,10 @@ type StatisticScraper interface {
}
type GateMetrics struct {
State stateMetrics
State *StateMetrics
Pool poolMetricsCollector
Billing *billingMetrics
Stats *APIStatMetrics
}
func NewGateMetrics(scraper StatisticScraper) *GateMetrics {
@ -30,10 +31,14 @@ func NewGateMetrics(scraper StatisticScraper) *GateMetrics {
billingMetric := newBillingMetrics()
billingMetric.register()
statsMetric := newAPIStatMetrics()
statsMetric.register()
return &GateMetrics{
State: *stateMetric,
State: stateMetric,
Pool: *poolMetric,
Billing: billingMetric,
Stats: statsMetric,
}
}
@ -41,6 +46,7 @@ func (g *GateMetrics) Unregister() {
g.State.unregister()
prometheus.Unregister(&g.Pool)
g.Billing.unregister()
g.Stats.unregister()
}
func (g *GateMetrics) Handler() http.Handler {

View file

@ -7,7 +7,17 @@ import (
const (
poolSubsystem = "pool"
)
const (
overallErrorsMetric = "overall_errors"
overallNodeErrorsMetric = "overall_node_errors"
overallNodeRequestsMetric = "overall_node_requests"
currentErrorMetric = "current_errors"
avgRequestDurationMetric = "avg_request_duration"
)
const (
methodGetBalance = "get_balance"
methodPutContainer = "put_container"
methodGetContainer = "get_container"
@ -35,71 +45,13 @@ type poolMetricsCollector struct {
}
func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
overallErrors := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "overall_errors",
Help: "Total number of errors in pool",
},
)
overallNodeErrors := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "overall_node_errors",
Help: "Total number of errors for connection in pool",
},
[]string{
"node",
},
)
overallNodeRequests := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "overall_node_requests",
Help: "Total number of requests to specific node in pool",
},
[]string{
"node",
},
)
currentErrors := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "current_errors",
Help: "Number of errors on current connections that will be reset after the threshold",
},
[]string{
"node",
},
)
requestsDuration := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "avg_request_duration",
Help: "Average request duration (in milliseconds) for specific method on node in pool",
},
[]string{
"node",
"method",
},
)
return &poolMetricsCollector{
poolStatScraper: scraper,
overallErrors: overallErrors,
overallNodeErrors: overallNodeErrors,
overallNodeRequests: overallNodeRequests,
currentErrors: currentErrors,
requestDuration: requestsDuration,
overallErrors: NewGauge(AppMetricsDesc[poolSubsystem][overallErrorsMetric]),
overallNodeErrors: NewGaugeVec(AppMetricsDesc[poolSubsystem][overallNodeErrorsMetric]),
overallNodeRequests: NewGaugeVec(AppMetricsDesc[poolSubsystem][overallNodeRequestsMetric]),
currentErrors: NewGaugeVec(AppMetricsDesc[poolSubsystem][currentErrorMetric]),
requestDuration: NewGaugeVec(AppMetricsDesc[poolSubsystem][avgRequestDurationMetric]),
}
}

View file

@ -1,9 +1,16 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"
)
// stateSubsystem is the subsystem key used to look up the state metric
// descriptions in AppMetricsDesc.
const stateSubsystem = "state"
// Names of the metrics that belong to the state subsystem; they are the
// second-level keys in AppMetricsDesc[stateSubsystem].
const (
healthMetric = "health"
versionInfoMetric = "version_info"
)
// HealthStatus of the gate application.
type HealthStatus int32
@ -14,29 +21,42 @@ const (
HealthStatusShuttingDown HealthStatus = 3
)
type stateMetrics struct {
// StateMetrics groups the gauges that describe the application state:
// its health status and its version.
type StateMetrics struct {
// healthCheck holds the current HealthStatus as a number (see SetHealth).
healthCheck prometheus.Gauge
// versionInfo is set to 1 for the label value passed to SetVersion.
versionInfo *prometheus.GaugeVec
}
func newStateMetrics() *stateMetrics {
return &stateMetrics{
healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: stateSubsystem,
Name: "health",
Help: "Current S3 gateway state",
}),
// newStateMetrics builds the state gauges from the descriptions stored in
// AppMetricsDesc under the state subsystem.
func newStateMetrics() *StateMetrics {
	descs := AppMetricsDesc[stateSubsystem]

	sm := new(StateMetrics)
	sm.healthCheck = NewGauge(descs[healthMetric])
	sm.versionInfo = NewGaugeVec(descs[versionInfoMetric])

	return sm
}
func (m stateMetrics) register() {
func (m *StateMetrics) register() {
if m == nil {
return
}
prometheus.MustRegister(m.healthCheck)
}
func (m stateMetrics) unregister() {
func (m *StateMetrics) unregister() {
if m == nil {
return
}
prometheus.Unregister(m.healthCheck)
}
func (m stateMetrics) SetHealth(s HealthStatus) {
// SetHealth stores the given health status as the value of the health gauge.
// Calling it on a nil receiver does nothing.
func (m *StateMetrics) SetHealth(s HealthStatus) {
	if m != nil {
		status := float64(s)
		m.healthCheck.Set(status)
	}
}
// SetVersion sets the version info gauge labelled with ver to 1.
// Calling it on a nil receiver does nothing.
func (m *StateMetrics) SetVersion(ver string) {
	if m != nil {
		gauge := m.versionInfo.WithLabelValues(ver)
		gauge.Set(1)
	}
}

233
metrics/stats.go Normal file
View file

@ -0,0 +1,233 @@
package metrics
import (
"sync"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
)
type (
// httpAPIStats holds statistics information about
// the API given in the requests.
httpAPIStats struct {
// apiStats maps an API name to its counter; it is created lazily by Inc
// and accessed under the embedded mutex.
apiStats map[string]int
sync.RWMutex
}
// httpStats holds statistics information about
// HTTP requests made by all clients.
// It implements Describe and Collect, so it can be registered as a
// Prometheus collector.
httpStats struct {
// Per-API counters: requests currently running, total requests and
// total errors.
currentS3Requests httpAPIStats
totalS3Requests httpAPIStats
totalS3Errors httpAPIStats
// Byte counters, read and updated through sync/atomic.
totalInputBytes uint64
totalOutputBytes uint64
// Descriptors of the metrics emitted by Collect.
currentS3RequestsDesc *prometheus.Desc
totalS3RequestsDesc *prometheus.Desc
totalS3ErrorsDesc *prometheus.Desc
txBytesTotalDesc *prometheus.Desc
rxBytesTotalDesc *prometheus.Desc
}
// APIStatMetrics combines the HTTP statistics collector with the
// histogram of request durations. All its methods are safe to call
// on a nil receiver.
APIStatMetrics struct {
stats *httpStats
// httpRequestsDuration is observed per "api" label
// (see RequestDurationsUpdate).
httpRequestsDuration *prometheus.HistogramVec
}
)
// statisticSubsystem is the subsystem key used to look up the statistic
// metric descriptions in AppMetricsDesc.
const (
statisticSubsystem = "statistic"
)
// Names of the metrics that belong to the statistic subsystem; they are the
// second-level keys in AppMetricsDesc[statisticSubsystem].
const (
requestsSecondsMetric = "requests_seconds"
requestsCurrentMetric = "requests_current"
requestsTotalMetric = "requests_total"
errorsTotalMetric = "errors_total"
txBytesTotalMetric = "tx_bytes_total"
rxBytesTotalMetric = "rx_bytes_total"
)
// newAPIStatMetrics creates the API statistic metrics: the HTTP stats
// collector and the request duration histogram. The histogram options are
// taken from the requests_seconds description in AppMetricsDesc.
func newAPIStatMetrics() *APIStatMetrics {
	histogramDesc := AppMetricsDesc[statisticSubsystem][requestsSecondsMetric]

	// Help must carry the human-readable description, not repeat the metric
	// name. NOTE(review): assumes the description type exposes a Help field
	// next to Name — confirm against its declaration.
	opts := prometheus.HistogramOpts{
		Namespace:   histogramDesc.Namespace,
		Subsystem:   histogramDesc.Subsystem,
		Name:        histogramDesc.Name,
		Help:        histogramDesc.Help,
		ConstLabels: histogramDesc.ConstLabelsMap(),
		Buckets:     []float64{.05, .1, .25, .5, 1, 2.5, 5, 10},
	}

	return &APIStatMetrics{
		stats:                newHTTPStats(),
		httpRequestsDuration: prometheus.NewHistogramVec(opts, histogramDesc.VariableLabels),
	}
}
// register adds the HTTP stats collector and then the request duration
// histogram to the default Prometheus registry. A nil receiver is ignored.
func (a *APIStatMetrics) register() {
	if a != nil {
		prometheus.MustRegister(a.stats, a.httpRequestsDuration)
	}
}
// unregister removes the HTTP stats collector and then the request duration
// histogram from the default Prometheus registry. A nil receiver is ignored.
func (a *APIStatMetrics) unregister() {
	if a == nil {
		return
	}

	collectors := []prometheus.Collector{a.stats, a.httpRequestsDuration}
	for _, c := range collectors {
		prometheus.Unregister(c)
	}
}
// CurrentS3RequestsInc increments the counter of currently running requests
// for the given API. A nil receiver is ignored.
func (a *APIStatMetrics) CurrentS3RequestsInc(api string) {
	if a != nil {
		a.stats.currentS3Requests.Inc(api)
	}
}
// CurrentS3RequestsDec decrements the counter of currently running requests
// for the given API. A nil receiver is ignored.
func (a *APIStatMetrics) CurrentS3RequestsDec(api string) {
	if a != nil {
		a.stats.currentS3Requests.Dec(api)
	}
}
// TotalS3RequestsInc increments the total request counter for the given API.
// A nil receiver is ignored.
func (a *APIStatMetrics) TotalS3RequestsInc(api string) {
	if a != nil {
		a.stats.totalS3Requests.Inc(api)
	}
}
// TotalS3ErrorsInc increments the total error counter for the given API.
// A nil receiver is ignored.
func (a *APIStatMetrics) TotalS3ErrorsInc(api string) {
	if a != nil {
		a.stats.totalS3Errors.Inc(api)
	}
}
// TotalInputBytesAdd atomically adds val to the total input bytes counter.
// A nil receiver is ignored.
func (a *APIStatMetrics) TotalInputBytesAdd(val uint64) {
	if a != nil {
		atomic.AddUint64(&a.stats.totalInputBytes, val)
	}
}
// TotalOutputBytesAdd atomically adds val to the total output bytes counter.
// A nil receiver is ignored.
func (a *APIStatMetrics) TotalOutputBytesAdd(val uint64) {
	if a != nil {
		atomic.AddUint64(&a.stats.totalOutputBytes, val)
	}
}
// RequestDurationsUpdate records durationSecs in the request duration
// histogram under the "api" label equal to api. A nil receiver is ignored.
func (a *APIStatMetrics) RequestDurationsUpdate(api string, durationSecs float64) {
	if a == nil {
		return
	}

	labels := prometheus.Labels{"api": api}
	observer := a.httpRequestsDuration.With(labels)
	observer.Observe(durationSecs)
}
// Describe sends the descriptors of the HTTP stats and then of the request
// duration histogram to ch. A nil receiver sends nothing.
func (a *APIStatMetrics) Describe(ch chan<- *prometheus.Desc) {
	if a != nil {
		a.stats.Describe(ch)
		a.httpRequestsDuration.Describe(ch)
	}
}
// Collect sends the current HTTP stats metrics and then the request duration
// histogram metrics to ch. A nil receiver sends nothing.
func (a *APIStatMetrics) Collect(ch chan<- prometheus.Metric) {
	if a != nil {
		a.stats.Collect(ch)
		a.httpRequestsDuration.Collect(ch)
	}
}
// newHTTPStats creates an httpStats whose metric descriptors are built from
// the statistic subsystem descriptions in AppMetricsDesc. Counters start at
// their zero values.
func newHTTPStats() *httpStats {
	descs := AppMetricsDesc[statisticSubsystem]

	stats := new(httpStats)
	stats.currentS3RequestsDesc = NewDesc(descs[requestsCurrentMetric])
	stats.totalS3RequestsDesc = NewDesc(descs[requestsTotalMetric])
	stats.totalS3ErrorsDesc = NewDesc(descs[errorsTotalMetric])
	stats.txBytesTotalDesc = NewDesc(descs[txBytesTotalMetric])
	stats.rxBytesTotalDesc = NewDesc(descs[rxBytesTotalMetric])

	return stats
}
// Describe sends the descriptors of all metrics emitted by Collect to desc,
// in a fixed order: current requests, total requests, total errors, tx bytes,
// rx bytes.
func (s *httpStats) Describe(desc chan<- *prometheus.Desc) {
	all := []*prometheus.Desc{
		s.currentS3RequestsDesc,
		s.totalS3RequestsDesc,
		s.totalS3ErrorsDesc,
		s.txBytesTotalDesc,
		s.rxBytesTotalDesc,
	}

	for _, d := range all {
		desc <- d
	}
}
// Collect sends a snapshot of the HTTP statistics to ch: one metric per API
// for current requests, total requests and total errors, followed by the two
// byte totals.
func (s *httpStats) Collect(ch chan<- prometheus.Metric) {
	// The number of running requests goes down as well as up (see
	// httpAPIStats.Dec), so it is exported as a gauge, not as a counter.
	for api, value := range s.currentS3Requests.Load() {
		ch <- prometheus.MustNewConstMetric(s.currentS3RequestsDesc, prometheus.GaugeValue, float64(value), api)
	}

	for api, value := range s.totalS3Requests.Load() {
		ch <- prometheus.MustNewConstMetric(s.totalS3RequestsDesc, prometheus.CounterValue, float64(value), api)
	}

	for api, value := range s.totalS3Errors.Load() {
		ch <- prometheus.MustNewConstMetric(s.totalS3ErrorsDesc, prometheus.CounterValue, float64(value), api)
	}

	// Network byte totals.
	// NOTE(review): tx is fed from the input byte counter and rx from the
	// output one; the pairing looks inverted — confirm against the code that
	// updates these counters before changing it.
	ch <- prometheus.MustNewConstMetric(s.txBytesTotalDesc, prometheus.CounterValue, float64(s.getInputBytes()))
	ch <- prometheus.MustNewConstMetric(s.rxBytesTotalDesc, prometheus.CounterValue, float64(s.getOutputBytes()))
}
// Inc adds one to the counter of the given API under the write lock,
// creating the underlying map on first use. A nil receiver is ignored.
func (s *httpAPIStats) Inc(api string) {
	if s == nil {
		return
	}

	s.Lock()
	defer s.Unlock()

	if s.apiStats == nil {
		s.apiStats = map[string]int{}
	}

	current := s.apiStats[api]
	s.apiStats[api] = current + 1
}
// Dec decrements the counter of the given API under the write lock. The
// counter never goes below zero: an absent or zero entry is left untouched.
// A nil receiver is ignored.
func (s *httpAPIStats) Dec(api string) {
	if s == nil {
		return
	}

	s.Lock()
	defer s.Unlock()

	// A missing key reads as zero, so one check covers both cases.
	if current := s.apiStats[api]; current > 0 {
		s.apiStats[api] = current - 1
	}
}
// Load returns a copy of the recorded stats, so the caller can iterate over
// the result without holding the lock.
func (s *httpAPIStats) Load() map[string]int {
	// Only reads happen here, so the shared lock is sufficient and lets
	// concurrent Load calls proceed without blocking each other.
	s.RLock()
	defer s.RUnlock()

	apiStats := make(map[string]int, len(s.apiStats))
	for k, v := range s.apiStats {
		apiStats[k] = v
	}

	return apiStats
}
// getInputBytes atomically reads the total input bytes counter.
func (s *httpStats) getInputBytes() uint64 {
	total := atomic.LoadUint64(&s.totalInputBytes)
	return total
}
// getOutputBytes atomically reads the total output bytes counter.
func (s *httpStats) getOutputBytes() uint64 {
	total := atomic.LoadUint64(&s.totalOutputBytes)
	return total
}