Compare commits

...

7 commits

Author SHA1 Message Date
b1d273c63c Update sdk
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-07-31 16:33:35 +03:00
02cfd46648 Add go traces regions and tasks
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2024-07-31 15:08:46 +03:00
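This commit annotates request handling with Go execution-trace tasks and regions. A minimal sketch of the pattern used throughout the diff below (hypothetical handler and stage names, not gateway code): a task spans the whole request, regions mark individual stages, and `trace.IsEnabled()` guards the per-request task so the annotations cost almost nothing when no trace is being collected.

```go
package main

import (
	"context"
	"log"
	"os"
	"runtime/trace"
)

// handleRequest mirrors the pattern added in this PR: a task spans the whole
// request, regions mark individual stages (auth, policy check, handler, ...).
func handleRequest(ctx context.Context) {
	if trace.IsEnabled() {
		var task *trace.Task
		ctx, task = trace.NewTask(ctx, "request")
		defer task.End()
	}

	reg := trace.StartRegion(ctx, "handler:auth") // stage name chosen for illustration
	// ... do the work of this stage ...
	reg.End()
}

func main() {
	// Capture an execution trace to a file so the tasks/regions become visible.
	f, err := os.Create("trace.out")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	if err := trace.Start(f); err != nil {
		log.Fatal(err)
	}
	defer trace.Stop()

	handleRequest(context.Background())
}
```

Opening the capture with `go tool trace trace.out` lists these under the user-defined tasks and regions views; for a long-running server the trace is typically streamed from the `net/http/pprof` `/debug/pprof/trace` endpoint instead, if that endpoint is exposed.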
c64871ff6c Add intermediate metrics
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-07-31 14:57:06 +03:00
3bbc8cce39 Release v0.30.1
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2024-07-25 14:31:33 +03:00
2fefed842d [#439] Update SDK version
The new SDK version adds extended log records in the pool component during inner node health checks.

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
(cherry picked from commit c506620199)
2024-07-24 16:38:48 +03:00
2bae704d3e [#437] tree: Add cleanOldNodes method
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-07-23 16:47:05 +03:00
689f7ee818 [#437] tree: Support removing old split system nodes
This is needed to meet user expectations when deleting CORS configuration, for example.
Previously, after removing a CORS configuration that had been uploaded as a split node,
stale data could still be returned from another node, because deletion only handled
the latest node version.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-07-22 10:42:11 +03:00
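In other words, a system entity (CORS, tagging, settings) may be represented by several tree nodes after a split, and only the newest one is authoritative; the rest must be removed together with the objects they reference. A condensed, self-contained sketch of the selection idea this PR introduces (simplified from `newMultiNode`/`cleanOldNodes` in the diff below; `Node` here is a stand-in type):

```go
package main

import "fmt"

// Node is a stand-in for one tree node of a system entity (e.g. the bucket CORS config).
type Node struct {
	ID        uint64
	Timestamp uint64 // creation time of this node version
}

// splitLatestAndOld returns the newest node (kept) and the remaining
// duplicates (to be removed along with the objects they point to).
func splitLatestAndOld(nodes []Node) (latest Node, old []Node) {
	idx := 0
	for i, n := range nodes {
		if n.Timestamp > nodes[idx].Timestamp {
			idx = i
		}
	}
	latest = nodes[idx]
	old = append(old, nodes[:idx]...)
	old = append(old, nodes[idx+1:]...)
	return latest, old
}

func main() {
	nodes := []Node{{ID: 1, Timestamp: 10}, {ID: 2, Timestamp: 30}, {ID: 3, Timestamp: 20}}
	latest, old := splitLatestAndOld(nodes)
	fmt.Println("keep:", latest.ID, "remove:", old[0].ID, old[1].ID)
	// Output: keep: 2 remove: 1 3
}
```

Previously only the newest node was consulted, so the old nodes (and the objects they point to) leaked and could still be served; the new `cleanOldNodes` method removes them whenever a system node is rewritten or deleted.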
22 changed files with 411 additions and 110 deletions

View file

@@ -4,6 +4,14 @@ This document outlines major changes between releases.
 ## [Unreleased]
+## [0.30.1] - 2024-07-25
+### Fixed
+- Redundant system node removal in tree service (#437)
+### Added
+- Log details on SDK Pool health status change (#439)
 ## [0.30.0] - Kangshung -2024-07-19
 ### Fixed
@@ -233,4 +241,5 @@ To see CHANGELOG for older versions, refer to https://github.com/nspcc-dev/neofs
 [0.29.2]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.29.1...v0.29.2
 [0.29.3]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.29.2...v0.29.3
 [0.30.0]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.29.3...v0.30.0
-[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.30.0...master
+[0.30.1]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.30.0...v0.30.1
+[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.30.1...master

View file

@@ -1 +1 @@
-v0.30.0
+v0.30.1

View file

@@ -12,6 +12,7 @@ import (
     "net"
     "net/http"
     "net/url"
+    "runtime/trace"
     "strconv"
     "strings"
     "time"
@@ -182,6 +183,12 @@ type createBucketParams struct {
 }
 func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
+    if trace.IsEnabled() {
+        pctx, task := trace.NewTask(r.Context(), "PutObjectHandler")
+        defer task.End()
+        r = r.WithContext(pctx)
+    }
     var (
         err             error
         cannedACLStatus = aclHeadersStatus(r)

View file

@@ -49,17 +49,19 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
         return fmt.Errorf("put system object: %w", err)
     }
-    objIDToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, objID)
+    objIDsToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, objID)
     objIDToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
     if err != nil && !objIDToDeleteNotFound {
         return err
     }
     if !objIDToDeleteNotFound {
-        if err = n.objectDelete(ctx, p.BktInfo, objIDToDelete); err != nil {
-            n.reqLogger(ctx).Error(logs.CouldntDeleteCorsObject, zap.Error(err),
-                zap.String("cnrID", p.BktInfo.CID.EncodeToString()),
-                zap.String("objID", objIDToDelete.EncodeToString()))
+        for _, id := range objIDsToDelete {
+            if err = n.objectDelete(ctx, p.BktInfo, id); err != nil {
+                n.reqLogger(ctx).Error(logs.CouldntDeleteCorsObject, zap.Error(err),
+                    zap.String("cnrID", p.BktInfo.CID.EncodeToString()),
+                    zap.String("objID", id.EncodeToString()))
+            }
         }
     }
@@ -81,14 +83,16 @@ func (n *Layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*d
 }
 func (n *Layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error {
-    objID, err := n.treeService.DeleteBucketCORS(ctx, bktInfo)
+    objIDs, err := n.treeService.DeleteBucketCORS(ctx, bktInfo)
     objIDNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
     if err != nil && !objIDNotFound {
         return err
     }
     if !objIDNotFound {
-        if err = n.objectDelete(ctx, bktInfo, objID); err != nil {
-            return err
+        for _, id := range objIDs {
+            if err = n.objectDelete(ctx, bktInfo, id); err != nil {
+                return err
+            }
         }
     }

View file

@@ -124,7 +124,7 @@ func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketI
     return node.OID, nil
 }
-func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
+func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, objID oid.ID) ([]oid.ID, error) {
     systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
     if !ok {
         systemMap = make(map[string]*data.BaseNodeVersion)
@@ -136,10 +136,10 @@ func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketI
     t.system[bktInfo.CID.EncodeToString()] = systemMap
-    return oid.ID{}, ErrNoNodeToRemove
+    return nil, ErrNoNodeToRemove
 }
-func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) (oid.ID, error) {
+func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) ([]oid.ID, error) {
     panic("implement me")
 }

View file

@@ -25,13 +25,13 @@ type TreeService interface {
     // PutBucketCORS puts a node to a system tree and returns objectID of a previous cors config which must be deleted in FrostFS.
     //
-    // If object id to remove is not found returns ErrNoNodeToRemove error.
-    PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error)
+    // If object ids to remove is not found returns ErrNoNodeToRemove error.
+    PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) ([]oid.ID, error)
     // DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in FrostFS.
     //
-    // If object id to remove is not found returns ErrNoNodeToRemove error.
-    DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
+    // If object ids to remove is not found returns ErrNoNodeToRemove error.
+    DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.ID, error)
     GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error)
     PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error

View file

@@ -5,6 +5,7 @@ import (
     "errors"
     "fmt"
     "net/http"
+    "runtime/trace"
     "time"
     "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
@@ -48,6 +49,8 @@ var ErrNoAuthorizationHeader = errors.New("no authorization header")
 func Auth(center Center, log *zap.Logger) Func {
     return func(h http.Handler) http.Handler {
         return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+            reg := trace.StartRegion(r.Context(), "handler:auth")
             ctx := r.Context()
             reqInfo := GetReqInfo(ctx)
             reqInfo.User = "anon"
@@ -64,6 +67,7 @@ func Auth(center Center, log *zap.Logger) Func {
                 if _, wrErr := WriteErrorResponse(w, GetReqInfo(r.Context()), err); wrErr != nil {
                     reqLogOrDefault(ctx, log).Error(logs.FailedToWriteResponse, zap.Error(wrErr))
                 }
+                reg.End()
                 return
             }
         } else {
@@ -75,6 +79,8 @@ func Auth(center Center, log *zap.Logger) Func {
             reqLogOrDefault(ctx, log).Debug(logs.SuccessfulAuth, zap.String("accessKeyID", box.AuthHeaders.AccessKeyID))
         }
+        reg.End()
         h.ServeHTTP(w, r.WithContext(ctx))
     })
 }
@@ -87,11 +93,13 @@ type FrostFSIDValidator interface {
 func FrostfsIDValidation(frostfsID FrostFSIDValidator, log *zap.Logger) Func {
     return func(h http.Handler) http.Handler {
         return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+            reg := trace.StartRegion(r.Context(), "handler:ffsid")
             ctx := r.Context()
             bd, err := GetBoxData(ctx)
             if err != nil || bd.Gate.BearerToken == nil {
                 reqLogOrDefault(ctx, log).Debug(logs.AnonRequestSkipFrostfsIDValidation)
                 h.ServeHTTP(w, r)
+                reg.End()
                 return
             }
@@ -100,9 +108,11 @@ func FrostfsIDValidation(frostfsID FrostFSIDValidator, log *zap.Logger) Func {
             if _, wrErr := WriteErrorResponse(w, GetReqInfo(r.Context()), err); wrErr != nil {
                 reqLogOrDefault(ctx, log).Error(logs.FailedToWriteResponse, zap.Error(wrErr))
             }
+            reg.End()
             return
         }
+        reg.End()
         h.ServeHTTP(w, r)
     })
 }

View file

@@ -8,6 +8,7 @@ import (
     "io"
     "net/http"
     "net/url"
+    "runtime/trace"
     "strings"
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
@@ -83,6 +84,7 @@ type PolicyConfig struct {
 func PolicyCheck(cfg PolicyConfig) Func {
     return func(h http.Handler) http.Handler {
         return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+            reg := trace.StartRegion(r.Context(), "handler:policy")
             ctx := r.Context()
             if err := policyCheck(r, cfg); err != nil {
                 reqLogOrDefault(ctx, cfg.Log).Error(logs.PolicyValidationFailed, zap.Error(err))
@@ -90,9 +92,11 @@ func PolicyCheck(cfg PolicyConfig) Func {
                 if _, wrErr := WriteErrorResponse(w, GetReqInfo(ctx), err); wrErr != nil {
                     reqLogOrDefault(ctx, cfg.Log).Error(logs.FailedToWriteResponse, zap.Error(wrErr))
                 }
+                reg.End()
                 return
             }
+            reg.End()
             h.ServeHTTP(w, r)
         })
 }

View file

@@ -3,6 +3,7 @@ package middleware
 import (
     "context"
     "net/http"
+    gotrace "runtime/trace"
     "sync"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@@ -16,6 +17,12 @@ import (
 func Tracing() Func {
     return func(h http.Handler) http.Handler {
         return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+            if gotrace.IsEnabled() {
+                pctx, task := gotrace.NewTask(r.Context(), "request")
+                defer task.End()
+                r = r.WithContext(pctx)
+            }
             appCtx, span := StartHTTPServerSpan(r, "REQUEST S3")
             reqInfo := GetReqInfo(r.Context())
             reqInfo.TraceID = span.SpanContext().TraceID().String()

View file

@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "net/http"
+    "runtime/trace"
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
@@ -154,6 +155,14 @@ func NewRouter(cfg Config) *chi.Mux {
     }))
     defaultRouter := chi.NewRouter()
+    defaultRouter.Use(func(handler http.Handler) http.Handler {
+        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+            reg := trace.StartRegion(r.Context(), "handler:s3")
+            defer reg.End()
+            handler.ServeHTTP(w, r)
+        })
+    })
     defaultRouter.Mount(fmt.Sprintf("/{%s}", s3middleware.BucketURLPrm), bucketRouter(cfg.Handler, cfg.Log))
     defaultRouter.Get("/", named("ListBuckets", cfg.Handler.ListBucketsHandler))
     attachErrorHandler(defaultRouter)

View file

@@ -443,6 +443,7 @@ func (a *App) initMetrics() {
     cfg := metrics.AppMetricsConfig{
         Logger:         a.log,
         PoolStatistics: frostfs.NewPoolStatistic(a.pool),
+        TreeStatistic:  a.treePool,
         Enabled:        a.cfg.GetBool(cfgPrometheusEnabled),
     }

6
go.mod
View file

@@ -3,10 +3,10 @@ module git.frostfs.info/TrueCloudLab/frostfs-s3-gw
 go 1.21
 require (
-    git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240716113920-f517e3949164
+    git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
     git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
     git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
-    git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36
+    git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722121227-fa89999d919c
     git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a
     git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
     github.com/aws/aws-sdk-go v1.44.6
@@ -36,6 +36,8 @@ require (
     google.golang.org/protobuf v1.33.0
 )
+replace git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722121227-fa89999d919c => git.frostfs.info/mbiryukova/frostfs-sdk-go v0.0.0-20240731133113-04024af80f29
 require (
     git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
     git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect

8
go.sum
View file

@@ -36,16 +36,14 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
-git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240716113920-f517e3949164 h1:XxvwQKJT/f16qS3df5PBQPRYKkhy0/A7zH6644QpKD0=
-git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240716113920-f517e3949164/go.mod h1:OBDSr+DqV1z4VDouoX3YMleNc4DPBVBWTG3WDT2PK1o=
+git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e h1:gEWT+70E/RvGkxtSv+PlyUN2vtJVymhQa1mypvrXukM=
+git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e/go.mod h1:OBDSr+DqV1z4VDouoX3YMleNc4DPBVBWTG3WDT2PK1o=
 git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e h1:kcBqZBiFIUBATUqEuvVigtkJJWQ2Gug/eYXn967o3M4=
 git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e/go.mod h1:F/fe1OoIDKr5Bz99q4sriuHDuf3aZefZy9ZsCqEtgxc=
 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36 h1:MV/vKJWLQT34RRbXYvkNKFYGNjL5bRNuCQMXkbC7fLI=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36/go.mod h1:vluJ/+yQMcq8ZIZZSA7Te+JKClr0lgtRErjICvb8wto=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
 git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a h1:Bk1fB4cQASPKgAVGCdlBOEp5ohZfDxqK6fZM8eP+Emo=
@@ -56,6 +54,8 @@ git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjq
 git.frostfs.info/TrueCloudLab/tzhash v1.8.0/go.mod h1:dhY+oy274hV8wGvGL4MwwMpdL3GYvaX1a8GQZQHvlF8=
 git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 h1:HeY8n27VyPRQe49l/fzyVMkWEB2fsLJYKp64pwA7tz4=
 git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02/go.mod h1:rQFJJdEOV7KbbMtQYR2lNfiZk+ONRDJSbMCTWxKt8Fw=
+git.frostfs.info/mbiryukova/frostfs-sdk-go v0.0.0-20240731133113-04024af80f29 h1:sYkxbvaKd9Q9flezuxy21Tr3Of+4Y+mtXFk6+JxFRHw=
+git.frostfs.info/mbiryukova/frostfs-sdk-go v0.0.0-20240731133113-04024af80f29/go.mod h1:DlJmgV4/qkFkx2ab+YWznlMijiF2yZHnrJswJOB7XGs=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=

View file

@@ -6,6 +6,7 @@ import (
     "fmt"
     "io"
     "math"
+    "runtime/trace"
     "strconv"
     "time"
@@ -53,6 +54,9 @@ func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
 // TimeToEpoch implements frostfs.FrostFS interface method.
 func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) {
+    reg := trace.StartRegion(ctx, "ffs:TimeToEpoch")
+    defer reg.End()
     dur := futureTime.Sub(now)
     if dur < 0 {
         return 0, 0, fmt.Errorf("time '%s' must be in the future (after %s)",
@@ -89,6 +93,9 @@ func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (u
 // Container implements frostfs.FrostFS interface method.
 func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*container.Container, error) {
+    reg := trace.StartRegion(ctx, "ffs:Container")
+    defer reg.End()
     prm := pool.PrmContainerGet{
         ContainerID: layerPrm.ContainerID,
         Session:     layerPrm.SessionToken,
@@ -104,6 +111,9 @@ func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*
 // CreateContainer implements frostfs.FrostFS interface method.
 func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) {
+    reg := trace.StartRegion(ctx, "ffs:CreateContainer")
+    defer reg.End()
     var cnr container.Container
     cnr.Init()
     cnr.SetPlacementPolicy(prm.Policy)
@@ -152,6 +162,9 @@ func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCre
 // UserContainers implements frostfs.FrostFS interface method.
 func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserContainers) ([]cid.ID, error) {
+    reg := trace.StartRegion(ctx, "ffs:UserContainers")
+    defer reg.End()
     prm := pool.PrmContainerList{
         OwnerID: layerPrm.UserID,
         Session: layerPrm.SessionToken,
@@ -163,6 +176,9 @@ func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserCont
 // DeleteContainer implements frostfs.FrostFS interface method.
 func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
+    reg := trace.StartRegion(ctx, "ffs:DeleteContainer")
+    defer reg.End()
     prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await}
     err := x.pool.DeleteContainer(ctx, prm)
@@ -171,6 +187,9 @@ func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session
 // CreateObject implements frostfs.FrostFS interface method.
 func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oid.ID, error) {
+    reg := trace.StartRegion(ctx, "ffs:CreateObject")
+    defer reg.End()
     attrNum := len(prm.Attributes) + 1 // + creation time
     if prm.Filepath != "" {
@@ -237,8 +256,12 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (
         prmPut.UseKey(prm.PrivateKey)
     }
-    idObj, err := x.pool.PutObject(ctx, prmPut)
-    return idObj, handleObjectError("save object via connection pool", err)
+    res, err := x.pool.PutObject(ctx, prmPut)
+    if err = handleObjectError("save object via connection pool", err); err != nil {
+        return oid.ID{}, err
+    }
+    return res.ObjectID, nil
 }
 // wraps io.ReadCloser and transforms Read errors related to access violation
@@ -257,6 +280,9 @@ func (x payloadReader) Read(p []byte) (int, error) {
 // ReadObject implements frostfs.FrostFS interface method.
 func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) {
+    reg := trace.StartRegion(ctx, "ffs:ReadObject")
+    defer reg.End()
     var addr oid.Address
     addr.SetContainer(prm.Container)
     addr.SetObject(prm.Object)
@@ -342,6 +368,9 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*lay
 // DeleteObject implements frostfs.FrostFS interface method.
 func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) error {
+    reg := trace.StartRegion(ctx, "ffs:DeleteObject")
+    defer reg.End()
     var addr oid.Address
     addr.SetContainer(prm.Container)
     addr.SetObject(prm.Object)
@@ -361,6 +390,9 @@ func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) e
 // SearchObjects implements frostfs.FrostFS interface method.
 func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch) ([]oid.ID, error) {
+    reg := trace.StartRegion(ctx, "ffs:SearchObject")
+    defer reg.End()
     filters := object.NewSearchFilters()
     filters.AddRootFilter()

View file

@@ -5,6 +5,7 @@ import (
     "errors"
     "fmt"
     "io"
+    "runtime/trace"
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
@@ -78,6 +79,9 @@ func NewPoolWrapper(p *treepool.Pool) *PoolWrapper {
 }
 func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([]tree.NodeResponse, error) {
+    reg := trace.StartRegion(ctx, "tree:GetNodes")
+    defer reg.End()
     poolPrm := treepool.GetNodesParams{
         CID:    prm.BktInfo.CID,
         TreeID: prm.TreeID,
@@ -103,6 +107,9 @@ func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([
 }
 func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]tree.NodeResponse, error) {
+    reg := trace.StartRegion(ctx, "tree:GetSubTree")
+    defer reg.End()
     poolPrm := treepool.GetSubTreeParams{
         CID:    bktInfo.CID,
         TreeID: treeID,
@@ -177,6 +184,9 @@ func (s *SubTreeStreamImpl) Next() (tree.NodeResponse, error) {
 }
 func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (tree.SubTreeStream, error) {
+    reg := trace.StartRegion(ctx, "tree:GetSubTreeStream")
+    defer reg.End()
     poolPrm := treepool.GetSubTreeParams{
         CID:    bktInfo.CID,
         TreeID: treeID,
@@ -206,6 +216,9 @@ func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.Bucket
 }
 func (w *PoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
+    reg := trace.StartRegion(ctx, "tree:AddNode")
+    defer reg.End()
     nodeID, err := w.p.AddNode(ctx, treepool.AddNodeParams{
         CID:    bktInfo.CID,
         TreeID: treeID,
@@ -217,6 +230,9 @@ func (w *PoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, tre
 }
 func (w *PoolWrapper) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) {
+    reg := trace.StartRegion(ctx, "tree:AddNodeByPath")
+    defer reg.End()
     nodeID, err := w.p.AddNodeByPath(ctx, treepool.AddNodeByPathParams{
         CID:    bktInfo.CID,
         TreeID: treeID,
@@ -229,6 +245,9 @@ func (w *PoolWrapper) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInf
 }
 func (w *PoolWrapper) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error {
+    reg := trace.StartRegion(ctx, "tree:MoveNode")
+    defer reg.End()
     return handleError(w.p.MoveNode(ctx, treepool.MoveNodeParams{
         CID:    bktInfo.CID,
         TreeID: treeID,
@@ -240,6 +259,9 @@ func (w *PoolWrapper) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, tr
 }
 func (w *PoolWrapper) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
+    reg := trace.StartRegion(ctx, "tree:RemoveNode")
+    defer reg.End()
     return handleError(w.p.RemoveNode(ctx, treepool.RemoveNodeParams{
         CID:    bktInfo.CID,
         TreeID: treeID,

View file

@@ -142,11 +142,13 @@ const (
     CouldntCacheSubject   = "couldn't cache subject info"
     UserGroupsListIsEmpty = "user groups list is empty, subject not found"
     CouldntCacheUserKey   = "couldn't cache user key"
-    FoundSeveralBucketCorsNodes     = "found several bucket cors nodes, latest be used"
-    FoundSeveralObjectTaggingNodes  = "found several object tagging nodes, latest be used"
-    FoundSeveralBucketTaggingNodes  = "found several bucket tagging nodes, latest be used"
-    FoundSeveralBucketSettingsNodes = "found several bucket settings nodes, latest be used"
+    ObjectTaggingNodeHasMultipleIDs  = "object tagging node has multiple ids"
+    BucketTaggingNodeHasMultipleIDs  = "bucket tagging node has multiple ids"
+    BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
+    BucketCORSNodeHasMultipleIDs     = "bucket cors node has multiple ids"
+    SystemNodeHasMultipleIDs         = "system node has multiple ids"
+    FailedToRemoveOldSystemNode      = "failed to remove old system node"
     UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts"
-    FoundSeveralSystemNodes = "found several system nodes, latest be used"
+    FoundSeveralSystemNodes = "found several system nodes"
     FailedToParsePartInfo   = "failed to parse part info"
 )

View file

@@ -20,6 +20,7 @@ type AppMetrics struct {
 type AppMetricsConfig struct {
     Logger         *zap.Logger
     PoolStatistics StatisticScraper
+    TreeStatistic  TreePoolStatistic
     Registerer     prometheus.Registerer
     Enabled        bool
 }
@@ -36,7 +37,7 @@ func NewAppMetrics(cfg AppMetricsConfig) *AppMetrics {
     return &AppMetrics{
         logger:  cfg.Logger,
-        gate:    NewGateMetrics(cfg.PoolStatistics, registry),
+        gate:    NewGateMetrics(cfg.PoolStatistics, cfg.TreeStatistic, registry),
         enabled: cfg.Enabled,
     }
 }

View file

@@ -48,6 +48,14 @@ var appMetricsDesc = map[string]map[string]Description{
             Help:           "Average request duration (in milliseconds) for specific method on node in pool",
             VariableLabels: []string{"node", "method"},
         },
+        interAvgRequestDurationMetric: Description{
+            Type:           dto.MetricType_GAUGE,
+            Namespace:      namespace,
+            Subsystem:      poolSubsystem,
+            Name:           interAvgRequestDurationMetric,
+            Help:           "Intermediate average request duration (in milliseconds) for specific method on node in pool",
+            VariableLabels: []string{"node", "method"},
+        },
         currentNodesMetric: Description{
             Type:      dto.MetricType_GAUGE,
             Namespace: namespace,
@@ -144,6 +152,24 @@ var appMetricsDesc = map[string]map[string]Description{
             VariableLabels: []string{"endpoint"},
         },
     },
+    treePoolSubsystem: {
+        avgRequestDurationMetric: Description{
+            Type:           dto.MetricType_GAUGE,
+            Namespace:      namespace,
+            Subsystem:      treePoolSubsystem,
+            Name:           avgRequestDurationMetric,
+            Help:           "Average request duration (in milliseconds) for specific method in tree pool",
+            VariableLabels: []string{"method"},
+        },
+        interAvgRequestDurationMetric: Description{
+            Type:           dto.MetricType_GAUGE,
+            Namespace:      namespace,
+            Subsystem:      treePoolSubsystem,
+            Name:           interAvgRequestDurationMetric,
+            Help:           "Intermediate average request duration (in milliseconds) for specific method in tree pool",
+            VariableLabels: []string{"method"},
+        },
+    },
 }
 type Description struct {

View file

@@ -4,6 +4,7 @@ import (
     "net/http"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promhttp"
     dto "github.com/prometheus/client_model/go"
@@ -15,6 +16,10 @@ type StatisticScraper interface {
     Statistic() pool.Statistic
 }
+type TreePoolStatistic interface {
+    Statistic() tree.Statistic
+}
 type GateMetrics struct {
     registry prometheus.Registerer
     State    *StateMetrics
@@ -22,9 +27,10 @@ type GateMetrics struct {
     Billing    *billingMetrics
     Stats      *APIStatMetrics
     HTTPServer *httpServerMetrics
+    TreePool   *treePoolMetricsCollector
 }
-func NewGateMetrics(scraper StatisticScraper, registry prometheus.Registerer) *GateMetrics {
+func NewGateMetrics(scraper StatisticScraper, treeScraper TreePoolStatistic, registry prometheus.Registerer) *GateMetrics {
     stateMetric := newStateMetrics()
     registry.MustRegister(stateMetric)
@@ -40,6 +46,9 @@ func NewGateMetrics(scraper StatisticScraper, registry prometheus.Registerer) *G
     serverMetric := newHTTPServerMetrics()
     registry.MustRegister(serverMetric)
+    treePoolMetric := newTreePoolMetricsCollector(treeScraper)
+    registry.MustRegister(treePoolMetric)
     return &GateMetrics{
         registry: registry,
         State:    stateMetric,
@@ -47,6 +56,7 @@ func NewGateMetrics(scraper StatisticScraper, registry prometheus.Registerer) *G
         Billing:    billingMetric,
         Stats:      statsMetric,
         HTTPServer: serverMetric,
+        TreePool:   treePoolMetric,
     }
 }

View file

@@ -10,12 +10,13 @@ const (
 )
 const (
     overallErrorsMetric       = "overall_errors"
     overallNodeErrorsMetric   = "overall_node_errors"
     overallNodeRequestsMetric = "overall_node_requests"
     currentErrorMetric        = "current_errors"
     avgRequestDurationMetric  = "avg_request_duration"
+    interAvgRequestDurationMetric = "inter_avg_request_duration"
     currentNodesMetric        = "current_nodes"
 )
 const (
@@ -37,24 +38,26 @@ const (
 )
 type poolMetricsCollector struct {
     poolStatScraper     StatisticScraper
     overallErrors       prometheus.Gauge
     overallNodeErrors   *prometheus.GaugeVec
     overallNodeRequests *prometheus.GaugeVec
     currentErrors       *prometheus.GaugeVec
     requestDuration     *prometheus.GaugeVec
+    interRequestDuration *prometheus.GaugeVec
     currentNodes        *prometheus.GaugeVec
 }
 func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
     return &poolMetricsCollector{
         poolStatScraper:     scraper,
         overallErrors:       mustNewGauge(appMetricsDesc[poolSubsystem][overallErrorsMetric]),
         overallNodeErrors:   mustNewGaugeVec(appMetricsDesc[poolSubsystem][overallNodeErrorsMetric]),
         overallNodeRequests: mustNewGaugeVec(appMetricsDesc[poolSubsystem][overallNodeRequestsMetric]),
         currentErrors:       mustNewGaugeVec(appMetricsDesc[poolSubsystem][currentErrorMetric]),
         requestDuration:     mustNewGaugeVec(appMetricsDesc[poolSubsystem][avgRequestDurationMetric]),
+        interRequestDuration: mustNewGaugeVec(appMetricsDesc[poolSubsystem][interAvgRequestDurationMetric]),
         currentNodes:        mustNewGaugeVec(appMetricsDesc[poolSubsystem][currentNodesMetric]),
     }
 }
@@ -65,6 +68,7 @@ func (m *poolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
     m.overallNodeRequests.Collect(ch)
     m.currentErrors.Collect(ch)
     m.requestDuration.Collect(ch)
+    m.interRequestDuration.Collect(ch)
     m.currentNodes.Collect(ch)
 }
@@ -74,6 +78,7 @@ func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
     m.overallNodeRequests.Describe(descs)
     m.currentErrors.Describe(descs)
     m.requestDuration.Describe(descs)
+    m.interRequestDuration.Describe(descs)
     m.currentNodes.Describe(descs)
 }
@@ -84,6 +89,7 @@ func (m *poolMetricsCollector) updateStatistic() {
     m.overallNodeRequests.Reset()
     m.currentErrors.Reset()
     m.requestDuration.Reset()
+    m.interRequestDuration.Reset()
     m.currentNodes.Reset()
     for _, node := range stat.Nodes() {
@@ -117,4 +123,20 @@ func (m *poolMetricsCollector) updateRequestsDuration(node pool.NodeStatistic) {
     m.requestDuration.WithLabelValues(node.Address(), methodHeadObject).Set(float64(node.AverageHeadObject().Milliseconds()))
     m.requestDuration.WithLabelValues(node.Address(), methodRangeObject).Set(float64(node.AverageRangeObject().Milliseconds()))
     m.requestDuration.WithLabelValues(node.Address(), methodCreateSession).Set(float64(node.AverageCreateSession().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodGetBalance).Set(float64(node.InterAverageGetBalance().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodPutContainer).Set(float64(node.InterAveragePutContainer().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodGetContainer).Set(float64(node.InterAverageGetContainer().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodListContainer).Set(float64(node.InterAverageListContainer().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodDeleteContainer).Set(float64(node.InterAverageDeleteContainer().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodGetContainerEacl).Set(float64(node.InterAverageGetContainerEACL().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodSetContainerEacl).Set(float64(node.InterAverageSetContainerEACL().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodEndpointInfo).Set(float64(node.InterAverageEndpointInfo().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodNetworkInfo).Set(float64(node.InterAverageNetworkInfo().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodPutObject).Set(float64(node.InterAveragePutObject().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodDeleteObject).Set(float64(node.InterAverageDeleteObject().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodGetObject).Set(float64(node.InterAverageGetObject().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodHeadObject).Set(float64(node.InterAverageHeadObject().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodRangeObject).Set(float64(node.InterAverageRangeObject().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(node.Address(), methodCreateSession).Set(float64(node.InterAverageCreateSession().Milliseconds()))
 }

62
metrics/treepool.go Normal file
View file

@@ -0,0 +1,62 @@
+package metrics
+import (
+    "github.com/prometheus/client_golang/prometheus"
+)
+const (
+    treePoolSubsystem = "tree_pool"
+    methodGetNodes      = "get_nodes"
+    methodGetSubTree    = "get_sub_tree"
+    methodAddNode       = "add_node"
+    methodAddNodeByPath = "add_node_by_path"
+    methodMoveNode      = "move_node"
+    methodRemoveNode    = "remove_node"
+)
+type treePoolMetricsCollector struct {
+    statScraper          TreePoolStatistic
+    requestDuration      *prometheus.GaugeVec
+    interRequestDuration *prometheus.GaugeVec
+}
+func newTreePoolMetricsCollector(stat TreePoolStatistic) *treePoolMetricsCollector {
+    return &treePoolMetricsCollector{
+        statScraper:          stat,
+        requestDuration:      mustNewGaugeVec(appMetricsDesc[treePoolSubsystem][avgRequestDurationMetric]),
+        interRequestDuration: mustNewGaugeVec(appMetricsDesc[treePoolSubsystem][interAvgRequestDurationMetric]),
+    }
+}
+func (m *treePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
+    m.updateStatistic()
+    m.requestDuration.Collect(ch)
+    m.interRequestDuration.Collect(ch)
+}
+func (m *treePoolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
+    m.requestDuration.Describe(descs)
+    m.interRequestDuration.Describe(descs)
+}
+func (m *treePoolMetricsCollector) updateStatistic() {
+    stat := m.statScraper.Statistic()
+    m.requestDuration.Reset()
+    m.interRequestDuration.Reset()
+    m.requestDuration.WithLabelValues(methodGetNodes).Set(float64(stat.AverageGetNodes().Milliseconds()))
+    m.requestDuration.WithLabelValues(methodGetSubTree).Set(float64(stat.AverageGetSubTree().Milliseconds()))
+    m.requestDuration.WithLabelValues(methodAddNode).Set(float64(stat.AverageAddNode().Milliseconds()))
+    m.requestDuration.WithLabelValues(methodAddNodeByPath).Set(float64(stat.AverageAddNodeByPath().Milliseconds()))
+    m.requestDuration.WithLabelValues(methodMoveNode).Set(float64(stat.AverageMoveNode().Milliseconds()))
+    m.requestDuration.WithLabelValues(methodRemoveNode).Set(float64(stat.AverageRemoveNode().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(methodGetNodes).Set(float64(stat.InterAverageGetNodes().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(methodGetSubTree).Set(float64(stat.InterAverageGetSubTree().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(methodAddNode).Set(float64(stat.InterAverageAddNode().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(methodAddNodeByPath).Set(float64(stat.InterAverageAddNodeByPath().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(methodMoveNode).Set(float64(stat.InterAverageMoveNode().Milliseconds()))
+    m.interRequestDuration.WithLabelValues(methodRemoveNode).Set(float64(stat.InterAverageRemoveNode().Milliseconds()))
+}
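For context, the new collector is registered through `NewGateMetrics`, which now takes a tree-pool statistic source as its second argument. A hedged sketch of standalone wiring follows; the stub scrapers, the zero-value statistics, and the import paths are assumptions for illustration only — in the gateway itself this happens in `initMetrics` via `AppMetricsConfig`.

```go
package main

import (
	"log"
	"net/http"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Stub scrapers standing in for the object pool and tree pool statistics;
// the real gateway passes its pools here. Zero values are returned purely
// so the sketch compiles.
type poolStat struct{}

func (poolStat) Statistic() pool.Statistic { return pool.Statistic{} }

type treePoolStat struct{}

func (treePoolStat) Statistic() tree.Statistic { return tree.Statistic{} }

func main() {
	registry := prometheus.NewRegistry()
	// NewGateMetrics now registers the tree-pool collector alongside the existing ones.
	_ = metrics.NewGateMetrics(poolStat{}, treePoolStat{}, registry)

	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9090", nil))
}
```

With that in place, the tree-pool gauges described above (`avg_request_duration` and the new `inter_avg_request_duration`, labelled per method) are exported under the `tree_pool` subsystem on the metrics endpoint.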

View file

@@ -53,6 +53,11 @@ type (
         Meta    map[string]string
     }
+    multiSystemNode struct {
+        // the first element is latest
+        nodes []*treeNode
+    }
     GetNodesParams struct {
         BktInfo *data.BucketInfo
         TreeID  string
@@ -268,6 +273,45 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree
     return version, nil
 }
+func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) {
+    var (
+        err          error
+        index        int
+        maxTimestamp uint64
+    )
+    if len(nodes) == 0 {
+        return nil, errors.New("multi node must have at least one node")
+    }
+    treeNodes := make([]*treeNode, len(nodes))
+    for i, node := range nodes {
+        if treeNodes[i], err = newTreeNode(node); err != nil {
+            return nil, fmt.Errorf("parse system node response: %w", err)
+        }
+        if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
+            index = i
+            maxTimestamp = timestamp
+        }
+    }
+    treeNodes[0], treeNodes[index] = treeNodes[index], treeNodes[0]
+    return &multiSystemNode{
+        nodes: treeNodes,
+    }, nil
+}
+func (m *multiSystemNode) Latest() *treeNode {
+    return m.nodes[0]
+}
+func (m *multiSystemNode) Old() []*treeNode {
+    return m.nodes[1:]
+}
 func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
     uploadID, _ := treeNode.Get(uploadIDKV)
     if uploadID == "" {
@@ -394,11 +438,13 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
 }
 func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
-    node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName})
+    multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
     if err != nil {
         return nil, fmt.Errorf("couldn't get node: %w", err)
     }
+    node := multiNode.Latest()
     settings := &data.BucketSettings{Versioning: data.VersioningUnversioned}
     if versioningValue, ok := node.Get(versioningKV); ok {
         settings.Versioning = versioningValue
@@ -422,7 +468,7 @@ func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*
 }
 func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
-    node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName})
+    multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
     isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
     if err != nil && !isErrNotFound {
         return fmt.Errorf("couldn't get node: %w", err)
@@ -435,28 +481,35 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
         return err
     }
-    ind := node.GetLatestNodeIndex()
-    if node.IsSplit() {
-        c.reqLogger(ctx).Warn(logs.FoundSeveralBucketSettingsNodes)
+    latest := multiNode.Latest()
+    ind := latest.GetLatestNodeIndex()
+    if latest.IsSplit() {
+        c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
     }
-    return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta)
+    if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
+        return fmt.Errorf("move settings node: %w", err)
+    }
+    c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
+    return nil
 }
 func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
-    node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
+    node, err := c.getSystemNode(ctx, bktInfo, corsFilename)
     if err != nil {
         return oid.ID{}, err
     }
-    return node.ObjID, nil
+    return node.Latest().ObjID, nil
 }
-func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
-    node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
+func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) ([]oid.ID, error) {
+    multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
     isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
     if err != nil && !isErrNotFound {
-        return oid.ID{}, fmt.Errorf("couldn't get node: %w", err)
+        return nil, fmt.Errorf("couldn't get node: %w", err)
     }
     meta := make(map[string]string)
@@ -465,35 +518,64 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objI
     if isErrNotFound {
         if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
-            return oid.ID{}, err
+            return nil, err
         }
-        return oid.ID{}, layer.ErrNoNodeToRemove
+        return nil, layer.ErrNoNodeToRemove
     }
-    ind := node.GetLatestNodeIndex()
-    if node.IsSplit() {
-        c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes)
+    latest := multiNode.Latest()
+    ind := latest.GetLatestNodeIndex()
+    if latest.IsSplit() {
+        c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs)
     }
-    return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta)
+    if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
+        return nil, fmt.Errorf("move cors node: %w", err)
+    }
+    objToDelete := make([]oid.ID, 1, len(multiNode.nodes))
+    objToDelete[0] = latest.ObjID
+    objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...)
+    return objToDelete, nil
 }
-func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
-    node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
-    if err != nil && !errors.Is(err, layer.ErrNodeNotFound) {
-        return oid.ID{}, err
+func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.ID, error) {
+    multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
+    isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
+    if err != nil && !isErrNotFound {
+        return nil, err
     }
-    if node != nil {
+    if isErrNotFound {
+        return nil, layer.ErrNoNodeToRemove
+    }
+    objToDelete := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo)
+    if len(objToDelete) != len(multiNode.nodes) {
+        return nil, fmt.Errorf("clean old cors nodes: %w", err)
+    }
+    return objToDelete, nil
+}
+func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.ID {
+    res := make([]oid.ID, 0, len(nodes))
+    for _, node := range nodes {
         ind := node.GetLatestNodeIndex()
         if node.IsSplit() {
-            c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes)
+            c.reqLogger(ctx).Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID))
+        }
+        if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
+            c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
+        } else {
+            res = append(res, node.ObjID)
         }
-        return node.ObjID, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind])
     }
-    return oid.ID{}, layer.ErrNoNodeToRemove
+    return res
 }
 func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
@@ -541,7 +623,7 @@ func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, o
     ind := tagNode.GetLatestNodeIndex()
     if tagNode.IsSplit() {
-        c.reqLogger(ctx).Warn(logs.FoundSeveralObjectTaggingNodes)
+        c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs)
     }
     return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet)
@@ -552,14 +634,14 @@ func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo
 }
 func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
-    node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename})
+    multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
     if err != nil {
         return nil, err
     }
     tags := make(map[string]string)
-    for key, val := range node.Meta {
+    for key, val := range multiNode.Latest().Meta {
         if strings.HasPrefix(key, userDefinedTagPrefix) {
             tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
         }
@@ -569,7 +651,7 @@ func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (
 }
 func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
-    node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename})
+    multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
     isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
     if err != nil && !isErrNotFound {
         return fmt.Errorf("couldn't get node: %w", err)
@@ -587,12 +669,19 @@ func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, t
         return err
     }
-    ind := node.GetLatestNodeIndex()
-    if node.IsSplit() {
-        c.reqLogger(ctx).Warn(logs.FoundSeveralBucketTaggingNodes)
+    latest := multiNode.Latest()
+    ind := latest.GetLatestNodeIndex()
+    if latest.IsSplit() {
+        c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
     }
-    return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, treeTagSet)
+    if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, treeTagSet); err != nil {
+        return fmt.Errorf("move bucket tagging node: %w", err)
+    }
+    c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
+    return nil
 }
 func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
@@ -615,6 +704,8 @@ func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeI
         return nil, err
     }
+    // consider using map[string][]*treeNode
+    // to be able to remove unused node, that can be added during split
     treeNodes := make(map[string]*treeNode, len(keys))
     for _, s := range subtree {
@@ -689,26 +780,6 @@ func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
     return nodes[targetIndexNode], nil
 }
-func getLatestNode(nodes []NodeResponse) NodeResponse {
-    if len(nodes) == 0 {
-        return nil
-    }
-    var (
-        index        int
-        maxTimestamp uint64
-    )
-    for i, node := range nodes {
-        if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
-            index = i
-            maxTimestamp = timestamp
-        }
-    }
-    return nodes[index]
-}
 func getMaxTimestamp(node NodeResponse) uint64 {
     var maxTimestamp uint64
@@ -1558,11 +1629,11 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
     return info.Meta
 }
-func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) {
+func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) {
     p := &GetNodesParams{
         BktInfo:    bktInfo,
         TreeID:     systemTree,
-        Path:       path,
+        Path:       []string{name},
         LatestOnly: false,
         AllAttrs:   true,
     }
@@ -1577,10 +1648,10 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path
         return nil, layer.ErrNodeNotFound
     }
     if len(nodes) != 1 {
-        c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("path", strings.Join(path, "/")))
+        c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name))
     }
-    return newTreeNode(getLatestNode(nodes))
+    return newMultiNode(nodes)
 }
 func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {