[TrueCloudLab#26] Add billing metrics to separate registry #26
12 changed files with 555 additions and 461 deletions
|
@ -15,6 +15,7 @@ This document outlines major changes between releases.
|
||||||
- Using multiple servers require only one healthy (TrueCloudLab#12)
|
- Using multiple servers require only one healthy (TrueCloudLab#12)
|
||||||
- Update go version to go1.18 (TrueCloudLab#16)
|
- Update go version to go1.18 (TrueCloudLab#16)
|
||||||
- Return error on invalid LocationConstraint (TrueCloudLab#23)
|
- Return error on invalid LocationConstraint (TrueCloudLab#23)
|
||||||
|
- Place billing metrics to separate url path (TrueCloudLab#26)
|
||||||
|
|
||||||
## [0.26.0] - 2022-12-28
|
## [0.26.0] - 2022-12-28
|
||||||
|
|
||||||
|
|
182
api/metrics.go
182
api/metrics.go
|
@ -14,25 +14,6 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TrafficType int
|
|
||||||
|
|
||||||
const (
|
|
||||||
UnknownTraffic TrafficType = iota
|
|
||||||
INTraffic TrafficType = iota
|
|
||||||
OUTTraffic TrafficType = iota
|
|
||||||
)
|
|
||||||
|
|
||||||
func (t TrafficType) String() string {
|
|
||||||
switch t {
|
|
||||||
case 1:
|
|
||||||
return "IN"
|
|
||||||
case 2:
|
|
||||||
return "OUT"
|
|
||||||
default:
|
|
||||||
return "Unknown"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type RequestType int
|
type RequestType int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -90,8 +71,6 @@ func RequestTypeFromAPI(api string) RequestType {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type OperationList [6]int
|
|
||||||
|
|
||||||
type (
|
type (
|
||||||
// HTTPAPIStats holds statistics information about
|
// HTTPAPIStats holds statistics information about
|
||||||
// the API given in the requests.
|
// the API given in the requests.
|
||||||
|
@ -100,54 +79,13 @@ type (
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
UsersAPIStats struct {
|
UsersStat interface {
|
||||||
users map[string]*userAPIStats
|
Update(user, bucket, cnrID string, reqType RequestType, in, out uint64)
|
||||||
sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
bucketKey struct {
|
|
||||||
name string
|
|
||||||
cid string
|
|
||||||
}
|
|
||||||
|
|
||||||
bucketStat struct {
|
|
||||||
Operations OperationList
|
|
||||||
InTraffic uint64
|
|
||||||
OutTraffic uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
userAPIStats struct {
|
|
||||||
buckets map[bucketKey]bucketStat
|
|
||||||
user string
|
|
||||||
}
|
|
||||||
|
|
||||||
UserBucketInfo struct {
|
|
||||||
User string
|
|
||||||
Bucket string
|
|
||||||
ContainerID string
|
|
||||||
}
|
|
||||||
|
|
||||||
UserMetricsInfo struct {
|
|
||||||
UserBucketInfo
|
|
||||||
Operation RequestType
|
|
||||||
Requests int
|
|
||||||
}
|
|
||||||
|
|
||||||
UserTrafficMetricsInfo struct {
|
|
||||||
UserBucketInfo
|
|
||||||
Type TrafficType
|
|
||||||
Value uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
UserMetrics struct {
|
|
||||||
Requests []UserMetricsInfo
|
|
||||||
Traffic []UserTrafficMetricsInfo
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// HTTPStats holds statistics information about
|
// HTTPStats holds statistics information about
|
||||||
// HTTP requests made by all clients.
|
// HTTP requests made by all clients.
|
||||||
HTTPStats struct {
|
HTTPStats struct {
|
||||||
usersS3Requests UsersAPIStats
|
|
||||||
currentS3Requests HTTPAPIStats
|
currentS3Requests HTTPAPIStats
|
||||||
totalS3Requests HTTPAPIStats
|
totalS3Requests HTTPAPIStats
|
||||||
totalS3Errors HTTPAPIStats
|
totalS3Errors HTTPAPIStats
|
||||||
|
@ -229,45 +167,11 @@ func collectHTTPMetrics(ch chan<- prometheus.Metric) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func collectUserMetrics(ch chan<- prometheus.Metric) {
|
|
||||||
userMetrics := httpStatsMetric.usersS3Requests.DumpMetrics()
|
|
||||||
|
|
||||||
for _, value := range userMetrics.Requests {
|
|
||||||
ch <- prometheus.MustNewConstMetric(
|
|
||||||
prometheus.NewDesc(
|
|
||||||
prometheus.BuildFQName("frostfs_s3", "user_requests", "count"),
|
|
||||||
"",
|
|
||||||
[]string{"user", "bucket", "cid", "operation"}, nil),
|
|
||||||
prometheus.CounterValue,
|
|
||||||
float64(value.Requests),
|
|
||||||
value.User,
|
|
||||||
value.Bucket,
|
|
||||||
value.ContainerID,
|
|
||||||
value.Operation.String(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, value := range userMetrics.Traffic {
|
|
||||||
ch <- prometheus.MustNewConstMetric(
|
|
||||||
prometheus.NewDesc(
|
|
||||||
prometheus.BuildFQName("frostfs_s3", "user_traffic", "bytes"),
|
|
||||||
"",
|
|
||||||
[]string{"user", "bucket", "cid", "type"}, nil),
|
|
||||||
prometheus.CounterValue,
|
|
||||||
float64(value.Value),
|
|
||||||
value.User,
|
|
||||||
value.Bucket,
|
|
||||||
value.ContainerID,
|
|
||||||
value.Type.String(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CIDResolveFunc is a func to resolve CID in Stats handler.
|
// CIDResolveFunc is a func to resolve CID in Stats handler.
|
||||||
type CIDResolveFunc func(ctx context.Context, reqInfo *ReqInfo) (cnrID string)
|
type CIDResolveFunc func(ctx context.Context, reqInfo *ReqInfo) (cnrID string)
|
||||||
|
|
||||||
// Stats is a handler that update metrics.
|
// Stats is a handler that update metrics.
|
||||||
func Stats(f http.HandlerFunc, resolveCID CIDResolveFunc) http.HandlerFunc {
|
func Stats(f http.HandlerFunc, resolveCID CIDResolveFunc, usersStat UsersStat) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
reqInfo := GetReqInfo(r.Context())
|
reqInfo := GetReqInfo(r.Context())
|
||||||
|
|
||||||
|
@ -293,7 +197,7 @@ func Stats(f http.HandlerFunc, resolveCID CIDResolveFunc) http.HandlerFunc {
|
||||||
|
|
||||||
user := resolveUser(r.Context())
|
user := resolveUser(r.Context())
|
||||||
cnrID := resolveCID(r.Context(), reqInfo)
|
cnrID := resolveCID(r.Context(), reqInfo)
|
||||||
httpStatsMetric.usersS3Requests.Update(user, reqInfo.BucketName, cnrID, RequestTypeFromAPI(reqInfo.API), in.countBytes, out.countBytes)
|
usersStat.Update(user, reqInfo.BucketName, cnrID, RequestTypeFromAPI(reqInfo.API), in.countBytes, out.countBytes)
|
||||||
|
|
||||||
code := statsWriter.statusCode
|
code := statsWriter.statusCode
|
||||||
// A successful request has a 2xx response code
|
// A successful request has a 2xx response code
|
||||||
|
@ -359,84 +263,6 @@ func (stats *HTTPAPIStats) Load() map[string]int {
|
||||||
return apiStats
|
return apiStats
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType RequestType, in, out uint64) {
|
|
||||||
u.Lock()
|
|
||||||
defer u.Unlock()
|
|
||||||
|
|
||||||
usersStat := u.users[user]
|
|
||||||
if usersStat == nil {
|
|
||||||
if u.users == nil {
|
|
||||||
u.users = make(map[string]*userAPIStats)
|
|
||||||
}
|
|
||||||
usersStat = &userAPIStats{
|
|
||||||
buckets: make(map[bucketKey]bucketStat, 1),
|
|
||||||
user: user,
|
|
||||||
}
|
|
||||||
u.users[user] = usersStat
|
|
||||||
}
|
|
||||||
|
|
||||||
key := bucketKey{
|
|
||||||
name: bucket,
|
|
||||||
cid: cnrID,
|
|
||||||
}
|
|
||||||
|
|
||||||
bktStat := usersStat.buckets[key]
|
|
||||||
bktStat.Operations[reqType]++
|
|
||||||
bktStat.InTraffic += in
|
|
||||||
bktStat.OutTraffic += out
|
|
||||||
usersStat.buckets[key] = bktStat
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *UsersAPIStats) DumpMetrics() UserMetrics {
|
|
||||||
u.Lock()
|
|
||||||
defer u.Unlock()
|
|
||||||
|
|
||||||
result := UserMetrics{
|
|
||||||
Requests: make([]UserMetricsInfo, 0, len(u.users)),
|
|
||||||
Traffic: make([]UserTrafficMetricsInfo, 0, len(u.users)),
|
|
||||||
}
|
|
||||||
|
|
||||||
for user, userStat := range u.users {
|
|
||||||
for key, bktStat := range userStat.buckets {
|
|
||||||
userBktInfo := UserBucketInfo{
|
|
||||||
User: user,
|
|
||||||
Bucket: key.name,
|
|
||||||
ContainerID: key.cid,
|
|
||||||
}
|
|
||||||
|
|
||||||
if bktStat.InTraffic != 0 {
|
|
||||||
result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{
|
|
||||||
UserBucketInfo: userBktInfo,
|
|
||||||
Type: INTraffic,
|
|
||||||
Value: bktStat.InTraffic,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if bktStat.OutTraffic != 0 {
|
|
||||||
result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{
|
|
||||||
UserBucketInfo: userBktInfo,
|
|
||||||
Type: OUTTraffic,
|
|
||||||
Value: bktStat.OutTraffic,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
for op, val := range bktStat.Operations {
|
|
||||||
if val != 0 {
|
|
||||||
result.Requests = append(result.Requests, UserMetricsInfo{
|
|
||||||
UserBucketInfo: userBktInfo,
|
|
||||||
Operation: RequestType(op),
|
|
||||||
Requests: val,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
u.users = make(map[string]*userAPIStats)
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *HTTPStats) getInputBytes() uint64 {
|
func (st *HTTPStats) getInputBytes() uint64 {
|
||||||
return atomic.LoadUint64(&st.totalInputBytes)
|
return atomic.LoadUint64(&st.totalInputBytes)
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,5 @@ func (s *stats) Collect(ch chan<- prometheus.Metric) {
|
||||||
|
|
||||||
// connect collectors
|
// connect collectors
|
||||||
collectHTTPMetrics(ch)
|
collectHTTPMetrics(ch)
|
||||||
collectUserMetrics(ch)
|
|
||||||
collectNetworkMetrics(ch)
|
collectNetworkMetrics(ch)
|
||||||
}
|
}
|
||||||
|
|
|
@ -151,9 +151,9 @@ func appendCORS(handler Handler) mux.MiddlewareFunc {
|
||||||
type BucketResolveFunc func(ctx context.Context, bucket string) (*data.BucketInfo, error)
|
type BucketResolveFunc func(ctx context.Context, bucket string) (*data.BucketInfo, error)
|
||||||
|
|
||||||
// metricsMiddleware wraps http handler for api with basic statistics collection.
|
// metricsMiddleware wraps http handler for api with basic statistics collection.
|
||||||
func metricsMiddleware(log *zap.Logger, resolveBucket BucketResolveFunc) mux.MiddlewareFunc {
|
func metricsMiddleware(log *zap.Logger, resolveBucket BucketResolveFunc, usersStat UsersStat) mux.MiddlewareFunc {
|
||||||
return func(h http.Handler) http.Handler {
|
return func(h http.Handler) http.Handler {
|
||||||
return Stats(h.ServeHTTP, resolveCID(log, resolveBucket))
|
return Stats(h.ServeHTTP, resolveCID(log, resolveBucket), usersStat)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,10 +223,10 @@ func setErrorAPI(apiName string, h http.Handler) http.Handler {
|
||||||
}
|
}
|
||||||
|
|
||||||
// attachErrorHandler set NotFoundHandler and MethodNotAllowedHandler for mux.Router.
|
// attachErrorHandler set NotFoundHandler and MethodNotAllowedHandler for mux.Router.
|
||||||
func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth.Center) {
|
func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth.Center, usersStat UsersStat) {
|
||||||
middlewares := []mux.MiddlewareFunc{
|
middlewares := []mux.MiddlewareFunc{
|
||||||
AuthMiddleware(log, center),
|
AuthMiddleware(log, center),
|
||||||
metricsMiddleware(log, h.ResolveBucket),
|
metricsMiddleware(log, h.ResolveBucket, usersStat),
|
||||||
}
|
}
|
||||||
|
|
||||||
var errorHandler http.Handler = http.HandlerFunc(errorResponseHandler)
|
var errorHandler http.Handler = http.HandlerFunc(errorResponseHandler)
|
||||||
|
@ -241,7 +241,7 @@ func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth
|
||||||
|
|
||||||
// Attach adds S3 API handlers from h to r for domains with m client limit using
|
// Attach adds S3 API handlers from h to r for domains with m client limit using
|
||||||
// center authentication and log logger.
|
// center authentication and log logger.
|
||||||
func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center auth.Center, log *zap.Logger) {
|
func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center auth.Center, log *zap.Logger, usersStat UsersStat) {
|
||||||
api := r.PathPrefix(SlashSeparator).Subrouter()
|
api := r.PathPrefix(SlashSeparator).Subrouter()
|
||||||
|
|
||||||
api.Use(
|
api.Use(
|
||||||
|
@ -251,13 +251,13 @@ func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center aut
|
||||||
// Attach user authentication for all S3 routes.
|
// Attach user authentication for all S3 routes.
|
||||||
AuthMiddleware(log, center),
|
AuthMiddleware(log, center),
|
||||||
|
|
||||||
metricsMiddleware(log, h.ResolveBucket),
|
metricsMiddleware(log, h.ResolveBucket, usersStat),
|
||||||
|
|
||||||
// -- logging error requests
|
// -- logging error requests
|
||||||
logSuccessResponse(log),
|
logSuccessResponse(log),
|
||||||
)
|
)
|
||||||
|
|
||||||
attachErrorHandler(api, log, h, center)
|
attachErrorHandler(api, log, h, center, usersStat)
|
||||||
|
|
||||||
buckets := make([]*mux.Router, 0, len(domains)+1)
|
buckets := make([]*mux.Router, 0, len(domains)+1)
|
||||||
buckets = append(buckets, api.PathPrefix("/{bucket}").Subrouter())
|
buckets = append(buckets, api.PathPrefix("/{bucket}").Subrouter())
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
|
"github.com/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
|
||||||
"github.com/TrueCloudLab/frostfs-s3-gw/internal/version"
|
"github.com/TrueCloudLab/frostfs-s3-gw/internal/version"
|
||||||
"github.com/TrueCloudLab/frostfs-s3-gw/internal/wallet"
|
"github.com/TrueCloudLab/frostfs-s3-gw/internal/wallet"
|
||||||
|
"github.com/TrueCloudLab/frostfs-s3-gw/metrics"
|
||||||
"github.com/TrueCloudLab/frostfs-sdk-go/netmap"
|
"github.com/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"github.com/TrueCloudLab/frostfs-sdk-go/pool"
|
"github.com/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
@ -45,7 +46,7 @@ type (
|
||||||
|
|
||||||
servers []Server
|
servers []Server
|
||||||
|
|
||||||
metrics *appMetrics
|
metrics *metrics.AppMetrics
|
||||||
bucketResolver *resolver.BucketResolver
|
bucketResolver *resolver.BucketResolver
|
||||||
services []*Service
|
services []*Service
|
||||||
settings *appSettings
|
settings *appSettings
|
||||||
|
@ -65,18 +66,6 @@ type (
|
||||||
lvl zap.AtomicLevel
|
lvl zap.AtomicLevel
|
||||||
}
|
}
|
||||||
|
|
||||||
appMetrics struct {
|
|
||||||
logger *zap.Logger
|
|
||||||
provider GateMetricsCollector
|
|
||||||
mu sync.RWMutex
|
|
||||||
enabled bool
|
|
||||||
}
|
|
||||||
|
|
||||||
GateMetricsCollector interface {
|
|
||||||
SetHealth(int32)
|
|
||||||
Unregister()
|
|
||||||
}
|
|
||||||
|
|
||||||
placementPolicy struct {
|
placementPolicy struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
defaultPolicy netmap.PlacementPolicy
|
defaultPolicy netmap.PlacementPolicy
|
||||||
|
@ -183,8 +172,7 @@ func (a *App) initAPI(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) initMetrics() {
|
func (a *App) initMetrics() {
|
||||||
gateMetricsProvider := newGateMetrics(frostfs.NewPoolStatistic(a.pool))
|
a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
|
||||||
a.metrics = newAppMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) initResolver() {
|
func (a *App) initResolver() {
|
||||||
|
@ -344,47 +332,6 @@ func (p *placementPolicy) update(defaultPolicy string, regionPolicyFilepath stri
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAppMetrics(logger *zap.Logger, provider GateMetricsCollector, enabled bool) *appMetrics {
|
|
||||||
if !enabled {
|
|
||||||
logger.Warn("metrics are disabled")
|
|
||||||
}
|
|
||||||
return &appMetrics{
|
|
||||||
logger: logger,
|
|
||||||
provider: provider,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *appMetrics) SetEnabled(enabled bool) {
|
|
||||||
if !enabled {
|
|
||||||
m.logger.Warn("metrics are disabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
m.mu.Lock()
|
|
||||||
m.enabled = enabled
|
|
||||||
m.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *appMetrics) SetHealth(status int32) {
|
|
||||||
m.mu.RLock()
|
|
||||||
if !m.enabled {
|
|
||||||
m.mu.RUnlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
m.mu.RUnlock()
|
|
||||||
|
|
||||||
m.provider.SetHealth(status)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *appMetrics) Shutdown() {
|
|
||||||
m.mu.Lock()
|
|
||||||
if m.enabled {
|
|
||||||
m.provider.SetHealth(0)
|
|
||||||
m.enabled = false
|
|
||||||
}
|
|
||||||
m.provider.Unregister()
|
|
||||||
m.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func remove(list []string, element string) []string {
|
func remove(list []string, element string) []string {
|
||||||
for i, item := range list {
|
for i, item := range list {
|
||||||
if item == element {
|
if item == element {
|
||||||
|
@ -422,7 +369,7 @@ func (a *App) Serve(ctx context.Context) {
|
||||||
domains := a.cfg.GetStringSlice(cfgListenDomains)
|
domains := a.cfg.GetStringSlice(cfgListenDomains)
|
||||||
a.log.Info("fetch domains, prepare to use API", zap.Strings("domains", domains))
|
a.log.Info("fetch domains, prepare to use API", zap.Strings("domains", domains))
|
||||||
router := mux.NewRouter().SkipClean(true).UseEncodedPath()
|
router := mux.NewRouter().SkipClean(true).UseEncodedPath()
|
||||||
api.Attach(router, domains, a.maxClients, a.api, a.ctr, a.log)
|
api.Attach(router, domains, a.maxClients, a.api, a.ctr, a.log, a.metrics)
|
||||||
|
|
||||||
// Use mux.Router as http.Handler
|
// Use mux.Router as http.Handler
|
||||||
srv := new(http.Server)
|
srv := new(http.Server)
|
||||||
|
@ -519,7 +466,7 @@ func (a *App) startServices() {
|
||||||
a.services = append(a.services, pprofService)
|
a.services = append(a.services, pprofService)
|
||||||
go pprofService.Start()
|
go pprofService.Start()
|
||||||
|
|
||||||
prometheusService := NewPrometheusService(a.cfg, a.log)
|
prometheusService := NewPrometheusService(a.cfg, a.log, a.metrics.Handler())
|
||||||
a.services = append(a.services, prometheusService)
|
a.services = append(a.services, prometheusService)
|
||||||
go prometheusService.Start()
|
go prometheusService.Start()
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,227 +3,12 @@ package main
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/TrueCloudLab/frostfs-sdk-go/pool"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
namespace = "frostfs_s3_gw"
|
|
||||||
stateSubsystem = "state"
|
|
||||||
poolSubsystem = "pool"
|
|
||||||
|
|
||||||
methodGetBalance = "get_balance"
|
|
||||||
methodPutContainer = "put_container"
|
|
||||||
methodGetContainer = "get_container"
|
|
||||||
methodListContainer = "list_container"
|
|
||||||
methodDeleteContainer = "delete_container"
|
|
||||||
methodGetContainerEacl = "get_container_eacl"
|
|
||||||
methodSetContainerEacl = "set_container_eacl"
|
|
||||||
methodEndpointInfo = "endpoint_info"
|
|
||||||
methodNetworkInfo = "network_info"
|
|
||||||
methodPutObject = "put_object"
|
|
||||||
methodDeleteObject = "delete_object"
|
|
||||||
methodGetObject = "get_object"
|
|
||||||
methodHeadObject = "head_object"
|
|
||||||
methodRangeObject = "range_object"
|
|
||||||
methodCreateSession = "create_session"
|
|
||||||
)
|
|
||||||
|
|
||||||
type StatisticScraper interface {
|
|
||||||
Statistic() pool.Statistic
|
|
||||||
}
|
|
||||||
|
|
||||||
type GateMetrics struct {
|
|
||||||
stateMetrics
|
|
||||||
poolMetricsCollector
|
|
||||||
}
|
|
||||||
|
|
||||||
type stateMetrics struct {
|
|
||||||
healthCheck prometheus.Gauge
|
|
||||||
}
|
|
||||||
|
|
||||||
type poolMetricsCollector struct {
|
|
||||||
poolStatScraper StatisticScraper
|
|
||||||
overallErrors prometheus.Gauge
|
|
||||||
overallNodeErrors *prometheus.GaugeVec
|
|
||||||
overallNodeRequests *prometheus.GaugeVec
|
|
||||||
currentErrors *prometheus.GaugeVec
|
|
||||||
requestDuration *prometheus.GaugeVec
|
|
||||||
}
|
|
||||||
|
|
||||||
func newGateMetrics(scraper StatisticScraper) *GateMetrics {
|
|
||||||
stateMetric := newStateMetrics()
|
|
||||||
stateMetric.register()
|
|
||||||
|
|
||||||
poolMetric := newPoolMetricsCollector(scraper)
|
|
||||||
poolMetric.register()
|
|
||||||
|
|
||||||
return &GateMetrics{
|
|
||||||
stateMetrics: *stateMetric,
|
|
||||||
poolMetricsCollector: *poolMetric,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *GateMetrics) Unregister() {
|
|
||||||
g.stateMetrics.unregister()
|
|
||||||
prometheus.Unregister(&g.poolMetricsCollector)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newStateMetrics() *stateMetrics {
|
|
||||||
return &stateMetrics{
|
|
||||||
healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: stateSubsystem,
|
|
||||||
Name: "health",
|
|
||||||
Help: "Current S3 gateway state",
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m stateMetrics) register() {
|
|
||||||
prometheus.MustRegister(m.healthCheck)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m stateMetrics) unregister() {
|
|
||||||
prometheus.Unregister(m.healthCheck)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m stateMetrics) SetHealth(s int32) {
|
|
||||||
m.healthCheck.Set(float64(s))
|
|
||||||
}
|
|
||||||
|
|
||||||
func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
|
|
||||||
overallErrors := prometheus.NewGauge(
|
|
||||||
prometheus.GaugeOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: poolSubsystem,
|
|
||||||
Name: "overall_errors",
|
|
||||||
Help: "Total number of errors in pool",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
overallNodeErrors := prometheus.NewGaugeVec(
|
|
||||||
prometheus.GaugeOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: poolSubsystem,
|
|
||||||
Name: "overall_node_errors",
|
|
||||||
Help: "Total number of errors for connection in pool",
|
|
||||||
},
|
|
||||||
[]string{
|
|
||||||
"node",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
overallNodeRequests := prometheus.NewGaugeVec(
|
|
||||||
prometheus.GaugeOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: poolSubsystem,
|
|
||||||
Name: "overall_node_requests",
|
|
||||||
Help: "Total number of requests to specific node in pool",
|
|
||||||
},
|
|
||||||
[]string{
|
|
||||||
"node",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
currentErrors := prometheus.NewGaugeVec(
|
|
||||||
prometheus.GaugeOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: poolSubsystem,
|
|
||||||
Name: "current_errors",
|
|
||||||
Help: "Number of errors on current connections that will be reset after the threshold",
|
|
||||||
},
|
|
||||||
[]string{
|
|
||||||
"node",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
requestsDuration := prometheus.NewGaugeVec(
|
|
||||||
prometheus.GaugeOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: poolSubsystem,
|
|
||||||
Name: "avg_request_duration",
|
|
||||||
Help: "Average request duration (in milliseconds) for specific method on node in pool",
|
|
||||||
},
|
|
||||||
[]string{
|
|
||||||
"node",
|
|
||||||
"method",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
return &poolMetricsCollector{
|
|
||||||
poolStatScraper: scraper,
|
|
||||||
overallErrors: overallErrors,
|
|
||||||
overallNodeErrors: overallNodeErrors,
|
|
||||||
overallNodeRequests: overallNodeRequests,
|
|
||||||
currentErrors: currentErrors,
|
|
||||||
requestDuration: requestsDuration,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *poolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
|
|
||||||
m.updateStatistic()
|
|
||||||
m.overallErrors.Collect(ch)
|
|
||||||
m.overallNodeErrors.Collect(ch)
|
|
||||||
m.overallNodeRequests.Collect(ch)
|
|
||||||
m.currentErrors.Collect(ch)
|
|
||||||
m.requestDuration.Collect(ch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
|
|
||||||
m.overallErrors.Describe(descs)
|
|
||||||
m.overallNodeErrors.Describe(descs)
|
|
||||||
m.overallNodeRequests.Describe(descs)
|
|
||||||
m.currentErrors.Describe(descs)
|
|
||||||
m.requestDuration.Describe(descs)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *poolMetricsCollector) register() {
|
|
||||||
prometheus.MustRegister(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *poolMetricsCollector) updateStatistic() {
|
|
||||||
stat := m.poolStatScraper.Statistic()
|
|
||||||
|
|
||||||
m.overallNodeErrors.Reset()
|
|
||||||
m.overallNodeRequests.Reset()
|
|
||||||
m.currentErrors.Reset()
|
|
||||||
m.requestDuration.Reset()
|
|
||||||
|
|
||||||
for _, node := range stat.Nodes() {
|
|
||||||
m.overallNodeErrors.WithLabelValues(node.Address()).Set(float64(node.OverallErrors()))
|
|
||||||
m.overallNodeRequests.WithLabelValues(node.Address()).Set(float64(node.Requests()))
|
|
||||||
|
|
||||||
m.currentErrors.WithLabelValues(node.Address()).Set(float64(node.CurrentErrors()))
|
|
||||||
m.updateRequestsDuration(node)
|
|
||||||
}
|
|
||||||
|
|
||||||
m.overallErrors.Set(float64(stat.OverallErrors()))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *poolMetricsCollector) updateRequestsDuration(node pool.NodeStatistic) {
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodGetBalance).Set(float64(node.AverageGetBalance().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodPutContainer).Set(float64(node.AveragePutContainer().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodGetContainer).Set(float64(node.AverageGetContainer().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodListContainer).Set(float64(node.AverageListContainer().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodDeleteContainer).Set(float64(node.AverageDeleteContainer().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodGetContainerEacl).Set(float64(node.AverageGetContainerEACL().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodSetContainerEacl).Set(float64(node.AverageSetContainerEACL().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodEndpointInfo).Set(float64(node.AverageEndpointInfo().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodNetworkInfo).Set(float64(node.AverageNetworkInfo().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodPutObject).Set(float64(node.AveragePutObject().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodDeleteObject).Set(float64(node.AverageDeleteObject().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodGetObject).Set(float64(node.AverageGetObject().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodHeadObject).Set(float64(node.AverageHeadObject().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodRangeObject).Set(float64(node.AverageRangeObject().Milliseconds()))
|
|
||||||
m.requestDuration.WithLabelValues(node.Address(), methodCreateSession).Set(float64(node.AverageCreateSession().Milliseconds()))
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewPrometheusService creates a new service for gathering prometheus metrics.
|
// NewPrometheusService creates a new service for gathering prometheus metrics.
|
||||||
func NewPrometheusService(v *viper.Viper, log *zap.Logger) *Service {
|
func NewPrometheusService(v *viper.Viper, log *zap.Logger, handler http.Handler) *Service {
|
||||||
if log == nil {
|
if log == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -231,7 +16,7 @@ func NewPrometheusService(v *viper.Viper, log *zap.Logger) *Service {
|
||||||
return &Service{
|
return &Service{
|
||||||
Server: &http.Server{
|
Server: &http.Server{
|
||||||
Addr: v.GetString(cfgPrometheusAddress),
|
Addr: v.GetString(cfgPrometheusAddress),
|
||||||
Handler: promhttp.Handler(),
|
Handler: handler,
|
||||||
},
|
},
|
||||||
enabled: v.GetBool(cfgPrometheusEnabled),
|
enabled: v.GetBool(cfgPrometheusEnabled),
|
||||||
serviceType: "Prometheus",
|
serviceType: "Prometheus",
|
||||||
|
|
|
@ -452,6 +452,7 @@ pprof:
|
||||||
# `prometheus` section
|
# `prometheus` section
|
||||||
|
|
||||||
Contains configuration for the `prometheus` metrics service.
|
Contains configuration for the `prometheus` metrics service.
|
||||||
|
General metrics are available on `/metrics` url path, billing metrics on `/metrics/billing`.
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
prometheus:
|
prometheus:
|
||||||
|
|
74
metrics/app.go
Normal file
74
metrics/app.go
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
|
"github.com/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AppMetrics struct {
|
||||||
|
logger *zap.Logger
|
||||||
|
gate *GateMetrics
|
||||||
|
mu sync.RWMutex
|
||||||
|
enabled bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAppMetrics(logger *zap.Logger, poolStatistics *frostfs.PoolStatistic, enabled bool) *AppMetrics {
|
||||||
|
if !enabled {
|
||||||
|
logger.Warn("metrics are disabled")
|
||||||
|
}
|
||||||
|
return &AppMetrics{
|
||||||
|
logger: logger,
|
||||||
|
gate: NewGateMetrics(poolStatistics),
|
||||||
|
enabled: enabled,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *AppMetrics) SetEnabled(enabled bool) {
|
||||||
|
if !enabled {
|
||||||
|
m.logger.Warn("metrics are disabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
m.enabled = enabled
|
||||||
|
m.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *AppMetrics) SetHealth(status int32) {
|
||||||
|
if !m.isEnabled() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
m.gate.State.SetHealth(status)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *AppMetrics) Shutdown() {
|
||||||
|
m.mu.Lock()
|
||||||
|
if m.enabled {
|
||||||
|
m.gate.State.SetHealth(0)
|
||||||
|
m.enabled = false
|
||||||
|
}
|
||||||
|
m.gate.Unregister()
|
||||||
|
m.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *AppMetrics) isEnabled() bool {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
return m.enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *AppMetrics) Handler() http.Handler {
|
||||||
|
return m.gate.Handler()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *AppMetrics) Update(user, bucket, cnrID string, reqType api.RequestType, in, out uint64) {
|
||||||
|
if !m.isEnabled() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
m.gate.Billing.apiStat.Update(user, bucket, cnrID, reqType, in, out)
|
||||||
|
}
|
216
metrics/billing.go
Normal file
216
metrics/billing.go
Normal file
|
@ -0,0 +1,216 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
const billingSubsystem = "billing"
|
||||||
|
|
||||||
|
type TrafficType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
UnknownTraffic TrafficType = iota
|
||||||
|
INTraffic TrafficType = iota
|
||||||
|
OUTTraffic TrafficType = iota
|
||||||
|
)
|
||||||
|
|
||||||
|
func (t TrafficType) String() string {
|
||||||
|
switch t {
|
||||||
|
case 1:
|
||||||
|
return "IN"
|
||||||
|
case 2:
|
||||||
|
return "OUT"
|
||||||
|
default:
|
||||||
|
return "Unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type (
|
||||||
|
OperationList [6]int
|
||||||
|
|
||||||
|
UsersAPIStats struct {
|
||||||
|
users map[string]*userAPIStats
|
||||||
|
sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketKey struct {
|
||||||
|
name string
|
||||||
|
cid string
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketStat struct {
|
||||||
|
Operations OperationList
|
||||||
|
InTraffic uint64
|
||||||
|
OutTraffic uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
userAPIStats struct {
|
||||||
|
buckets map[bucketKey]bucketStat
|
||||||
|
user string
|
||||||
|
}
|
||||||
|
|
||||||
|
UserBucketInfo struct {
|
||||||
|
User string
|
||||||
|
Bucket string
|
||||||
|
ContainerID string
|
||||||
|
}
|
||||||
|
|
||||||
|
UserMetricsInfo struct {
|
||||||
|
UserBucketInfo
|
||||||
|
Operation api.RequestType
|
||||||
|
Requests int
|
||||||
|
}
|
||||||
|
|
||||||
|
UserTrafficMetricsInfo struct {
|
||||||
|
UserBucketInfo
|
||||||
|
Type TrafficType
|
||||||
|
Value uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
UserMetrics struct {
|
||||||
|
Requests []UserMetricsInfo
|
||||||
|
Traffic []UserTrafficMetricsInfo
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType api.RequestType, in, out uint64) {
|
||||||
|
u.Lock()
|
||||||
|
defer u.Unlock()
|
||||||
|
|
||||||
|
usersStat := u.users[user]
|
||||||
|
if usersStat == nil {
|
||||||
|
if u.users == nil {
|
||||||
|
u.users = make(map[string]*userAPIStats)
|
||||||
|
}
|
||||||
|
usersStat = &userAPIStats{
|
||||||
|
buckets: make(map[bucketKey]bucketStat, 1),
|
||||||
|
user: user,
|
||||||
|
}
|
||||||
|
u.users[user] = usersStat
|
||||||
|
}
|
||||||
|
|
||||||
|
key := bucketKey{
|
||||||
|
name: bucket,
|
||||||
|
cid: cnrID,
|
||||||
|
}
|
||||||
|
|
||||||
|
bktStat := usersStat.buckets[key]
|
||||||
|
bktStat.Operations[reqType]++
|
||||||
|
bktStat.InTraffic += in
|
||||||
|
bktStat.OutTraffic += out
|
||||||
|
usersStat.buckets[key] = bktStat
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *UsersAPIStats) DumpMetrics() UserMetrics {
|
||||||
|
u.Lock()
|
||||||
|
defer u.Unlock()
|
||||||
|
|
||||||
|
result := UserMetrics{
|
||||||
|
Requests: make([]UserMetricsInfo, 0, len(u.users)),
|
||||||
|
Traffic: make([]UserTrafficMetricsInfo, 0, len(u.users)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for user, userStat := range u.users {
|
||||||
|
for key, bktStat := range userStat.buckets {
|
||||||
|
userBktInfo := UserBucketInfo{
|
||||||
|
User: user,
|
||||||
|
Bucket: key.name,
|
||||||
|
ContainerID: key.cid,
|
||||||
|
}
|
||||||
|
|
||||||
|
if bktStat.InTraffic != 0 {
|
||||||
|
result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{
|
||||||
|
UserBucketInfo: userBktInfo,
|
||||||
|
Type: INTraffic,
|
||||||
|
Value: bktStat.InTraffic,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if bktStat.OutTraffic != 0 {
|
||||||
|
result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{
|
||||||
|
UserBucketInfo: userBktInfo,
|
||||||
|
Type: OUTTraffic,
|
||||||
|
Value: bktStat.OutTraffic,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for op, val := range bktStat.Operations {
|
||||||
|
if val != 0 {
|
||||||
|
result.Requests = append(result.Requests, UserMetricsInfo{
|
||||||
|
UserBucketInfo: userBktInfo,
|
||||||
|
Operation: api.RequestType(op),
|
||||||
|
Requests: val,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
u.users = make(map[string]*userAPIStats)
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
type billingMetrics struct {
|
||||||
|
registry *prometheus.Registry
|
||||||
|
|
||||||
|
desc *prometheus.Desc
|
||||||
|
apiStat UsersAPIStats
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBillingMetrics() *billingMetrics {
|
||||||
|
return &billingMetrics{
|
||||||
|
registry: prometheus.NewRegistry(),
|
||||||
|
desc: prometheus.NewDesc("frostfs_s3_billing", "Billing statistics exposed by FrostFS S3 Gate instance", nil, nil),
|
||||||
|
apiStat: UsersAPIStats{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *billingMetrics) register() {
|
||||||
|
b.registry.MustRegister(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *billingMetrics) unregister() {
|
||||||
|
b.registry.Unregister(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *billingMetrics) Describe(ch chan<- *prometheus.Desc) {
|
||||||
|
ch <- b.desc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *billingMetrics) Collect(ch chan<- prometheus.Metric) {
|
||||||
|
userMetrics := b.apiStat.DumpMetrics()
|
||||||
|
|
||||||
|
for _, value := range userMetrics.Requests {
|
||||||
|
ch <- prometheus.MustNewConstMetric(
|
||||||
|
prometheus.NewDesc(
|
||||||
|
prometheus.BuildFQName(namespace, billingSubsystem, "user_requests"),
|
||||||
|
"",
|
||||||
|
[]string{"user", "bucket", "cid", "operation"}, nil),
|
||||||
|
prometheus.CounterValue,
|
||||||
|
float64(value.Requests),
|
||||||
|
value.User,
|
||||||
|
value.Bucket,
|
||||||
|
value.ContainerID,
|
||||||
|
value.Operation.String(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, value := range userMetrics.Traffic {
|
||||||
|
ch <- prometheus.MustNewConstMetric(
|
||||||
|
prometheus.NewDesc(
|
||||||
|
prometheus.BuildFQName(namespace, billingSubsystem, "user_traffic"),
|
||||||
|
"",
|
||||||
|
[]string{"user", "bucket", "cid", "type"}, nil),
|
||||||
|
prometheus.CounterValue,
|
||||||
|
float64(value.Value),
|
||||||
|
value.User,
|
||||||
|
value.Bucket,
|
||||||
|
value.ContainerID,
|
||||||
|
value.Type.String(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
51
metrics/gate.go
Normal file
51
metrics/gate.go
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
)
|
||||||
|
|
||||||
|
const namespace = "frostfs_s3_gw"
|
||||||
|
|
||||||
|
type StatisticScraper interface {
|
||||||
|
Statistic() pool.Statistic
|
||||||
|
}
|
||||||
|
|
||||||
|
type GateMetrics struct {
|
||||||
|
State stateMetrics
|
||||||
|
Pool poolMetricsCollector
|
||||||
|
Billing *billingMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGateMetrics(scraper StatisticScraper) *GateMetrics {
|
||||||
|
stateMetric := newStateMetrics()
|
||||||
|
stateMetric.register()
|
||||||
|
|
||||||
|
poolMetric := newPoolMetricsCollector(scraper)
|
||||||
|
poolMetric.register()
|
||||||
|
|
||||||
|
billingMetric := newBillingMetrics()
|
||||||
|
billingMetric.register()
|
||||||
|
|
||||||
|
return &GateMetrics{
|
||||||
|
State: *stateMetric,
|
||||||
|
Pool: *poolMetric,
|
||||||
|
Billing: billingMetric,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *GateMetrics) Unregister() {
|
||||||
|
g.State.unregister()
|
||||||
|
prometheus.Unregister(&g.Pool)
|
||||||
|
g.Billing.unregister()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *GateMetrics) Handler() http.Handler {
|
||||||
|
handler := http.NewServeMux()
|
||||||
|
handler.Handle("/", promhttp.Handler())
|
||||||
|
handler.Handle("/metrics/billing", promhttp.HandlerFor(g.Billing.registry, promhttp.HandlerOpts{}))
|
||||||
|
return handler
|
||||||
|
}
|
162
metrics/pool.go
Normal file
162
metrics/pool.go
Normal file
|
@ -0,0 +1,162 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
poolSubsystem = "pool"
|
||||||
|
|
||||||
|
methodGetBalance = "get_balance"
|
||||||
|
methodPutContainer = "put_container"
|
||||||
|
methodGetContainer = "get_container"
|
||||||
|
methodListContainer = "list_container"
|
||||||
|
methodDeleteContainer = "delete_container"
|
||||||
|
methodGetContainerEacl = "get_container_eacl"
|
||||||
|
methodSetContainerEacl = "set_container_eacl"
|
||||||
|
methodEndpointInfo = "endpoint_info"
|
||||||
|
methodNetworkInfo = "network_info"
|
||||||
|
methodPutObject = "put_object"
|
||||||
|
methodDeleteObject = "delete_object"
|
||||||
|
methodGetObject = "get_object"
|
||||||
|
methodHeadObject = "head_object"
|
||||||
|
methodRangeObject = "range_object"
|
||||||
|
methodCreateSession = "create_session"
|
||||||
|
)
|
||||||
|
|
||||||
|
type poolMetricsCollector struct {
|
||||||
|
poolStatScraper StatisticScraper
|
||||||
|
overallErrors prometheus.Gauge
|
||||||
|
overallNodeErrors *prometheus.GaugeVec
|
||||||
|
overallNodeRequests *prometheus.GaugeVec
|
||||||
|
currentErrors *prometheus.GaugeVec
|
||||||
|
requestDuration *prometheus.GaugeVec
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
|
||||||
|
overallErrors := prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: poolSubsystem,
|
||||||
|
Name: "overall_errors",
|
||||||
|
Help: "Total number of errors in pool",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
overallNodeErrors := prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: poolSubsystem,
|
||||||
|
Name: "overall_node_errors",
|
||||||
|
Help: "Total number of errors for connection in pool",
|
||||||
|
},
|
||||||
|
[]string{
|
||||||
|
"node",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
overallNodeRequests := prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: poolSubsystem,
|
||||||
|
Name: "overall_node_requests",
|
||||||
|
Help: "Total number of requests to specific node in pool",
|
||||||
|
},
|
||||||
|
[]string{
|
||||||
|
"node",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
currentErrors := prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: poolSubsystem,
|
||||||
|
Name: "current_errors",
|
||||||
|
Help: "Number of errors on current connections that will be reset after the threshold",
|
||||||
|
},
|
||||||
|
[]string{
|
||||||
|
"node",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
requestsDuration := prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: poolSubsystem,
|
||||||
|
Name: "avg_request_duration",
|
||||||
|
Help: "Average request duration (in milliseconds) for specific method on node in pool",
|
||||||
|
},
|
||||||
|
[]string{
|
||||||
|
"node",
|
||||||
|
"method",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
return &poolMetricsCollector{
|
||||||
|
poolStatScraper: scraper,
|
||||||
|
overallErrors: overallErrors,
|
||||||
|
overallNodeErrors: overallNodeErrors,
|
||||||
|
overallNodeRequests: overallNodeRequests,
|
||||||
|
currentErrors: currentErrors,
|
||||||
|
requestDuration: requestsDuration,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *poolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
|
||||||
|
m.updateStatistic()
|
||||||
|
m.overallErrors.Collect(ch)
|
||||||
|
m.overallNodeErrors.Collect(ch)
|
||||||
|
m.overallNodeRequests.Collect(ch)
|
||||||
|
m.currentErrors.Collect(ch)
|
||||||
|
m.requestDuration.Collect(ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
|
||||||
|
m.overallErrors.Describe(descs)
|
||||||
|
m.overallNodeErrors.Describe(descs)
|
||||||
|
m.overallNodeRequests.Describe(descs)
|
||||||
|
m.currentErrors.Describe(descs)
|
||||||
|
m.requestDuration.Describe(descs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *poolMetricsCollector) register() {
|
||||||
|
prometheus.MustRegister(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *poolMetricsCollector) updateStatistic() {
|
||||||
|
stat := m.poolStatScraper.Statistic()
|
||||||
|
|
||||||
|
m.overallNodeErrors.Reset()
|
||||||
|
m.overallNodeRequests.Reset()
|
||||||
|
m.currentErrors.Reset()
|
||||||
|
m.requestDuration.Reset()
|
||||||
|
|
||||||
|
for _, node := range stat.Nodes() {
|
||||||
|
m.overallNodeErrors.WithLabelValues(node.Address()).Set(float64(node.OverallErrors()))
|
||||||
|
m.overallNodeRequests.WithLabelValues(node.Address()).Set(float64(node.Requests()))
|
||||||
|
|
||||||
|
m.currentErrors.WithLabelValues(node.Address()).Set(float64(node.CurrentErrors()))
|
||||||
|
m.updateRequestsDuration(node)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.overallErrors.Set(float64(stat.OverallErrors()))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *poolMetricsCollector) updateRequestsDuration(node pool.NodeStatistic) {
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodGetBalance).Set(float64(node.AverageGetBalance().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodPutContainer).Set(float64(node.AveragePutContainer().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodGetContainer).Set(float64(node.AverageGetContainer().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodListContainer).Set(float64(node.AverageListContainer().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodDeleteContainer).Set(float64(node.AverageDeleteContainer().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodGetContainerEacl).Set(float64(node.AverageGetContainerEACL().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodSetContainerEacl).Set(float64(node.AverageSetContainerEACL().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodEndpointInfo).Set(float64(node.AverageEndpointInfo().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodNetworkInfo).Set(float64(node.AverageNetworkInfo().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodPutObject).Set(float64(node.AveragePutObject().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodDeleteObject).Set(float64(node.AverageDeleteObject().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodGetObject).Set(float64(node.AverageGetObject().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodHeadObject).Set(float64(node.AverageHeadObject().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodRangeObject).Set(float64(node.AverageRangeObject().Milliseconds()))
|
||||||
|
m.requestDuration.WithLabelValues(node.Address(), methodCreateSession).Set(float64(node.AverageCreateSession().Milliseconds()))
|
||||||
|
}
|
32
metrics/state.go
Normal file
32
metrics/state.go
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import "github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
|
const stateSubsystem = "state"
|
||||||
|
|
||||||
|
type stateMetrics struct {
|
||||||
|
healthCheck prometheus.Gauge
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStateMetrics() *stateMetrics {
|
||||||
|
return &stateMetrics{
|
||||||
|
healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: stateSubsystem,
|
||||||
|
Name: "health",
|
||||||
|
Help: "Current S3 gateway state",
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m stateMetrics) register() {
|
||||||
|
prometheus.MustRegister(m.healthCheck)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m stateMetrics) unregister() {
|
||||||
|
prometheus.Unregister(m.healthCheck)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m stateMetrics) SetHealth(s int32) {
|
||||||
|
m.healthCheck.Set(float64(s))
|
||||||
|
}
|
Loading…
Reference in a new issue