[TrueCloudLab#26] Add billing metrics to separate registry

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-02-09 11:59:31 +03:00 committed by Alex Vanin
parent 9dcacc230e
commit 9f823bd65a
10 changed files with 553 additions and 461 deletions

View file

@ -14,25 +14,6 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
type TrafficType int
const (
UnknownTraffic TrafficType = iota
INTraffic TrafficType = iota
OUTTraffic TrafficType = iota
)
func (t TrafficType) String() string {
switch t {
case 1:
return "IN"
case 2:
return "OUT"
default:
return "Unknown"
}
}
type RequestType int type RequestType int
const ( const (
@ -90,8 +71,6 @@ func RequestTypeFromAPI(api string) RequestType {
} }
} }
type OperationList [6]int
type ( type (
// HTTPAPIStats holds statistics information about // HTTPAPIStats holds statistics information about
// the API given in the requests. // the API given in the requests.
@ -100,54 +79,13 @@ type (
sync.RWMutex sync.RWMutex
} }
UsersAPIStats struct { UsersStat interface {
users map[string]*userAPIStats Update(user, bucket, cnrID string, reqType RequestType, in, out uint64)
sync.RWMutex
}
bucketKey struct {
name string
cid string
}
bucketStat struct {
Operations OperationList
InTraffic uint64
OutTraffic uint64
}
userAPIStats struct {
buckets map[bucketKey]bucketStat
user string
}
UserBucketInfo struct {
User string
Bucket string
ContainerID string
}
UserMetricsInfo struct {
UserBucketInfo
Operation RequestType
Requests int
}
UserTrafficMetricsInfo struct {
UserBucketInfo
Type TrafficType
Value uint64
}
UserMetrics struct {
Requests []UserMetricsInfo
Traffic []UserTrafficMetricsInfo
} }
// HTTPStats holds statistics information about // HTTPStats holds statistics information about
// HTTP requests made by all clients. // HTTP requests made by all clients.
HTTPStats struct { HTTPStats struct {
usersS3Requests UsersAPIStats
currentS3Requests HTTPAPIStats currentS3Requests HTTPAPIStats
totalS3Requests HTTPAPIStats totalS3Requests HTTPAPIStats
totalS3Errors HTTPAPIStats totalS3Errors HTTPAPIStats
@ -229,45 +167,11 @@ func collectHTTPMetrics(ch chan<- prometheus.Metric) {
} }
} }
func collectUserMetrics(ch chan<- prometheus.Metric) {
userMetrics := httpStatsMetric.usersS3Requests.DumpMetrics()
for _, value := range userMetrics.Requests {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("frostfs_s3", "user_requests", "count"),
"",
[]string{"user", "bucket", "cid", "operation"}, nil),
prometheus.CounterValue,
float64(value.Requests),
value.User,
value.Bucket,
value.ContainerID,
value.Operation.String(),
)
}
for _, value := range userMetrics.Traffic {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("frostfs_s3", "user_traffic", "bytes"),
"",
[]string{"user", "bucket", "cid", "type"}, nil),
prometheus.CounterValue,
float64(value.Value),
value.User,
value.Bucket,
value.ContainerID,
value.Type.String(),
)
}
}
// CIDResolveFunc is a func to resolve CID in Stats handler. // CIDResolveFunc is a func to resolve CID in Stats handler.
type CIDResolveFunc func(ctx context.Context, reqInfo *ReqInfo) (cnrID string) type CIDResolveFunc func(ctx context.Context, reqInfo *ReqInfo) (cnrID string)
// Stats is a handler that update metrics. // Stats is a handler that update metrics.
func Stats(f http.HandlerFunc, resolveCID CIDResolveFunc) http.HandlerFunc { func Stats(f http.HandlerFunc, resolveCID CIDResolveFunc, usersStat UsersStat) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
reqInfo := GetReqInfo(r.Context()) reqInfo := GetReqInfo(r.Context())
@ -293,7 +197,7 @@ func Stats(f http.HandlerFunc, resolveCID CIDResolveFunc) http.HandlerFunc {
user := resolveUser(r.Context()) user := resolveUser(r.Context())
cnrID := resolveCID(r.Context(), reqInfo) cnrID := resolveCID(r.Context(), reqInfo)
httpStatsMetric.usersS3Requests.Update(user, reqInfo.BucketName, cnrID, RequestTypeFromAPI(reqInfo.API), in.countBytes, out.countBytes) usersStat.Update(user, reqInfo.BucketName, cnrID, RequestTypeFromAPI(reqInfo.API), in.countBytes, out.countBytes)
code := statsWriter.statusCode code := statsWriter.statusCode
// A successful request has a 2xx response code // A successful request has a 2xx response code
@ -359,84 +263,6 @@ func (stats *HTTPAPIStats) Load() map[string]int {
return apiStats return apiStats
} }
func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType RequestType, in, out uint64) {
u.Lock()
defer u.Unlock()
usersStat := u.users[user]
if usersStat == nil {
if u.users == nil {
u.users = make(map[string]*userAPIStats)
}
usersStat = &userAPIStats{
buckets: make(map[bucketKey]bucketStat, 1),
user: user,
}
u.users[user] = usersStat
}
key := bucketKey{
name: bucket,
cid: cnrID,
}
bktStat := usersStat.buckets[key]
bktStat.Operations[reqType]++
bktStat.InTraffic += in
bktStat.OutTraffic += out
usersStat.buckets[key] = bktStat
}
func (u *UsersAPIStats) DumpMetrics() UserMetrics {
u.Lock()
defer u.Unlock()
result := UserMetrics{
Requests: make([]UserMetricsInfo, 0, len(u.users)),
Traffic: make([]UserTrafficMetricsInfo, 0, len(u.users)),
}
for user, userStat := range u.users {
for key, bktStat := range userStat.buckets {
userBktInfo := UserBucketInfo{
User: user,
Bucket: key.name,
ContainerID: key.cid,
}
if bktStat.InTraffic != 0 {
result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{
UserBucketInfo: userBktInfo,
Type: INTraffic,
Value: bktStat.InTraffic,
})
}
if bktStat.OutTraffic != 0 {
result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{
UserBucketInfo: userBktInfo,
Type: OUTTraffic,
Value: bktStat.OutTraffic,
})
}
for op, val := range bktStat.Operations {
if val != 0 {
result.Requests = append(result.Requests, UserMetricsInfo{
UserBucketInfo: userBktInfo,
Operation: RequestType(op),
Requests: val,
})
}
}
}
}
u.users = make(map[string]*userAPIStats)
return result
}
func (st *HTTPStats) getInputBytes() uint64 { func (st *HTTPStats) getInputBytes() uint64 {
return atomic.LoadUint64(&st.totalInputBytes) return atomic.LoadUint64(&st.totalInputBytes)
} }

View file

@ -64,6 +64,5 @@ func (s *stats) Collect(ch chan<- prometheus.Metric) {
// connect collectors // connect collectors
collectHTTPMetrics(ch) collectHTTPMetrics(ch)
collectUserMetrics(ch)
collectNetworkMetrics(ch) collectNetworkMetrics(ch)
} }

View file

@ -151,9 +151,9 @@ func appendCORS(handler Handler) mux.MiddlewareFunc {
type BucketResolveFunc func(ctx context.Context, bucket string) (*data.BucketInfo, error) type BucketResolveFunc func(ctx context.Context, bucket string) (*data.BucketInfo, error)
// metricsMiddleware wraps http handler for api with basic statistics collection. // metricsMiddleware wraps http handler for api with basic statistics collection.
func metricsMiddleware(log *zap.Logger, resolveBucket BucketResolveFunc) mux.MiddlewareFunc { func metricsMiddleware(log *zap.Logger, resolveBucket BucketResolveFunc, usersStat UsersStat) mux.MiddlewareFunc {
return func(h http.Handler) http.Handler { return func(h http.Handler) http.Handler {
return Stats(h.ServeHTTP, resolveCID(log, resolveBucket)) return Stats(h.ServeHTTP, resolveCID(log, resolveBucket), usersStat)
} }
} }
@ -223,10 +223,10 @@ func setErrorAPI(apiName string, h http.Handler) http.Handler {
} }
// attachErrorHandler set NotFoundHandler and MethodNotAllowedHandler for mux.Router. // attachErrorHandler set NotFoundHandler and MethodNotAllowedHandler for mux.Router.
func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth.Center) { func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth.Center, usersStat UsersStat) {
middlewares := []mux.MiddlewareFunc{ middlewares := []mux.MiddlewareFunc{
AuthMiddleware(log, center), AuthMiddleware(log, center),
metricsMiddleware(log, h.ResolveBucket), metricsMiddleware(log, h.ResolveBucket, usersStat),
} }
var errorHandler http.Handler = http.HandlerFunc(errorResponseHandler) var errorHandler http.Handler = http.HandlerFunc(errorResponseHandler)
@ -241,7 +241,7 @@ func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth
// Attach adds S3 API handlers from h to r for domains with m client limit using // Attach adds S3 API handlers from h to r for domains with m client limit using
// center authentication and log logger. // center authentication and log logger.
func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center auth.Center, log *zap.Logger) { func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center auth.Center, log *zap.Logger, usersStat UsersStat) {
api := r.PathPrefix(SlashSeparator).Subrouter() api := r.PathPrefix(SlashSeparator).Subrouter()
api.Use( api.Use(
@ -251,13 +251,13 @@ func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center aut
// Attach user authentication for all S3 routes. // Attach user authentication for all S3 routes.
AuthMiddleware(log, center), AuthMiddleware(log, center),
metricsMiddleware(log, h.ResolveBucket), metricsMiddleware(log, h.ResolveBucket, usersStat),
// -- logging error requests // -- logging error requests
logSuccessResponse(log), logSuccessResponse(log),
) )
attachErrorHandler(api, log, h, center) attachErrorHandler(api, log, h, center, usersStat)
buckets := make([]*mux.Router, 0, len(domains)+1) buckets := make([]*mux.Router, 0, len(domains)+1)
buckets = append(buckets, api.PathPrefix("/{bucket}").Subrouter()) buckets = append(buckets, api.PathPrefix("/{bucket}").Subrouter())

View file

@ -23,6 +23,7 @@ import (
"github.com/TrueCloudLab/frostfs-s3-gw/internal/frostfs" "github.com/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
"github.com/TrueCloudLab/frostfs-s3-gw/internal/version" "github.com/TrueCloudLab/frostfs-s3-gw/internal/version"
"github.com/TrueCloudLab/frostfs-s3-gw/internal/wallet" "github.com/TrueCloudLab/frostfs-s3-gw/internal/wallet"
"github.com/TrueCloudLab/frostfs-s3-gw/metrics"
"github.com/TrueCloudLab/frostfs-sdk-go/netmap" "github.com/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/TrueCloudLab/frostfs-sdk-go/pool" "github.com/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -45,7 +46,7 @@ type (
servers []Server servers []Server
metrics *appMetrics metrics *metrics.AppMetrics
bucketResolver *resolver.BucketResolver bucketResolver *resolver.BucketResolver
services []*Service services []*Service
settings *appSettings settings *appSettings
@ -65,18 +66,6 @@ type (
lvl zap.AtomicLevel lvl zap.AtomicLevel
} }
appMetrics struct {
logger *zap.Logger
provider GateMetricsCollector
mu sync.RWMutex
enabled bool
}
GateMetricsCollector interface {
SetHealth(int32)
Unregister()
}
placementPolicy struct { placementPolicy struct {
mu sync.RWMutex mu sync.RWMutex
defaultPolicy netmap.PlacementPolicy defaultPolicy netmap.PlacementPolicy
@ -183,8 +172,7 @@ func (a *App) initAPI(ctx context.Context) {
} }
func (a *App) initMetrics() { func (a *App) initMetrics() {
gateMetricsProvider := newGateMetrics(frostfs.NewPoolStatistic(a.pool)) a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
a.metrics = newAppMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
} }
func (a *App) initResolver() { func (a *App) initResolver() {
@ -344,47 +332,6 @@ func (p *placementPolicy) update(defaultPolicy string, regionPolicyFilepath stri
return nil return nil
} }
func newAppMetrics(logger *zap.Logger, provider GateMetricsCollector, enabled bool) *appMetrics {
if !enabled {
logger.Warn("metrics are disabled")
}
return &appMetrics{
logger: logger,
provider: provider,
}
}
func (m *appMetrics) SetEnabled(enabled bool) {
if !enabled {
m.logger.Warn("metrics are disabled")
}
m.mu.Lock()
m.enabled = enabled
m.mu.Unlock()
}
func (m *appMetrics) SetHealth(status int32) {
m.mu.RLock()
if !m.enabled {
m.mu.RUnlock()
return
}
m.mu.RUnlock()
m.provider.SetHealth(status)
}
func (m *appMetrics) Shutdown() {
m.mu.Lock()
if m.enabled {
m.provider.SetHealth(0)
m.enabled = false
}
m.provider.Unregister()
m.mu.Unlock()
}
func remove(list []string, element string) []string { func remove(list []string, element string) []string {
for i, item := range list { for i, item := range list {
if item == element { if item == element {
@ -422,7 +369,7 @@ func (a *App) Serve(ctx context.Context) {
domains := a.cfg.GetStringSlice(cfgListenDomains) domains := a.cfg.GetStringSlice(cfgListenDomains)
a.log.Info("fetch domains, prepare to use API", zap.Strings("domains", domains)) a.log.Info("fetch domains, prepare to use API", zap.Strings("domains", domains))
router := mux.NewRouter().SkipClean(true).UseEncodedPath() router := mux.NewRouter().SkipClean(true).UseEncodedPath()
api.Attach(router, domains, a.maxClients, a.api, a.ctr, a.log) api.Attach(router, domains, a.maxClients, a.api, a.ctr, a.log, a.metrics)
// Use mux.Router as http.Handler // Use mux.Router as http.Handler
srv := new(http.Server) srv := new(http.Server)
@ -519,7 +466,7 @@ func (a *App) startServices() {
a.services = append(a.services, pprofService) a.services = append(a.services, pprofService)
go pprofService.Start() go pprofService.Start()
prometheusService := NewPrometheusService(a.cfg, a.log) prometheusService := NewPrometheusService(a.cfg, a.log, a.metrics.Handler())
a.services = append(a.services, prometheusService) a.services = append(a.services, prometheusService)
go prometheusService.Start() go prometheusService.Start()
} }

View file

@ -3,227 +3,12 @@ package main
import ( import (
"net/http" "net/http"
"github.com/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/zap" "go.uber.org/zap"
) )
const (
namespace = "frostfs_s3_gw"
stateSubsystem = "state"
poolSubsystem = "pool"
methodGetBalance = "get_balance"
methodPutContainer = "put_container"
methodGetContainer = "get_container"
methodListContainer = "list_container"
methodDeleteContainer = "delete_container"
methodGetContainerEacl = "get_container_eacl"
methodSetContainerEacl = "set_container_eacl"
methodEndpointInfo = "endpoint_info"
methodNetworkInfo = "network_info"
methodPutObject = "put_object"
methodDeleteObject = "delete_object"
methodGetObject = "get_object"
methodHeadObject = "head_object"
methodRangeObject = "range_object"
methodCreateSession = "create_session"
)
type StatisticScraper interface {
Statistic() pool.Statistic
}
type GateMetrics struct {
stateMetrics
poolMetricsCollector
}
type stateMetrics struct {
healthCheck prometheus.Gauge
}
type poolMetricsCollector struct {
poolStatScraper StatisticScraper
overallErrors prometheus.Gauge
overallNodeErrors *prometheus.GaugeVec
overallNodeRequests *prometheus.GaugeVec
currentErrors *prometheus.GaugeVec
requestDuration *prometheus.GaugeVec
}
func newGateMetrics(scraper StatisticScraper) *GateMetrics {
stateMetric := newStateMetrics()
stateMetric.register()
poolMetric := newPoolMetricsCollector(scraper)
poolMetric.register()
return &GateMetrics{
stateMetrics: *stateMetric,
poolMetricsCollector: *poolMetric,
}
}
func (g *GateMetrics) Unregister() {
g.stateMetrics.unregister()
prometheus.Unregister(&g.poolMetricsCollector)
}
func newStateMetrics() *stateMetrics {
return &stateMetrics{
healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: stateSubsystem,
Name: "health",
Help: "Current S3 gateway state",
}),
}
}
func (m stateMetrics) register() {
prometheus.MustRegister(m.healthCheck)
}
func (m stateMetrics) unregister() {
prometheus.Unregister(m.healthCheck)
}
func (m stateMetrics) SetHealth(s int32) {
m.healthCheck.Set(float64(s))
}
func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
overallErrors := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "overall_errors",
Help: "Total number of errors in pool",
},
)
overallNodeErrors := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "overall_node_errors",
Help: "Total number of errors for connection in pool",
},
[]string{
"node",
},
)
overallNodeRequests := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "overall_node_requests",
Help: "Total number of requests to specific node in pool",
},
[]string{
"node",
},
)
currentErrors := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "current_errors",
Help: "Number of errors on current connections that will be reset after the threshold",
},
[]string{
"node",
},
)
requestsDuration := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "avg_request_duration",
Help: "Average request duration (in milliseconds) for specific method on node in pool",
},
[]string{
"node",
"method",
},
)
return &poolMetricsCollector{
poolStatScraper: scraper,
overallErrors: overallErrors,
overallNodeErrors: overallNodeErrors,
overallNodeRequests: overallNodeRequests,
currentErrors: currentErrors,
requestDuration: requestsDuration,
}
}
func (m *poolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
m.updateStatistic()
m.overallErrors.Collect(ch)
m.overallNodeErrors.Collect(ch)
m.overallNodeRequests.Collect(ch)
m.currentErrors.Collect(ch)
m.requestDuration.Collect(ch)
}
func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
m.overallErrors.Describe(descs)
m.overallNodeErrors.Describe(descs)
m.overallNodeRequests.Describe(descs)
m.currentErrors.Describe(descs)
m.requestDuration.Describe(descs)
}
func (m *poolMetricsCollector) register() {
prometheus.MustRegister(m)
}
func (m *poolMetricsCollector) updateStatistic() {
stat := m.poolStatScraper.Statistic()
m.overallNodeErrors.Reset()
m.overallNodeRequests.Reset()
m.currentErrors.Reset()
m.requestDuration.Reset()
for _, node := range stat.Nodes() {
m.overallNodeErrors.WithLabelValues(node.Address()).Set(float64(node.OverallErrors()))
m.overallNodeRequests.WithLabelValues(node.Address()).Set(float64(node.Requests()))
m.currentErrors.WithLabelValues(node.Address()).Set(float64(node.CurrentErrors()))
m.updateRequestsDuration(node)
}
m.overallErrors.Set(float64(stat.OverallErrors()))
}
func (m *poolMetricsCollector) updateRequestsDuration(node pool.NodeStatistic) {
m.requestDuration.WithLabelValues(node.Address(), methodGetBalance).Set(float64(node.AverageGetBalance().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodPutContainer).Set(float64(node.AveragePutContainer().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodGetContainer).Set(float64(node.AverageGetContainer().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodListContainer).Set(float64(node.AverageListContainer().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodDeleteContainer).Set(float64(node.AverageDeleteContainer().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodGetContainerEacl).Set(float64(node.AverageGetContainerEACL().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodSetContainerEacl).Set(float64(node.AverageSetContainerEACL().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodEndpointInfo).Set(float64(node.AverageEndpointInfo().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodNetworkInfo).Set(float64(node.AverageNetworkInfo().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodPutObject).Set(float64(node.AveragePutObject().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodDeleteObject).Set(float64(node.AverageDeleteObject().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodGetObject).Set(float64(node.AverageGetObject().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodHeadObject).Set(float64(node.AverageHeadObject().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodRangeObject).Set(float64(node.AverageRangeObject().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodCreateSession).Set(float64(node.AverageCreateSession().Milliseconds()))
}
// NewPrometheusService creates a new service for gathering prometheus metrics. // NewPrometheusService creates a new service for gathering prometheus metrics.
func NewPrometheusService(v *viper.Viper, log *zap.Logger) *Service { func NewPrometheusService(v *viper.Viper, log *zap.Logger, handler http.Handler) *Service {
if log == nil { if log == nil {
return nil return nil
} }
@ -231,7 +16,7 @@ func NewPrometheusService(v *viper.Viper, log *zap.Logger) *Service {
return &Service{ return &Service{
Server: &http.Server{ Server: &http.Server{
Addr: v.GetString(cfgPrometheusAddress), Addr: v.GetString(cfgPrometheusAddress),
Handler: promhttp.Handler(), Handler: handler,
}, },
enabled: v.GetBool(cfgPrometheusEnabled), enabled: v.GetBool(cfgPrometheusEnabled),
serviceType: "Prometheus", serviceType: "Prometheus",

74
metrics/app.go Normal file
View file

@ -0,0 +1,74 @@
package metrics
import (
"net/http"
"sync"
"github.com/TrueCloudLab/frostfs-s3-gw/api"
"github.com/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
"go.uber.org/zap"
)
type AppMetrics struct {
logger *zap.Logger
gate *GateMetrics
mu sync.RWMutex
enabled bool
}
func NewAppMetrics(logger *zap.Logger, poolStatistics *frostfs.PoolStatistic, enabled bool) *AppMetrics {
if !enabled {
logger.Warn("metrics are disabled")
}
return &AppMetrics{
logger: logger,
gate: NewGateMetrics(poolStatistics),
enabled: enabled,
}
}
func (m *AppMetrics) SetEnabled(enabled bool) {
if !enabled {
m.logger.Warn("metrics are disabled")
}
m.mu.Lock()
m.enabled = enabled
m.mu.Unlock()
}
func (m *AppMetrics) SetHealth(status int32) {
if !m.isEnabled() {
return
}
m.gate.State.SetHealth(status)
}
func (m *AppMetrics) Shutdown() {
m.mu.Lock()
if m.enabled {
m.gate.State.SetHealth(0)
m.enabled = false
}
m.gate.Unregister()
m.mu.Unlock()
}
func (m *AppMetrics) isEnabled() bool {
m.mu.RLock()
defer m.mu.RUnlock()
return m.enabled
}
func (m *AppMetrics) Handler() http.Handler {
return m.gate.Handler()
}
func (m *AppMetrics) Update(user, bucket, cnrID string, reqType api.RequestType, in, out uint64) {
if !m.isEnabled() {
return
}
m.gate.Billing.apiStat.Update(user, bucket, cnrID, reqType, in, out)
}

216
metrics/billing.go Normal file
View file

@ -0,0 +1,216 @@
package metrics
import (
"sync"
"github.com/TrueCloudLab/frostfs-s3-gw/api"
"github.com/prometheus/client_golang/prometheus"
)
const billingSubsystem = "billing"
type TrafficType int
const (
UnknownTraffic TrafficType = iota
INTraffic TrafficType = iota
OUTTraffic TrafficType = iota
)
func (t TrafficType) String() string {
switch t {
case 1:
return "IN"
case 2:
return "OUT"
default:
return "Unknown"
}
}
type (
OperationList [6]int
UsersAPIStats struct {
users map[string]*userAPIStats
sync.RWMutex
}
bucketKey struct {
name string
cid string
}
bucketStat struct {
Operations OperationList
InTraffic uint64
OutTraffic uint64
}
userAPIStats struct {
buckets map[bucketKey]bucketStat
user string
}
UserBucketInfo struct {
User string
Bucket string
ContainerID string
}
UserMetricsInfo struct {
UserBucketInfo
Operation api.RequestType
Requests int
}
UserTrafficMetricsInfo struct {
UserBucketInfo
Type TrafficType
Value uint64
}
UserMetrics struct {
Requests []UserMetricsInfo
Traffic []UserTrafficMetricsInfo
}
)
func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType api.RequestType, in, out uint64) {
u.Lock()
defer u.Unlock()
usersStat := u.users[user]
if usersStat == nil {
if u.users == nil {
u.users = make(map[string]*userAPIStats)
}
usersStat = &userAPIStats{
buckets: make(map[bucketKey]bucketStat, 1),
user: user,
}
u.users[user] = usersStat
}
key := bucketKey{
name: bucket,
cid: cnrID,
}
bktStat := usersStat.buckets[key]
bktStat.Operations[reqType]++
bktStat.InTraffic += in
bktStat.OutTraffic += out
usersStat.buckets[key] = bktStat
}
func (u *UsersAPIStats) DumpMetrics() UserMetrics {
u.Lock()
defer u.Unlock()
result := UserMetrics{
Requests: make([]UserMetricsInfo, 0, len(u.users)),
Traffic: make([]UserTrafficMetricsInfo, 0, len(u.users)),
}
for user, userStat := range u.users {
for key, bktStat := range userStat.buckets {
userBktInfo := UserBucketInfo{
User: user,
Bucket: key.name,
ContainerID: key.cid,
}
if bktStat.InTraffic != 0 {
result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{
UserBucketInfo: userBktInfo,
Type: INTraffic,
Value: bktStat.InTraffic,
})
}
if bktStat.OutTraffic != 0 {
result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{
UserBucketInfo: userBktInfo,
Type: OUTTraffic,
Value: bktStat.OutTraffic,
})
}
for op, val := range bktStat.Operations {
if val != 0 {
result.Requests = append(result.Requests, UserMetricsInfo{
UserBucketInfo: userBktInfo,
Operation: api.RequestType(op),
Requests: val,
})
}
}
}
}
u.users = make(map[string]*userAPIStats)
return result
}
type billingMetrics struct {
registry *prometheus.Registry
desc *prometheus.Desc
apiStat UsersAPIStats
}
func newBillingMetrics() *billingMetrics {
return &billingMetrics{
registry: prometheus.NewRegistry(),
desc: prometheus.NewDesc("frostfs_s3_billing", "Billing statistics exposed by FrostFS S3 Gate instance", nil, nil),
apiStat: UsersAPIStats{},
}
}
func (b *billingMetrics) register() {
b.registry.MustRegister(b)
}
func (b *billingMetrics) unregister() {
b.registry.Unregister(b)
}
func (b *billingMetrics) Describe(ch chan<- *prometheus.Desc) {
ch <- b.desc
}
func (b *billingMetrics) Collect(ch chan<- prometheus.Metric) {
userMetrics := b.apiStat.DumpMetrics()
for _, value := range userMetrics.Requests {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, billingSubsystem, "user_requests"),
"",
[]string{"user", "bucket", "cid", "operation"}, nil),
prometheus.CounterValue,
float64(value.Requests),
value.User,
value.Bucket,
value.ContainerID,
value.Operation.String(),
)
}
for _, value := range userMetrics.Traffic {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, billingSubsystem, "user_traffic"),
"",
[]string{"user", "bucket", "cid", "type"}, nil),
prometheus.CounterValue,
float64(value.Value),
value.User,
value.Bucket,
value.ContainerID,
value.Type.String(),
)
}
}

51
metrics/gate.go Normal file
View file

@ -0,0 +1,51 @@
package metrics
import (
"net/http"
"github.com/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const namespace = "frostfs_s3_gw"
type StatisticScraper interface {
Statistic() pool.Statistic
}
type GateMetrics struct {
State stateMetrics
Pool poolMetricsCollector
Billing *billingMetrics
}
func NewGateMetrics(scraper StatisticScraper) *GateMetrics {
stateMetric := newStateMetrics()
stateMetric.register()
poolMetric := newPoolMetricsCollector(scraper)
poolMetric.register()
billingMetric := newBillingMetrics()
billingMetric.register()
return &GateMetrics{
State: *stateMetric,
Pool: *poolMetric,
Billing: billingMetric,
}
}
func (g *GateMetrics) Unregister() {
g.State.unregister()
prometheus.Unregister(&g.Pool)
g.Billing.unregister()
}
func (g *GateMetrics) Handler() http.Handler {
handler := http.NewServeMux()
handler.Handle("/", promhttp.Handler())
handler.Handle("/metrics/billing", promhttp.HandlerFor(g.Billing.registry, promhttp.HandlerOpts{}))
return handler
}

162
metrics/pool.go Normal file
View file

@ -0,0 +1,162 @@
package metrics
import (
"github.com/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/prometheus/client_golang/prometheus"
)
const (
poolSubsystem = "pool"
methodGetBalance = "get_balance"
methodPutContainer = "put_container"
methodGetContainer = "get_container"
methodListContainer = "list_container"
methodDeleteContainer = "delete_container"
methodGetContainerEacl = "get_container_eacl"
methodSetContainerEacl = "set_container_eacl"
methodEndpointInfo = "endpoint_info"
methodNetworkInfo = "network_info"
methodPutObject = "put_object"
methodDeleteObject = "delete_object"
methodGetObject = "get_object"
methodHeadObject = "head_object"
methodRangeObject = "range_object"
methodCreateSession = "create_session"
)
type poolMetricsCollector struct {
poolStatScraper StatisticScraper
overallErrors prometheus.Gauge
overallNodeErrors *prometheus.GaugeVec
overallNodeRequests *prometheus.GaugeVec
currentErrors *prometheus.GaugeVec
requestDuration *prometheus.GaugeVec
}
func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
overallErrors := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "overall_errors",
Help: "Total number of errors in pool",
},
)
overallNodeErrors := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "overall_node_errors",
Help: "Total number of errors for connection in pool",
},
[]string{
"node",
},
)
overallNodeRequests := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "overall_node_requests",
Help: "Total number of requests to specific node in pool",
},
[]string{
"node",
},
)
currentErrors := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "current_errors",
Help: "Number of errors on current connections that will be reset after the threshold",
},
[]string{
"node",
},
)
requestsDuration := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: poolSubsystem,
Name: "avg_request_duration",
Help: "Average request duration (in milliseconds) for specific method on node in pool",
},
[]string{
"node",
"method",
},
)
return &poolMetricsCollector{
poolStatScraper: scraper,
overallErrors: overallErrors,
overallNodeErrors: overallNodeErrors,
overallNodeRequests: overallNodeRequests,
currentErrors: currentErrors,
requestDuration: requestsDuration,
}
}
func (m *poolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
m.updateStatistic()
m.overallErrors.Collect(ch)
m.overallNodeErrors.Collect(ch)
m.overallNodeRequests.Collect(ch)
m.currentErrors.Collect(ch)
m.requestDuration.Collect(ch)
}
func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
m.overallErrors.Describe(descs)
m.overallNodeErrors.Describe(descs)
m.overallNodeRequests.Describe(descs)
m.currentErrors.Describe(descs)
m.requestDuration.Describe(descs)
}
func (m *poolMetricsCollector) register() {
prometheus.MustRegister(m)
}
func (m *poolMetricsCollector) updateStatistic() {
stat := m.poolStatScraper.Statistic()
m.overallNodeErrors.Reset()
m.overallNodeRequests.Reset()
m.currentErrors.Reset()
m.requestDuration.Reset()
for _, node := range stat.Nodes() {
m.overallNodeErrors.WithLabelValues(node.Address()).Set(float64(node.OverallErrors()))
m.overallNodeRequests.WithLabelValues(node.Address()).Set(float64(node.Requests()))
m.currentErrors.WithLabelValues(node.Address()).Set(float64(node.CurrentErrors()))
m.updateRequestsDuration(node)
}
m.overallErrors.Set(float64(stat.OverallErrors()))
}
func (m *poolMetricsCollector) updateRequestsDuration(node pool.NodeStatistic) {
m.requestDuration.WithLabelValues(node.Address(), methodGetBalance).Set(float64(node.AverageGetBalance().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodPutContainer).Set(float64(node.AveragePutContainer().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodGetContainer).Set(float64(node.AverageGetContainer().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodListContainer).Set(float64(node.AverageListContainer().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodDeleteContainer).Set(float64(node.AverageDeleteContainer().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodGetContainerEacl).Set(float64(node.AverageGetContainerEACL().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodSetContainerEacl).Set(float64(node.AverageSetContainerEACL().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodEndpointInfo).Set(float64(node.AverageEndpointInfo().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodNetworkInfo).Set(float64(node.AverageNetworkInfo().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodPutObject).Set(float64(node.AveragePutObject().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodDeleteObject).Set(float64(node.AverageDeleteObject().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodGetObject).Set(float64(node.AverageGetObject().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodHeadObject).Set(float64(node.AverageHeadObject().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodRangeObject).Set(float64(node.AverageRangeObject().Milliseconds()))
m.requestDuration.WithLabelValues(node.Address(), methodCreateSession).Set(float64(node.AverageCreateSession().Milliseconds()))
}

32
metrics/state.go Normal file
View file

@ -0,0 +1,32 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
const stateSubsystem = "state"
type stateMetrics struct {
healthCheck prometheus.Gauge
}
func newStateMetrics() *stateMetrics {
return &stateMetrics{
healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: stateSubsystem,
Name: "health",
Help: "Current S3 gateway state",
}),
}
}
func (m stateMetrics) register() {
prometheus.MustRegister(m.healthCheck)
}
func (m stateMetrics) unregister() {
prometheus.Unregister(m.healthCheck)
}
func (m stateMetrics) SetHealth(s int32) {
m.healthCheck.Set(float64(s))
}