diff --git a/api/metrics.go b/api/metrics.go index eee2fe0..24cf124 100644 --- a/api/metrics.go +++ b/api/metrics.go @@ -14,25 +14,6 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -type TrafficType int - -const ( - UnknownTraffic TrafficType = iota - INTraffic TrafficType = iota - OUTTraffic TrafficType = iota -) - -func (t TrafficType) String() string { - switch t { - case 1: - return "IN" - case 2: - return "OUT" - default: - return "Unknown" - } -} - type RequestType int const ( @@ -90,8 +71,6 @@ func RequestTypeFromAPI(api string) RequestType { } } -type OperationList [6]int - type ( // HTTPAPIStats holds statistics information about // the API given in the requests. @@ -100,54 +79,13 @@ type ( sync.RWMutex } - UsersAPIStats struct { - users map[string]*userAPIStats - sync.RWMutex - } - - bucketKey struct { - name string - cid string - } - - bucketStat struct { - Operations OperationList - InTraffic uint64 - OutTraffic uint64 - } - - userAPIStats struct { - buckets map[bucketKey]bucketStat - user string - } - - UserBucketInfo struct { - User string - Bucket string - ContainerID string - } - - UserMetricsInfo struct { - UserBucketInfo - Operation RequestType - Requests int - } - - UserTrafficMetricsInfo struct { - UserBucketInfo - Type TrafficType - Value uint64 - } - - UserMetrics struct { - Requests []UserMetricsInfo - Traffic []UserTrafficMetricsInfo + UsersStat interface { + Update(user, bucket, cnrID string, reqType RequestType, in, out uint64) } // HTTPStats holds statistics information about // HTTP requests made by all clients. HTTPStats struct { - usersS3Requests UsersAPIStats currentS3Requests HTTPAPIStats totalS3Requests HTTPAPIStats totalS3Errors HTTPAPIStats @@ -229,45 +167,11 @@ func collectHTTPMetrics(ch chan<- prometheus.Metric) { } } -func collectUserMetrics(ch chan<- prometheus.Metric) { - userMetrics := httpStatsMetric.usersS3Requests.DumpMetrics() - - for _, value := range userMetrics.Requests { - ch <- prometheus.MustNewConstMetric( - prometheus.NewDesc( - prometheus.BuildFQName("frostfs_s3", "user_requests", "count"), - "", - []string{"user", "bucket", "cid", "operation"}, nil), - prometheus.CounterValue, - float64(value.Requests), - value.User, - value.Bucket, - value.ContainerID, - value.Operation.String(), - ) - } - - for _, value := range userMetrics.Traffic { - ch <- prometheus.MustNewConstMetric( - prometheus.NewDesc( - prometheus.BuildFQName("frostfs_s3", "user_traffic", "bytes"), - "", - []string{"user", "bucket", "cid", "type"}, nil), - prometheus.CounterValue, - float64(value.Value), - value.User, - value.Bucket, - value.ContainerID, - value.Type.String(), - ) - } -} - // CIDResolveFunc is a func to resolve CID in Stats handler. type CIDResolveFunc func(ctx context.Context, reqInfo *ReqInfo) (cnrID string) // Stats is a handler that update metrics. -func Stats(f http.HandlerFunc, resolveCID CIDResolveFunc) http.HandlerFunc { +func Stats(f http.HandlerFunc, resolveCID CIDResolveFunc, usersStat UsersStat) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { reqInfo := GetReqInfo(r.Context()) @@ -293,7 +197,7 @@ func Stats(f http.HandlerFunc, resolveCID CIDResolveFunc) http.HandlerFunc { user := resolveUser(r.Context()) cnrID := resolveCID(r.Context(), reqInfo) - httpStatsMetric.usersS3Requests.Update(user, reqInfo.BucketName, cnrID, RequestTypeFromAPI(reqInfo.API), in.countBytes, out.countBytes) + usersStat.Update(user, reqInfo.BucketName, cnrID, RequestTypeFromAPI(reqInfo.API), in.countBytes, out.countBytes) code := statsWriter.statusCode // A successful request has a 2xx response code @@ -359,84 +263,6 @@ func (stats *HTTPAPIStats) Load() map[string]int { return apiStats } -func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType RequestType, in, out uint64) { - u.Lock() - defer u.Unlock() - - usersStat := u.users[user] - if usersStat == nil { - if u.users == nil { - u.users = make(map[string]*userAPIStats) - } - usersStat = &userAPIStats{ - buckets: make(map[bucketKey]bucketStat, 1), - user: user, - } - u.users[user] = usersStat - } - - key := bucketKey{ - name: bucket, - cid: cnrID, - } - - bktStat := usersStat.buckets[key] - bktStat.Operations[reqType]++ - bktStat.InTraffic += in - bktStat.OutTraffic += out - usersStat.buckets[key] = bktStat -} - -func (u *UsersAPIStats) DumpMetrics() UserMetrics { - u.Lock() - defer u.Unlock() - - result := UserMetrics{ - Requests: make([]UserMetricsInfo, 0, len(u.users)), - Traffic: make([]UserTrafficMetricsInfo, 0, len(u.users)), - } - - for user, userStat := range u.users { - for key, bktStat := range userStat.buckets { - userBktInfo := UserBucketInfo{ - User: user, - Bucket: key.name, - ContainerID: key.cid, - } - - if bktStat.InTraffic != 0 { - result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{ - UserBucketInfo: userBktInfo, - Type: INTraffic, - Value: bktStat.InTraffic, - }) - } - - if bktStat.OutTraffic != 0 { - result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{ - UserBucketInfo: userBktInfo, - Type: OUTTraffic, - Value: bktStat.OutTraffic, - }) - } - - for op, val := range bktStat.Operations { - if val != 0 { - result.Requests = append(result.Requests, UserMetricsInfo{ - UserBucketInfo: userBktInfo, - Operation: RequestType(op), - Requests: val, - }) - } - } - } - } - - u.users = make(map[string]*userAPIStats) - - return result -} - func (st *HTTPStats) getInputBytes() uint64 { return atomic.LoadUint64(&st.totalInputBytes) } diff --git a/api/metrics_collector.go b/api/metrics_collector.go index eac7c5b..2b9d31a 100644 --- a/api/metrics_collector.go +++ b/api/metrics_collector.go @@ -64,6 +64,5 @@ func (s *stats) Collect(ch chan<- prometheus.Metric) { // connect collectors collectHTTPMetrics(ch) - collectUserMetrics(ch) collectNetworkMetrics(ch) } diff --git a/api/router.go b/api/router.go index 793755f..bf493fc 100644 --- a/api/router.go +++ b/api/router.go @@ -151,9 +151,9 @@ func appendCORS(handler Handler) mux.MiddlewareFunc { type BucketResolveFunc func(ctx context.Context, bucket string) (*data.BucketInfo, error) // metricsMiddleware wraps http handler for api with basic statistics collection. -func metricsMiddleware(log *zap.Logger, resolveBucket BucketResolveFunc) mux.MiddlewareFunc { +func metricsMiddleware(log *zap.Logger, resolveBucket BucketResolveFunc, usersStat UsersStat) mux.MiddlewareFunc { return func(h http.Handler) http.Handler { - return Stats(h.ServeHTTP, resolveCID(log, resolveBucket)) + return Stats(h.ServeHTTP, resolveCID(log, resolveBucket), usersStat) } } @@ -223,10 +223,10 @@ func setErrorAPI(apiName string, h http.Handler) http.Handler { } // attachErrorHandler set NotFoundHandler and MethodNotAllowedHandler for mux.Router. -func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth.Center) { +func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth.Center, usersStat UsersStat) { middlewares := []mux.MiddlewareFunc{ AuthMiddleware(log, center), - metricsMiddleware(log, h.ResolveBucket), + metricsMiddleware(log, h.ResolveBucket, usersStat), } var errorHandler http.Handler = http.HandlerFunc(errorResponseHandler) @@ -241,7 +241,7 @@ func attachErrorHandler(api *mux.Router, log *zap.Logger, h Handler, center auth // Attach adds S3 API handlers from h to r for domains with m client limit using // center authentication and log logger. -func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center auth.Center, log *zap.Logger) { +func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center auth.Center, log *zap.Logger, usersStat UsersStat) { api := r.PathPrefix(SlashSeparator).Subrouter() api.Use( @@ -251,13 +251,13 @@ func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center aut // Attach user authentication for all S3 routes. AuthMiddleware(log, center), - metricsMiddleware(log, h.ResolveBucket), + metricsMiddleware(log, h.ResolveBucket, usersStat), // -- logging error requests logSuccessResponse(log), ) - attachErrorHandler(api, log, h, center) + attachErrorHandler(api, log, h, center, usersStat) buckets := make([]*mux.Router, 0, len(domains)+1) buckets = append(buckets, api.PathPrefix("/{bucket}").Subrouter()) diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 1787d95..b1c8c74 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -23,6 +23,7 @@ import ( "github.com/TrueCloudLab/frostfs-s3-gw/internal/frostfs" "github.com/TrueCloudLab/frostfs-s3-gw/internal/version" "github.com/TrueCloudLab/frostfs-s3-gw/internal/wallet" + "github.com/TrueCloudLab/frostfs-s3-gw/metrics" "github.com/TrueCloudLab/frostfs-sdk-go/netmap" "github.com/TrueCloudLab/frostfs-sdk-go/pool" "github.com/gorilla/mux" @@ -45,7 +46,7 @@ type ( servers []Server - metrics *appMetrics + metrics *metrics.AppMetrics bucketResolver *resolver.BucketResolver services []*Service settings *appSettings @@ -65,18 +66,6 @@ type ( lvl zap.AtomicLevel } - appMetrics struct { - logger *zap.Logger - provider GateMetricsCollector - mu sync.RWMutex - enabled bool - } - - GateMetricsCollector interface { - SetHealth(int32) - Unregister() - } - placementPolicy struct { mu sync.RWMutex defaultPolicy netmap.PlacementPolicy @@ -183,8 +172,7 @@ func (a *App) initAPI(ctx context.Context) { } func (a *App) initMetrics() { - gateMetricsProvider := newGateMetrics(frostfs.NewPoolStatistic(a.pool)) - a.metrics = newAppMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled)) + a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled)) } func (a *App) initResolver() { @@ -344,47 +332,6 @@ func (p *placementPolicy) update(defaultPolicy string, regionPolicyFilepath stri return nil } -func newAppMetrics(logger *zap.Logger, provider GateMetricsCollector, enabled bool) *appMetrics { - if !enabled { - logger.Warn("metrics are disabled") - } - return &appMetrics{ - logger: logger, - provider: provider, - } -} - -func (m *appMetrics) SetEnabled(enabled bool) { - if !enabled { - m.logger.Warn("metrics are disabled") - } - - m.mu.Lock() - m.enabled = enabled - m.mu.Unlock() -} - -func (m *appMetrics) SetHealth(status int32) { - m.mu.RLock() - if !m.enabled { - m.mu.RUnlock() - return - } - m.mu.RUnlock() - - m.provider.SetHealth(status) -} - -func (m *appMetrics) Shutdown() { - m.mu.Lock() - if m.enabled { - m.provider.SetHealth(0) - m.enabled = false - } - m.provider.Unregister() - m.mu.Unlock() -} - func remove(list []string, element string) []string { for i, item := range list { if item == element { @@ -422,7 +369,7 @@ func (a *App) Serve(ctx context.Context) { domains := a.cfg.GetStringSlice(cfgListenDomains) a.log.Info("fetch domains, prepare to use API", zap.Strings("domains", domains)) router := mux.NewRouter().SkipClean(true).UseEncodedPath() - api.Attach(router, domains, a.maxClients, a.api, a.ctr, a.log) + api.Attach(router, domains, a.maxClients, a.api, a.ctr, a.log, a.metrics) // Use mux.Router as http.Handler srv := new(http.Server) @@ -519,7 +466,7 @@ func (a *App) startServices() { a.services = append(a.services, pprofService) go pprofService.Start() - prometheusService := NewPrometheusService(a.cfg, a.log) + prometheusService := NewPrometheusService(a.cfg, a.log, a.metrics.Handler()) a.services = append(a.services, prometheusService) go prometheusService.Start() } diff --git a/cmd/s3-gw/app_metrics.go b/cmd/s3-gw/app_metrics.go index 42c1ad2..b118e65 100644 --- a/cmd/s3-gw/app_metrics.go +++ b/cmd/s3-gw/app_metrics.go @@ -3,227 +3,12 @@ package main import ( "net/http" - "github.com/TrueCloudLab/frostfs-sdk-go/pool" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/viper" "go.uber.org/zap" ) -const ( - namespace = "frostfs_s3_gw" - stateSubsystem = "state" - poolSubsystem = "pool" - - methodGetBalance = "get_balance" - methodPutContainer = "put_container" - methodGetContainer = "get_container" - methodListContainer = "list_container" - methodDeleteContainer = "delete_container" - methodGetContainerEacl = "get_container_eacl" - methodSetContainerEacl = "set_container_eacl" - methodEndpointInfo = "endpoint_info" - methodNetworkInfo = "network_info" - methodPutObject = "put_object" - methodDeleteObject = "delete_object" - methodGetObject = "get_object" - methodHeadObject = "head_object" - methodRangeObject = "range_object" - methodCreateSession = "create_session" -) - -type StatisticScraper interface { - Statistic() pool.Statistic -} - -type GateMetrics struct { - stateMetrics - poolMetricsCollector -} - -type stateMetrics struct { - healthCheck prometheus.Gauge -} - -type poolMetricsCollector struct { - poolStatScraper StatisticScraper - overallErrors prometheus.Gauge - overallNodeErrors *prometheus.GaugeVec - overallNodeRequests *prometheus.GaugeVec - currentErrors *prometheus.GaugeVec - requestDuration *prometheus.GaugeVec -} - -func newGateMetrics(scraper StatisticScraper) *GateMetrics { - stateMetric := newStateMetrics() - stateMetric.register() - - poolMetric := newPoolMetricsCollector(scraper) - poolMetric.register() - - return &GateMetrics{ - stateMetrics: *stateMetric, - poolMetricsCollector: *poolMetric, - } -} - -func (g *GateMetrics) Unregister() { - g.stateMetrics.unregister() - prometheus.Unregister(&g.poolMetricsCollector) -} - -func newStateMetrics() *stateMetrics { - return &stateMetrics{ - healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: stateSubsystem, - Name: "health", - Help: "Current S3 gateway state", - }), - } -} - -func (m stateMetrics) register() { - prometheus.MustRegister(m.healthCheck) -} - -func (m stateMetrics) unregister() { - prometheus.Unregister(m.healthCheck) -} - -func (m stateMetrics) SetHealth(s int32) { - m.healthCheck.Set(float64(s)) -} - -func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector { - overallErrors := prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: poolSubsystem, - Name: "overall_errors", - Help: "Total number of errors in pool", - }, - ) - - overallNodeErrors := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: poolSubsystem, - Name: "overall_node_errors", - Help: "Total number of errors for connection in pool", - }, - []string{ - "node", - }, - ) - - overallNodeRequests := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: poolSubsystem, - Name: "overall_node_requests", - Help: "Total number of requests to specific node in pool", - }, - []string{ - "node", - }, - ) - - currentErrors := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: poolSubsystem, - Name: "current_errors", - Help: "Number of errors on current connections that will be reset after the threshold", - }, - []string{ - "node", - }, - ) - - requestsDuration := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: poolSubsystem, - Name: "avg_request_duration", - Help: "Average request duration (in milliseconds) for specific method on node in pool", - }, - []string{ - "node", - "method", - }, - ) - - return &poolMetricsCollector{ - poolStatScraper: scraper, - overallErrors: overallErrors, - overallNodeErrors: overallNodeErrors, - overallNodeRequests: overallNodeRequests, - currentErrors: currentErrors, - requestDuration: requestsDuration, - } -} - -func (m *poolMetricsCollector) Collect(ch chan<- prometheus.Metric) { - m.updateStatistic() - m.overallErrors.Collect(ch) - m.overallNodeErrors.Collect(ch) - m.overallNodeRequests.Collect(ch) - m.currentErrors.Collect(ch) - m.requestDuration.Collect(ch) -} - -func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) { - m.overallErrors.Describe(descs) - m.overallNodeErrors.Describe(descs) - m.overallNodeRequests.Describe(descs) - m.currentErrors.Describe(descs) - m.requestDuration.Describe(descs) -} - -func (m *poolMetricsCollector) register() { - prometheus.MustRegister(m) -} - -func (m *poolMetricsCollector) updateStatistic() { - stat := m.poolStatScraper.Statistic() - - m.overallNodeErrors.Reset() - m.overallNodeRequests.Reset() - m.currentErrors.Reset() - m.requestDuration.Reset() - - for _, node := range stat.Nodes() { - m.overallNodeErrors.WithLabelValues(node.Address()).Set(float64(node.OverallErrors())) - m.overallNodeRequests.WithLabelValues(node.Address()).Set(float64(node.Requests())) - - m.currentErrors.WithLabelValues(node.Address()).Set(float64(node.CurrentErrors())) - m.updateRequestsDuration(node) - } - - m.overallErrors.Set(float64(stat.OverallErrors())) -} - -func (m *poolMetricsCollector) updateRequestsDuration(node pool.NodeStatistic) { - m.requestDuration.WithLabelValues(node.Address(), methodGetBalance).Set(float64(node.AverageGetBalance().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodPutContainer).Set(float64(node.AveragePutContainer().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodGetContainer).Set(float64(node.AverageGetContainer().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodListContainer).Set(float64(node.AverageListContainer().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodDeleteContainer).Set(float64(node.AverageDeleteContainer().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodGetContainerEacl).Set(float64(node.AverageGetContainerEACL().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodSetContainerEacl).Set(float64(node.AverageSetContainerEACL().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodEndpointInfo).Set(float64(node.AverageEndpointInfo().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodNetworkInfo).Set(float64(node.AverageNetworkInfo().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodPutObject).Set(float64(node.AveragePutObject().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodDeleteObject).Set(float64(node.AverageDeleteObject().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodGetObject).Set(float64(node.AverageGetObject().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodHeadObject).Set(float64(node.AverageHeadObject().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodRangeObject).Set(float64(node.AverageRangeObject().Milliseconds())) - m.requestDuration.WithLabelValues(node.Address(), methodCreateSession).Set(float64(node.AverageCreateSession().Milliseconds())) -} - // NewPrometheusService creates a new service for gathering prometheus metrics. -func NewPrometheusService(v *viper.Viper, log *zap.Logger) *Service { +func NewPrometheusService(v *viper.Viper, log *zap.Logger, handler http.Handler) *Service { if log == nil { return nil } @@ -231,7 +16,7 @@ func NewPrometheusService(v *viper.Viper, log *zap.Logger) *Service { return &Service{ Server: &http.Server{ Addr: v.GetString(cfgPrometheusAddress), - Handler: promhttp.Handler(), + Handler: handler, }, enabled: v.GetBool(cfgPrometheusEnabled), serviceType: "Prometheus", diff --git a/metrics/app.go b/metrics/app.go new file mode 100644 index 0000000..3a3decc --- /dev/null +++ b/metrics/app.go @@ -0,0 +1,74 @@ +package metrics + +import ( + "net/http" + "sync" + + "github.com/TrueCloudLab/frostfs-s3-gw/api" + "github.com/TrueCloudLab/frostfs-s3-gw/internal/frostfs" + "go.uber.org/zap" +) + +type AppMetrics struct { + logger *zap.Logger + gate *GateMetrics + mu sync.RWMutex + enabled bool +} + +func NewAppMetrics(logger *zap.Logger, poolStatistics *frostfs.PoolStatistic, enabled bool) *AppMetrics { + if !enabled { + logger.Warn("metrics are disabled") + } + return &AppMetrics{ + logger: logger, + gate: NewGateMetrics(poolStatistics), + enabled: enabled, + } +} + +func (m *AppMetrics) SetEnabled(enabled bool) { + if !enabled { + m.logger.Warn("metrics are disabled") + } + + m.mu.Lock() + m.enabled = enabled + m.mu.Unlock() +} + +func (m *AppMetrics) SetHealth(status int32) { + if !m.isEnabled() { + return + } + + m.gate.State.SetHealth(status) +} + +func (m *AppMetrics) Shutdown() { + m.mu.Lock() + if m.enabled { + m.gate.State.SetHealth(0) + m.enabled = false + } + m.gate.Unregister() + m.mu.Unlock() +} + +func (m *AppMetrics) isEnabled() bool { + m.mu.RLock() + defer m.mu.RUnlock() + return m.enabled +} + +func (m *AppMetrics) Handler() http.Handler { + return m.gate.Handler() +} + +func (m *AppMetrics) Update(user, bucket, cnrID string, reqType api.RequestType, in, out uint64) { + if !m.isEnabled() { + return + } + + m.gate.Billing.apiStat.Update(user, bucket, cnrID, reqType, in, out) +} diff --git a/metrics/billing.go b/metrics/billing.go new file mode 100644 index 0000000..bb095b9 --- /dev/null +++ b/metrics/billing.go @@ -0,0 +1,216 @@ +package metrics + +import ( + "sync" + + "github.com/TrueCloudLab/frostfs-s3-gw/api" + "github.com/prometheus/client_golang/prometheus" +) + +const billingSubsystem = "billing" + +type TrafficType int + +const ( + UnknownTraffic TrafficType = iota + INTraffic TrafficType = iota + OUTTraffic TrafficType = iota +) + +func (t TrafficType) String() string { + switch t { + case 1: + return "IN" + case 2: + return "OUT" + default: + return "Unknown" + } +} + +type ( + OperationList [6]int + + UsersAPIStats struct { + users map[string]*userAPIStats + sync.RWMutex + } + + bucketKey struct { + name string + cid string + } + + bucketStat struct { + Operations OperationList + InTraffic uint64 + OutTraffic uint64 + } + + userAPIStats struct { + buckets map[bucketKey]bucketStat + user string + } + + UserBucketInfo struct { + User string + Bucket string + ContainerID string + } + + UserMetricsInfo struct { + UserBucketInfo + Operation api.RequestType + Requests int + } + + UserTrafficMetricsInfo struct { + UserBucketInfo + Type TrafficType + Value uint64 + } + + UserMetrics struct { + Requests []UserMetricsInfo + Traffic []UserTrafficMetricsInfo + } +) + +func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType api.RequestType, in, out uint64) { + u.Lock() + defer u.Unlock() + + usersStat := u.users[user] + if usersStat == nil { + if u.users == nil { + u.users = make(map[string]*userAPIStats) + } + usersStat = &userAPIStats{ + buckets: make(map[bucketKey]bucketStat, 1), + user: user, + } + u.users[user] = usersStat + } + + key := bucketKey{ + name: bucket, + cid: cnrID, + } + + bktStat := usersStat.buckets[key] + bktStat.Operations[reqType]++ + bktStat.InTraffic += in + bktStat.OutTraffic += out + usersStat.buckets[key] = bktStat +} + +func (u *UsersAPIStats) DumpMetrics() UserMetrics { + u.Lock() + defer u.Unlock() + + result := UserMetrics{ + Requests: make([]UserMetricsInfo, 0, len(u.users)), + Traffic: make([]UserTrafficMetricsInfo, 0, len(u.users)), + } + + for user, userStat := range u.users { + for key, bktStat := range userStat.buckets { + userBktInfo := UserBucketInfo{ + User: user, + Bucket: key.name, + ContainerID: key.cid, + } + + if bktStat.InTraffic != 0 { + result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{ + UserBucketInfo: userBktInfo, + Type: INTraffic, + Value: bktStat.InTraffic, + }) + } + + if bktStat.OutTraffic != 0 { + result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{ + UserBucketInfo: userBktInfo, + Type: OUTTraffic, + Value: bktStat.OutTraffic, + }) + } + + for op, val := range bktStat.Operations { + if val != 0 { + result.Requests = append(result.Requests, UserMetricsInfo{ + UserBucketInfo: userBktInfo, + Operation: api.RequestType(op), + Requests: val, + }) + } + } + } + } + + u.users = make(map[string]*userAPIStats) + + return result +} + +type billingMetrics struct { + registry *prometheus.Registry + + desc *prometheus.Desc + apiStat UsersAPIStats +} + +func newBillingMetrics() *billingMetrics { + return &billingMetrics{ + registry: prometheus.NewRegistry(), + desc: prometheus.NewDesc("frostfs_s3_billing", "Billing statistics exposed by FrostFS S3 Gate instance", nil, nil), + apiStat: UsersAPIStats{}, + } +} + +func (b *billingMetrics) register() { + b.registry.MustRegister(b) +} + +func (b *billingMetrics) unregister() { + b.registry.Unregister(b) +} + +func (b *billingMetrics) Describe(ch chan<- *prometheus.Desc) { + ch <- b.desc +} + +func (b *billingMetrics) Collect(ch chan<- prometheus.Metric) { + userMetrics := b.apiStat.DumpMetrics() + + for _, value := range userMetrics.Requests { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(namespace, billingSubsystem, "user_requests"), + "", + []string{"user", "bucket", "cid", "operation"}, nil), + prometheus.CounterValue, + float64(value.Requests), + value.User, + value.Bucket, + value.ContainerID, + value.Operation.String(), + ) + } + + for _, value := range userMetrics.Traffic { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(namespace, billingSubsystem, "user_traffic"), + "", + []string{"user", "bucket", "cid", "type"}, nil), + prometheus.CounterValue, + float64(value.Value), + value.User, + value.Bucket, + value.ContainerID, + value.Type.String(), + ) + } +} diff --git a/metrics/gate.go b/metrics/gate.go new file mode 100644 index 0000000..15b1cf7 --- /dev/null +++ b/metrics/gate.go @@ -0,0 +1,51 @@ +package metrics + +import ( + "net/http" + + "github.com/TrueCloudLab/frostfs-sdk-go/pool" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const namespace = "frostfs_s3_gw" + +type StatisticScraper interface { + Statistic() pool.Statistic +} + +type GateMetrics struct { + State stateMetrics + Pool poolMetricsCollector + Billing *billingMetrics +} + +func NewGateMetrics(scraper StatisticScraper) *GateMetrics { + stateMetric := newStateMetrics() + stateMetric.register() + + poolMetric := newPoolMetricsCollector(scraper) + poolMetric.register() + + billingMetric := newBillingMetrics() + billingMetric.register() + + return &GateMetrics{ + State: *stateMetric, + Pool: *poolMetric, + Billing: billingMetric, + } +} + +func (g *GateMetrics) Unregister() { + g.State.unregister() + prometheus.Unregister(&g.Pool) + g.Billing.unregister() +} + +func (g *GateMetrics) Handler() http.Handler { + handler := http.NewServeMux() + handler.Handle("/", promhttp.Handler()) + handler.Handle("/metrics/billing", promhttp.HandlerFor(g.Billing.registry, promhttp.HandlerOpts{})) + return handler +} diff --git a/metrics/pool.go b/metrics/pool.go new file mode 100644 index 0000000..bada57c --- /dev/null +++ b/metrics/pool.go @@ -0,0 +1,162 @@ +package metrics + +import ( + "github.com/TrueCloudLab/frostfs-sdk-go/pool" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + poolSubsystem = "pool" + + methodGetBalance = "get_balance" + methodPutContainer = "put_container" + methodGetContainer = "get_container" + methodListContainer = "list_container" + methodDeleteContainer = "delete_container" + methodGetContainerEacl = "get_container_eacl" + methodSetContainerEacl = "set_container_eacl" + methodEndpointInfo = "endpoint_info" + methodNetworkInfo = "network_info" + methodPutObject = "put_object" + methodDeleteObject = "delete_object" + methodGetObject = "get_object" + methodHeadObject = "head_object" + methodRangeObject = "range_object" + methodCreateSession = "create_session" +) + +type poolMetricsCollector struct { + poolStatScraper StatisticScraper + overallErrors prometheus.Gauge + overallNodeErrors *prometheus.GaugeVec + overallNodeRequests *prometheus.GaugeVec + currentErrors *prometheus.GaugeVec + requestDuration *prometheus.GaugeVec +} + +func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector { + overallErrors := prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: poolSubsystem, + Name: "overall_errors", + Help: "Total number of errors in pool", + }, + ) + + overallNodeErrors := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: poolSubsystem, + Name: "overall_node_errors", + Help: "Total number of errors for connection in pool", + }, + []string{ + "node", + }, + ) + + overallNodeRequests := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: poolSubsystem, + Name: "overall_node_requests", + Help: "Total number of requests to specific node in pool", + }, + []string{ + "node", + }, + ) + + currentErrors := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: poolSubsystem, + Name: "current_errors", + Help: "Number of errors on current connections that will be reset after the threshold", + }, + []string{ + "node", + }, + ) + + requestsDuration := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: poolSubsystem, + Name: "avg_request_duration", + Help: "Average request duration (in milliseconds) for specific method on node in pool", + }, + []string{ + "node", + "method", + }, + ) + + return &poolMetricsCollector{ + poolStatScraper: scraper, + overallErrors: overallErrors, + overallNodeErrors: overallNodeErrors, + overallNodeRequests: overallNodeRequests, + currentErrors: currentErrors, + requestDuration: requestsDuration, + } +} + +func (m *poolMetricsCollector) Collect(ch chan<- prometheus.Metric) { + m.updateStatistic() + m.overallErrors.Collect(ch) + m.overallNodeErrors.Collect(ch) + m.overallNodeRequests.Collect(ch) + m.currentErrors.Collect(ch) + m.requestDuration.Collect(ch) +} + +func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) { + m.overallErrors.Describe(descs) + m.overallNodeErrors.Describe(descs) + m.overallNodeRequests.Describe(descs) + m.currentErrors.Describe(descs) + m.requestDuration.Describe(descs) +} + +func (m *poolMetricsCollector) register() { + prometheus.MustRegister(m) +} + +func (m *poolMetricsCollector) updateStatistic() { + stat := m.poolStatScraper.Statistic() + + m.overallNodeErrors.Reset() + m.overallNodeRequests.Reset() + m.currentErrors.Reset() + m.requestDuration.Reset() + + for _, node := range stat.Nodes() { + m.overallNodeErrors.WithLabelValues(node.Address()).Set(float64(node.OverallErrors())) + m.overallNodeRequests.WithLabelValues(node.Address()).Set(float64(node.Requests())) + + m.currentErrors.WithLabelValues(node.Address()).Set(float64(node.CurrentErrors())) + m.updateRequestsDuration(node) + } + + m.overallErrors.Set(float64(stat.OverallErrors())) +} + +func (m *poolMetricsCollector) updateRequestsDuration(node pool.NodeStatistic) { + m.requestDuration.WithLabelValues(node.Address(), methodGetBalance).Set(float64(node.AverageGetBalance().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodPutContainer).Set(float64(node.AveragePutContainer().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodGetContainer).Set(float64(node.AverageGetContainer().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodListContainer).Set(float64(node.AverageListContainer().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodDeleteContainer).Set(float64(node.AverageDeleteContainer().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodGetContainerEacl).Set(float64(node.AverageGetContainerEACL().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodSetContainerEacl).Set(float64(node.AverageSetContainerEACL().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodEndpointInfo).Set(float64(node.AverageEndpointInfo().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodNetworkInfo).Set(float64(node.AverageNetworkInfo().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodPutObject).Set(float64(node.AveragePutObject().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodDeleteObject).Set(float64(node.AverageDeleteObject().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodGetObject).Set(float64(node.AverageGetObject().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodHeadObject).Set(float64(node.AverageHeadObject().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodRangeObject).Set(float64(node.AverageRangeObject().Milliseconds())) + m.requestDuration.WithLabelValues(node.Address(), methodCreateSession).Set(float64(node.AverageCreateSession().Milliseconds())) +} diff --git a/metrics/state.go b/metrics/state.go new file mode 100644 index 0000000..9d28c5b --- /dev/null +++ b/metrics/state.go @@ -0,0 +1,32 @@ +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +const stateSubsystem = "state" + +type stateMetrics struct { + healthCheck prometheus.Gauge +} + +func newStateMetrics() *stateMetrics { + return &stateMetrics{ + healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: stateSubsystem, + Name: "health", + Help: "Current S3 gateway state", + }), + } +} + +func (m stateMetrics) register() { + prometheus.MustRegister(m.healthCheck) +} + +func (m stateMetrics) unregister() { + prometheus.Unregister(m.healthCheck) +} + +func (m stateMetrics) SetHealth(s int32) { + m.healthCheck.Set(float64(s)) +}