[TrueCloudLab#5] Add traffic metrics per user

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2022-12-26 16:13:24 +03:00 committed by Alex Vanin
parent fc5c09c084
commit c5570e661d
3 changed files with 116 additions and 30 deletions

View file

@ -13,6 +13,25 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
type TrafficType int
const (
UnknownTraffic TrafficType = iota
INTraffic TrafficType = iota
OUTTraffic TrafficType = iota
)
func (t TrafficType) String() string {
switch t {
case 1:
return "IN"
case 2:
return "OUT"
default:
return "Unknown"
}
}
type RequestType int type RequestType int
const ( const (
@ -90,11 +109,40 @@ type (
cid string cid string
} }
bucketStat struct {
Operations OperationList
InTraffic uint64
OutTraffic uint64
}
userAPIStats struct { userAPIStats struct {
buckets map[bucketKey]OperationList buckets map[bucketKey]bucketStat
user string user string
} }
UserBucketInfo struct {
User string
Bucket string
ContainerID string
}
UserMetricsInfo struct {
UserBucketInfo
Operation RequestType
Requests int
}
UserTrafficMetricsInfo struct {
UserBucketInfo
Type TrafficType
Value uint64
}
UserMetrics struct {
Requests []UserMetricsInfo
Traffic []UserTrafficMetricsInfo
}
// HTTPStats holds statistics information about // HTTPStats holds statistics information about
// HTTP requests made by all clients. // HTTP requests made by all clients.
HTTPStats struct { HTTPStats struct {
@ -178,8 +226,12 @@ func collectHTTPMetrics(ch chan<- prometheus.Metric) {
api, api,
) )
} }
}
for _, value := range httpStatsMetric.usersS3Requests.DumpMetrics() { func collectUserMetrics(ch chan<- prometheus.Metric) {
userMetrics := httpStatsMetric.usersS3Requests.DumpMetrics()
for _, value := range userMetrics.Requests {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc( prometheus.NewDesc(
prometheus.BuildFQName("frostfs_s3", "user_requests", "count"), prometheus.BuildFQName("frostfs_s3", "user_requests", "count"),
@ -190,7 +242,22 @@ func collectHTTPMetrics(ch chan<- prometheus.Metric) {
value.User, value.User,
value.Bucket, value.Bucket,
value.ContainerID, value.ContainerID,
value.Operation, value.Operation.String(),
)
}
for _, value := range userMetrics.Traffic {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("frostfs_s3", "user_traffic", "bytes"),
"",
[]string{"user", "bucket", "cid", "type"}, nil),
prometheus.CounterValue,
float64(value.Value),
value.User,
value.Bucket,
value.ContainerID,
value.Type.String(),
) )
} }
} }
@ -218,7 +285,7 @@ func APIStats(api string, f http.HandlerFunc) http.HandlerFunc {
// simply for the fact that it is not human readable. // simply for the fact that it is not human readable.
durationSecs := time.Since(statsWriter.startTime).Seconds() durationSecs := time.Since(statsWriter.startTime).Seconds()
httpStatsMetric.updateStats(api, statsWriter, r, durationSecs) httpStatsMetric.updateStats(api, statsWriter, r, durationSecs, in.countBytes, out.countBytes)
atomic.AddUint64(&httpStatsMetric.totalInputBytes, in.countBytes) atomic.AddUint64(&httpStatsMetric.totalInputBytes, in.countBytes)
atomic.AddUint64(&httpStatsMetric.totalOutputBytes, out.countBytes) atomic.AddUint64(&httpStatsMetric.totalOutputBytes, out.countBytes)
@ -261,7 +328,7 @@ func (stats *HTTPAPIStats) Load() map[string]int {
return apiStats return apiStats
} }
func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType RequestType) { func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType RequestType, in, out uint64) {
u.Lock() u.Lock()
defer u.Unlock() defer u.Unlock()
@ -271,7 +338,7 @@ func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType RequestType)
u.users = make(map[string]*userAPIStats) u.users = make(map[string]*userAPIStats)
} }
usersStat = &userAPIStats{ usersStat = &userAPIStats{
buckets: make(map[bucketKey]OperationList, 1), buckets: make(map[bucketKey]bucketStat, 1),
user: user, user: user,
} }
u.users[user] = usersStat u.users[user] = usersStat
@ -282,33 +349,51 @@ func (u *UsersAPIStats) Update(user, bucket, cnrID string, reqType RequestType)
cid: cnrID, cid: cnrID,
} }
bucketStat := usersStat.buckets[key] bktStat := usersStat.buckets[key]
bucketStat[reqType] += 1 bktStat.Operations[reqType] += 1
usersStat.buckets[key] = bucketStat bktStat.InTraffic += in
bktStat.OutTraffic += out
usersStat.buckets[key] = bktStat
} }
type UserMetricsInfo struct { func (u *UsersAPIStats) DumpMetrics() UserMetrics {
User string
Bucket string
ContainerID string
Operation string
Requests int
}
func (u *UsersAPIStats) DumpMetrics() []UserMetricsInfo {
u.Lock() u.Lock()
defer u.Unlock() defer u.Unlock()
result := make([]UserMetricsInfo, 0, len(u.users)) result := UserMetrics{
Requests: make([]UserMetricsInfo, 0, len(u.users)),
Traffic: make([]UserTrafficMetricsInfo, 0, len(u.users)),
}
for user, userStat := range u.users { for user, userStat := range u.users {
for key, operations := range userStat.buckets { for key, bktStat := range userStat.buckets {
for op, val := range operations { userBktInfo := UserBucketInfo{
if val != 0 {
result = append(result, UserMetricsInfo{
User: user, User: user,
Bucket: key.name, Bucket: key.name,
ContainerID: key.cid, ContainerID: key.cid,
Operation: RequestType(op).String(), }
if bktStat.InTraffic != 0 {
result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{
UserBucketInfo: userBktInfo,
Type: INTraffic,
Value: bktStat.InTraffic,
})
}
if bktStat.OutTraffic != 0 {
result.Traffic = append(result.Traffic, UserTrafficMetricsInfo{
UserBucketInfo: userBktInfo,
Type: OUTTraffic,
Value: bktStat.OutTraffic,
})
}
for op, val := range bktStat.Operations {
if val != 0 {
result.Requests = append(result.Requests, UserMetricsInfo{
UserBucketInfo: userBktInfo,
Operation: RequestType(op),
Requests: val, Requests: val,
}) })
} }
@ -330,7 +415,7 @@ func (st *HTTPStats) getOutputBytes() uint64 {
} }
// Update statistics from http request and response data. // Update statistics from http request and response data.
func (st *HTTPStats) updateStats(apiOperation string, w http.ResponseWriter, r *http.Request, durationSecs float64) { func (st *HTTPStats) updateStats(apiOperation string, w http.ResponseWriter, r *http.Request, durationSecs float64, in, out uint64) {
var code int var code int
if res, ok := w.(*responseWrapper); ok { if res, ok := w.(*responseWrapper); ok {
@ -345,7 +430,7 @@ func (st *HTTPStats) updateStats(apiOperation string, w http.ResponseWriter, r *
reqInfo := GetReqInfo(r.Context()) reqInfo := GetReqInfo(r.Context())
cnrID := GetCID(r.Context()) cnrID := GetCID(r.Context())
st.usersS3Requests.Update(user, reqInfo.BucketName, cnrID, RequestTypeFromAPI(apiOperation)) st.usersS3Requests.Update(user, reqInfo.BucketName, cnrID, RequestTypeFromAPI(apiOperation), in, out)
// A successful request has a 2xx response code // A successful request has a 2xx response code
successReq := code >= http.StatusOK && code < http.StatusMultipleChoices successReq := code >= http.StatusOK && code < http.StatusMultipleChoices

View file

@ -64,5 +64,6 @@ func (s *stats) Collect(ch chan<- prometheus.Metric) {
// connect collectors // connect collectors
collectHTTPMetrics(ch) collectHTTPMetrics(ch)
collectUserMetrics(ch)
collectNetworkMetrics(ch) collectNetworkMetrics(ch)
} }

View file

@ -152,7 +152,7 @@ func resolveBucket(log *zap.Logger, resolveBucket func(ctx context.Context, buck
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reqInfo := GetReqInfo(r.Context()) reqInfo := GetReqInfo(r.Context())
if reqInfo.BucketName != "" { if reqInfo.BucketName != "" && reqInfo.API != "CreateBucket" {
bktInfo, err := resolveBucket(r.Context(), reqInfo.BucketName) bktInfo, err := resolveBucket(r.Context(), reqInfo.BucketName)
if err != nil { if err != nil {
code := WriteErrorResponse(w, reqInfo, err) code := WriteErrorResponse(w, reqInfo, err)