From 0671c277db2837fab94fdef2249b05f64caf2b73 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 31 Jan 2025 12:14:51 +0300 Subject: [PATCH] [#1608] tree: Add IO tag for tree sync requests Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/tree.go | 2 +- pkg/services/tree/qos.go | 101 +++++++++++++++++++++++++++++++++++ pkg/services/tree/service.go | 3 ++ 3 files changed, 105 insertions(+), 1 deletion(-) create mode 100644 pkg/services/tree/qos.go diff --git a/cmd/frostfs-node/tree.go b/cmd/frostfs-node/tree.go index f8330a25e..65414f0ca 100644 --- a/cmd/frostfs-node/tree.go +++ b/cmd/frostfs-node/tree.go @@ -72,7 +72,7 @@ func initTreeService(c *cfg) { ) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { - tree.RegisterTreeServiceServer(s, c.treeService) + tree.RegisterTreeServiceServer(s, tree.NewIOTagAdjustServer(c.treeService, &c.cfgQoSService)) }) c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { diff --git a/pkg/services/tree/qos.go b/pkg/services/tree/qos.go new file mode 100644 index 000000000..8f21686df --- /dev/null +++ b/pkg/services/tree/qos.go @@ -0,0 +1,101 @@ +package tree + +import ( + "context" + + "google.golang.org/grpc" +) + +var _ TreeServiceServer = (*ioTagAdjust)(nil) + +type AdjustIOTag interface { + AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context +} + +type ioTagAdjust struct { + s TreeServiceServer + a AdjustIOTag +} + +func NewIOTagAdjustServer(s TreeServiceServer, a AdjustIOTag) TreeServiceServer { + return &ioTagAdjust{ + s: s, + a: a, + } +} + +func (i *ioTagAdjust) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) { + ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()) + return i.s.Add(ctx, req) +} + +func (i *ioTagAdjust) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) { + ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()) + return i.s.AddByPath(ctx, req) +} + +func (i *ioTagAdjust) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()) + return i.s.Apply(ctx, req) +} + +func (i *ioTagAdjust) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) { + ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()) + return i.s.GetNodeByPath(ctx, req) +} + +func (i *ioTagAdjust) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { + ctx := i.a.AdjustIncomingTag(srv.Context(), req.GetSignature().GetKey()) + return i.s.GetOpLog(req, &qosServerWrapper[*GetOpLogResponse]{ + sender: srv, + ServerStream: srv, + ctxF: func() context.Context { return ctx }, + }) +} + +func (i *ioTagAdjust) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error { + ctx := i.a.AdjustIncomingTag(srv.Context(), req.GetSignature().GetKey()) + return i.s.GetSubTree(req, &qosServerWrapper[*GetSubTreeResponse]{ + sender: srv, + ServerStream: srv, + ctxF: func() context.Context { return ctx }, + }) +} + +func (i *ioTagAdjust) Healthcheck(ctx context.Context, req *HealthcheckRequest) (*HealthcheckResponse, error) { + ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()) + return i.s.Healthcheck(ctx, req) +} + +func (i *ioTagAdjust) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) { + ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()) + return i.s.Move(ctx, req) +} + +func (i *ioTagAdjust) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) { + ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()) + return i.s.Remove(ctx, req) +} + +func (i *ioTagAdjust) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) { + ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()) + return i.s.TreeList(ctx, req) +} + +type qosSend[T any] interface { + Send(T) error +} + +type qosServerWrapper[T any] struct { + grpc.ServerStream + sender qosSend[T] + ctxF func() context.Context +} + +func (w *qosServerWrapper[T]) Send(resp T) error { + return w.sender.Send(resp) +} + +func (w *qosServerWrapper[T]) Context() context.Context { + return w.ctxF() +} diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 3c0214a98..2e9722e79 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -9,9 +9,11 @@ import ( "sync" "sync/atomic" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" @@ -83,6 +85,7 @@ func New(opts ...Option) *Service { // Start starts the service. func (s *Service) Start(ctx context.Context) { + ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String()) go s.replicateLoop(ctx) go s.syncLoop(ctx)