[#321] Use correct owner id in billing metrics #322
11 changed files with 161 additions and 32 deletions
|
@ -10,6 +10,7 @@ This document outlines major changes between releases.
|
||||||
- Fix `NextVersionIDMarker` in `list-object-versions` (#248)
|
- Fix `NextVersionIDMarker` in `list-object-versions` (#248)
|
||||||
- Fix possibility of panic during SIGHUP (#288)
|
- Fix possibility of panic during SIGHUP (#288)
|
||||||
- Fix flaky `TestErrorTimeoutChecking` (`make test` sometimes failed) (#290)
|
- Fix flaky `TestErrorTimeoutChecking` (`make test` sometimes failed) (#290)
|
||||||
|
- Fix user owner ID in billing metrics (#321)
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- Add new `frostfs.buffer_max_size_for_put` config param and sync TZ hash for PUT operations (#197)
|
- Add new `frostfs.buffer_max_size_for_put` config param and sync TZ hash for PUT operations (#197)
|
||||||
|
|
|
@ -46,6 +46,8 @@ func Auth(center Center, log *zap.Logger) Func {
|
||||||
return func(h http.Handler) http.Handler {
|
return func(h http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
|
reqInfo := GetReqInfo(ctx)
|
||||||
|
reqInfo.User = "anon"
|
||||||
box, err := center.Authenticate(r)
|
box, err := center.Authenticate(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == ErrNoAuthorizationHeader {
|
if err == ErrNoAuthorizationHeader {
|
||||||
|
@ -64,6 +66,10 @@ func Auth(center Center, log *zap.Logger) Func {
|
||||||
ctx = SetClientTime(ctx, box.ClientTime)
|
ctx = SetClientTime(ctx, box.ClientTime)
|
||||||
}
|
}
|
||||||
ctx = SetAuthHeaders(ctx, box.AuthHeaders)
|
ctx = SetAuthHeaders(ctx, box.AuthHeaders)
|
||||||
|
|
||||||
|
if box.AccessBox.Gate.BearerToken != nil {
|
||||||
|
reqInfo.User = bearer.ResolveIssuer(*box.AccessBox.Gate.BearerToken).String()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
h.ServeHTTP(w, r.WithContext(ctx))
|
h.ServeHTTP(w, r.WithContext(ctx))
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -80,9 +79,8 @@ func stats(f http.HandlerFunc, resolveCID cidResolveFunc, appMetrics *metrics.Ap
|
||||||
// simply for the fact that it is not human-readable.
|
// simply for the fact that it is not human-readable.
|
||||||
durationSecs := time.Since(statsWriter.startTime).Seconds()
|
durationSecs := time.Since(statsWriter.startTime).Seconds()
|
||||||
|
|
||||||
user := resolveUser(r.Context())
|
|
||||||
cnrID := resolveCID(r.Context(), reqInfo)
|
cnrID := resolveCID(r.Context(), reqInfo)
|
||||||
appMetrics.Update(user, reqInfo.BucketName, cnrID, settings.ResolveNamespaceAlias(reqInfo.Namespace),
|
appMetrics.UsersAPIStats().Update(reqInfo.User, reqInfo.BucketName, cnrID, settings.ResolveNamespaceAlias(reqInfo.Namespace),
|
||||||
requestTypeFromAPI(reqInfo.API), in.countBytes, out.countBytes)
|
requestTypeFromAPI(reqInfo.API), in.countBytes, out.countBytes)
|
||||||
|
|
||||||
code := statsWriter.statusCode
|
code := statsWriter.statusCode
|
||||||
|
@ -149,14 +147,6 @@ func resolveCID(log *zap.Logger, resolveBucket BucketResolveFunc) cidResolveFunc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func resolveUser(ctx context.Context) string {
|
|
||||||
user := "anon"
|
|
||||||
if bd, err := GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
|
|
||||||
user = bearer.ResolveIssuer(*bd.Gate.BearerToken).String()
|
|
||||||
}
|
|
||||||
return user
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteHeader -- writes http status code.
|
// WriteHeader -- writes http status code.
|
||||||
func (w *responseWrapper) WriteHeader(code int) {
|
func (w *responseWrapper) WriteHeader(code int) {
|
||||||
w.Do(func() {
|
w.Do(func() {
|
||||||
|
|
|
@ -39,6 +39,7 @@ type (
|
||||||
TraceID string // Trace ID
|
TraceID string // Trace ID
|
||||||
URL *url.URL // Request url
|
URL *url.URL // Request url
|
||||||
Namespace string
|
Namespace string
|
||||||
|
User string // User owner id
|
||||||
tags []KeyVal // Any additional info not accommodated by above fields
|
tags []KeyVal // Any additional info not accommodated by above fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,16 +8,47 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
|
bearertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
const FrostfsNamespaceHeader = "X-Frostfs-Namespace"
|
const FrostfsNamespaceHeader = "X-Frostfs-Namespace"
|
||||||
|
|
||||||
|
type poolStatisticMock struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *poolStatisticMock) Statistic() pool.Statistic {
|
||||||
|
return pool.Statistic{}
|
||||||
|
}
|
||||||
|
|
||||||
type centerMock struct {
|
type centerMock struct {
|
||||||
|
t *testing.T
|
||||||
|
anon bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *centerMock) Authenticate(*http.Request) (*middleware.Box, error) {
|
func (c *centerMock) Authenticate(*http.Request) (*middleware.Box, error) {
|
||||||
return &middleware.Box{}, nil
|
var token *bearer.Token
|
||||||
|
|
||||||
|
if !c.anon {
|
||||||
|
bt := bearertest.Token()
|
||||||
|
token = &bt
|
||||||
|
key, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(c.t, err)
|
||||||
|
require.NoError(c.t, token.Sign(key.PrivateKey))
|
||||||
|
}
|
||||||
|
|
||||||
|
return &middleware.Box{
|
||||||
|
AccessBox: &accessbox.Box{
|
||||||
|
Gate: &accessbox.GateData{
|
||||||
|
BearerToken: token,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type middlewareSettingsMock struct {
|
type middlewareSettingsMock struct {
|
||||||
|
@ -36,6 +67,17 @@ func (r *middlewareSettingsMock) PolicyDenyByDefault() bool {
|
||||||
return r.denyByDefault
|
return r.denyByDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type frostFSIDMock struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *frostFSIDMock) ValidatePublicKey(*keys.PublicKey) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *frostFSIDMock) GetUserGroupIDs(util.Uint160) ([]string, error) {
|
||||||
|
return []string{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
type handlerMock struct {
|
type handlerMock struct {
|
||||||
t *testing.T
|
t *testing.T
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/policy-engine/schema/s3"
|
"git.frostfs.info/TrueCloudLab/policy-engine/schema/s3"
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/go-chi/chi/v5/middleware"
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
@ -40,18 +41,28 @@ func prepareRouter(t *testing.T) *routerMock {
|
||||||
middlewareSettings := &middlewareSettingsMock{}
|
middlewareSettings := &middlewareSettingsMock{}
|
||||||
policyChecker := inmemory.NewInMemoryLocalOverrides()
|
policyChecker := inmemory.NewInMemoryLocalOverrides()
|
||||||
|
|
||||||
|
logger := zaptest.NewLogger(t)
|
||||||
|
|
||||||
|
metricsConfig := metrics.AppMetricsConfig{
|
||||||
|
Logger: logger,
|
||||||
|
PoolStatistics: &poolStatisticMock{},
|
||||||
|
Registerer: prometheus.NewRegistry(),
|
||||||
|
Enabled: true,
|
||||||
|
}
|
||||||
|
|
||||||
cfg := Config{
|
cfg := Config{
|
||||||
Throttle: middleware.ThrottleOpts{
|
Throttle: middleware.ThrottleOpts{
|
||||||
Limit: 10,
|
Limit: 10,
|
||||||
BacklogTimeout: 30 * time.Second,
|
BacklogTimeout: 30 * time.Second,
|
||||||
},
|
},
|
||||||
Handler: &handlerMock{t: t},
|
Handler: &handlerMock{t: t},
|
||||||
Center: ¢erMock{},
|
Center: ¢erMock{t: t},
|
||||||
Log: zaptest.NewLogger(t),
|
Log: logger,
|
||||||
Metrics: &metrics.AppMetrics{},
|
Metrics: metrics.NewAppMetrics(metricsConfig),
|
||||||
MiddlewareSettings: middlewareSettings,
|
MiddlewareSettings: middlewareSettings,
|
||||||
PolicyChecker: policyChecker,
|
PolicyChecker: policyChecker,
|
||||||
Domains: []string{"domain1", "domain2"},
|
Domains: []string{"domain1", "domain2"},
|
||||||
|
FrostfsID: &frostFSIDMock{},
|
||||||
}
|
}
|
||||||
return &routerMock{
|
return &routerMock{
|
||||||
router: NewRouter(cfg),
|
router: NewRouter(cfg),
|
||||||
|
@ -253,6 +264,54 @@ func TestDefaultBehaviorPolicyChecker(t *testing.T) {
|
||||||
assertAPIError(t, w, apiErrors.ErrAccessDenied)
|
assertAPIError(t, w, apiErrors.ErrAccessDenied)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestOwnerIDRetrieving(t *testing.T) {
|
||||||
|
chiRouter := prepareRouter(t)
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
r := httptest.NewRequest(http.MethodGet, "/test-bucket", nil)
|
||||||
|
|
||||||
|
chiRouter.ServeHTTP(w, r)
|
||||||
|
resp := readResponse(t, w)
|
||||||
|
require.NotEqual(t, "anon", resp.ReqInfo.User)
|
||||||
|
|
||||||
|
w = httptest.NewRecorder()
|
||||||
|
r = httptest.NewRequest(http.MethodGet, "/test-bucket", nil)
|
||||||
|
|
||||||
|
chiRouter.cfg.Center.(*centerMock).anon = true
|
||||||
|
|
||||||
|
chiRouter.ServeHTTP(w, r)
|
||||||
|
resp = readResponse(t, w)
|
||||||
|
require.Equal(t, "anon", resp.ReqInfo.User)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBillingMetrics(t *testing.T) {
|
||||||
|
chiRouter := prepareRouter(t)
|
||||||
|
|
||||||
|
bktName, objName := "test-bucket", "test-object"
|
||||||
|
target := fmt.Sprintf("/%s/%s", bktName, objName)
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
r := httptest.NewRequest(http.MethodPut, target, nil)
|
||||||
|
|
||||||
|
chiRouter.ServeHTTP(w, r)
|
||||||
|
dump := chiRouter.cfg.Metrics.UsersAPIStats().DumpMetrics()
|
||||||
|
require.Len(t, dump.Requests, 1)
|
||||||
|
require.NotEqual(t, "anon", dump.Requests[0].User)
|
||||||
|
require.Equal(t, metrics.PUTRequest, dump.Requests[0].Operation)
|
||||||
|
require.Equal(t, bktName, dump.Requests[0].Bucket)
|
||||||
|
require.Equal(t, 1, dump.Requests[0].Requests)
|
||||||
|
|
||||||
|
chiRouter.cfg.Center.(*centerMock).anon = true
|
||||||
|
|
||||||
|
w = httptest.NewRecorder()
|
||||||
|
r = httptest.NewRequest(http.MethodPut, target, nil)
|
||||||
|
|
||||||
|
chiRouter.ServeHTTP(w, r)
|
||||||
|
dump = chiRouter.cfg.Metrics.UsersAPIStats().DumpMetrics()
|
||||||
|
require.Len(t, dump.Requests, 1)
|
||||||
|
require.Equal(t, "anon", dump.Requests[0].User)
|
||||||
|
}
|
||||||
|
|
||||||
func readResponse(t *testing.T, w *httptest.ResponseRecorder) handlerResult {
|
func readResponse(t *testing.T, w *httptest.ResponseRecorder) handlerResult {
|
||||||
var res handlerResult
|
var res handlerResult
|
||||||
|
|
||||||
|
|
|
@ -439,7 +439,13 @@ func (a *App) initControlAPI() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) initMetrics() {
|
func (a *App) initMetrics() {
|
||||||
a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
|
cfg := metrics.AppMetricsConfig{
|
||||||
|
Logger: a.log,
|
||||||
|
PoolStatistics: frostfs.NewPoolStatistic(a.pool),
|
||||||
|
Enabled: a.cfg.GetBool(cfgPrometheusEnabled),
|
||||||
|
}
|
||||||
|
|
||||||
|
a.metrics = metrics.NewAppMetrics(cfg)
|
||||||
a.metrics.State().SetHealth(metrics.HealthStatusStarting)
|
a.metrics.State().SetHealth(metrics.HealthStatusStarting)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -16,14 +17,27 @@ type AppMetrics struct {
|
||||||
enabled bool
|
enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAppMetrics(logger *zap.Logger, poolStatistics StatisticScraper, enabled bool) *AppMetrics {
|
type AppMetricsConfig struct {
|
||||||
if !enabled {
|
Logger *zap.Logger
|
||||||
logger.Warn(logs.MetricsAreDisabled)
|
PoolStatistics StatisticScraper
|
||||||
|
Registerer prometheus.Registerer
|
||||||
|
Enabled bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAppMetrics(cfg AppMetricsConfig) *AppMetrics {
|
||||||
|
if !cfg.Enabled {
|
||||||
|
cfg.Logger.Warn(logs.MetricsAreDisabled)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
registry := cfg.Registerer
|
||||||
|
if registry == nil {
|
||||||
|
registry = prometheus.DefaultRegisterer
|
||||||
|
}
|
||||||
|
|
||||||
return &AppMetrics{
|
return &AppMetrics{
|
||||||
logger: logger,
|
logger: cfg.Logger,
|
||||||
gate: NewGateMetrics(poolStatistics),
|
gate: NewGateMetrics(cfg.PoolStatistics, registry),
|
||||||
enabled: enabled,
|
enabled: cfg.Enabled,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,12 +79,11 @@ func (m *AppMetrics) Handler() http.Handler {
|
||||||
return m.gate.Handler()
|
return m.gate.Handler()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *AppMetrics) Update(user, bucket, cnrID, ns string, reqType RequestType, in, out uint64) {
|
func (m *AppMetrics) UsersAPIStats() *UsersAPIStats {
|
||||||
if !m.isEnabled() {
|
if !m.isEnabled() {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
return m.gate.Billing.apiStat
|
||||||
m.gate.Billing.apiStat.Update(user, bucket, cnrID, ns, reqType, in, out)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *AppMetrics) Statistic() *APIStatMetrics {
|
func (m *AppMetrics) Statistic() *APIStatMetrics {
|
||||||
|
|
|
@ -111,6 +111,10 @@ type (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (u *UsersAPIStats) Update(user, bucket, cnrID, ns string, reqType RequestType, in, out uint64) {
|
func (u *UsersAPIStats) Update(user, bucket, cnrID, ns string, reqType RequestType, in, out uint64) {
|
||||||
|
if u == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
u.Lock()
|
u.Lock()
|
||||||
defer u.Unlock()
|
defer u.Unlock()
|
||||||
|
|
||||||
|
@ -140,6 +144,10 @@ func (u *UsersAPIStats) Update(user, bucket, cnrID, ns string, reqType RequestTy
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *UsersAPIStats) DumpMetrics() UserMetrics {
|
func (u *UsersAPIStats) DumpMetrics() UserMetrics {
|
||||||
|
if u == nil {
|
||||||
|
return UserMetrics{}
|
||||||
|
}
|
||||||
|
|
||||||
u.Lock()
|
u.Lock()
|
||||||
defer u.Unlock()
|
defer u.Unlock()
|
||||||
|
|
||||||
|
@ -195,7 +203,7 @@ type billingMetrics struct {
|
||||||
|
|
||||||
userRequestsDesc *prometheus.Desc
|
userRequestsDesc *prometheus.Desc
|
||||||
userTrafficDesc *prometheus.Desc
|
userTrafficDesc *prometheus.Desc
|
||||||
apiStat UsersAPIStats
|
apiStat *UsersAPIStats
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBillingMetrics() *billingMetrics {
|
func newBillingMetrics() *billingMetrics {
|
||||||
|
@ -203,7 +211,7 @@ func newBillingMetrics() *billingMetrics {
|
||||||
registry: prometheus.NewRegistry(),
|
registry: prometheus.NewRegistry(),
|
||||||
userRequestsDesc: newDesc(appMetricsDesc[billingSubsystem][userRequestsMetric]),
|
userRequestsDesc: newDesc(appMetricsDesc[billingSubsystem][userRequestsMetric]),
|
||||||
userTrafficDesc: newDesc(appMetricsDesc[billingSubsystem][userTrafficMetric]),
|
userTrafficDesc: newDesc(appMetricsDesc[billingSubsystem][userTrafficMetric]),
|
||||||
apiStat: UsersAPIStats{},
|
apiStat: &UsersAPIStats{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,12 @@ var metricsPath = flag.String("out", "", "File to export s3 gateway metrics to."
|
||||||
|
|
||||||
func TestDescribeAll(t *testing.T) {
|
func TestDescribeAll(t *testing.T) {
|
||||||
// to check correct metrics type mapping
|
// to check correct metrics type mapping
|
||||||
_ = NewAppMetrics(zaptest.NewLogger(t), mock{}, true)
|
cfg := AppMetricsConfig{
|
||||||
|
Logger: zaptest.NewLogger(t),
|
||||||
|
PoolStatistics: mock{},
|
||||||
|
Enabled: true,
|
||||||
|
}
|
||||||
|
_ = NewAppMetrics(cfg)
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
|
|
|
@ -24,9 +24,7 @@ type GateMetrics struct {
|
||||||
HTTPServer *httpServerMetrics
|
HTTPServer *httpServerMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGateMetrics(scraper StatisticScraper) *GateMetrics {
|
func NewGateMetrics(scraper StatisticScraper, registry prometheus.Registerer) *GateMetrics {
|
||||||
registry := prometheus.DefaultRegisterer
|
|
||||||
|
|
||||||
stateMetric := newStateMetrics()
|
stateMetric := newStateMetrics()
|
||||||
registry.MustRegister(stateMetric)
|
registry.MustRegister(stateMetric)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue