From af028748eabf39786a1d346af95e93f67a8089a4 Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov <d.stepanov@yadro.com>
Date: Fri, 31 Jan 2025 12:14:51 +0300
Subject: [PATCH] [#1608] tree: Add IO tag for tree sync requests

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
---
 cmd/frostfs-node/tree.go  |   2 +-
 pkg/services/tree/qos.go  | 101 ++++++++++++++++++++++++++++++++++++++
 pkg/services/tree/sync.go |   1 +
 3 files changed, 103 insertions(+), 1 deletion(-)
 create mode 100644 pkg/services/tree/qos.go

diff --git a/cmd/frostfs-node/tree.go b/cmd/frostfs-node/tree.go
index f3ddc8cbe..42022d603 100644
--- a/cmd/frostfs-node/tree.go
+++ b/cmd/frostfs-node/tree.go
@@ -72,7 +72,7 @@ func initTreeService(c *cfg) {
 	)
 
 	c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
-		tree.RegisterTreeServiceServer(s, c.treeService)
+		tree.RegisterTreeServiceServer(s, tree.NewIOTagAdjustServer(c.treeService, &c.cfgQoSService))
 	})
 
 	c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
diff --git a/pkg/services/tree/qos.go b/pkg/services/tree/qos.go
new file mode 100644
index 000000000..511c19ce4
--- /dev/null
+++ b/pkg/services/tree/qos.go
@@ -0,0 +1,101 @@
+package tree
+
+import (
+	"context"
+
+	"google.golang.org/grpc"
+)
+
+var _ TreeServiceServer = (*ioTagAdjust)(nil)
+
+type AdjustIOTag interface {
+	AdjustIncommingTag(ctx context.Context, requestSignPublicKey []byte) context.Context
+}
+
+type ioTagAdjust struct {
+	s TreeServiceServer
+	a AdjustIOTag
+}
+
+func NewIOTagAdjustServer(s TreeServiceServer, a AdjustIOTag) TreeServiceServer {
+	return &ioTagAdjust{
+		s: s,
+		a: a,
+	}
+}
+
+func (i *ioTagAdjust) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
+	ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
+	return i.s.Add(ctx, req)
+}
+
+func (i *ioTagAdjust) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
+	ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
+	return i.s.AddByPath(ctx, req)
+}
+
+func (i *ioTagAdjust) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
+	ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
+	return i.s.Apply(ctx, req)
+}
+
+func (i *ioTagAdjust) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
+	ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
+	return i.s.GetNodeByPath(ctx, req)
+}
+
+func (i *ioTagAdjust) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
+	ctx := i.a.AdjustIncommingTag(srv.Context(), req.GetSignature().GetKey())
+	return i.s.GetOpLog(req, &qosServerWrapper[*GetOpLogResponse]{
+		sender:       srv,
+		ServerStream: srv,
+		ctxF:         func() context.Context { return ctx },
+	})
+}
+
+func (i *ioTagAdjust) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
+	ctx := i.a.AdjustIncommingTag(srv.Context(), req.GetSignature().GetKey())
+	return i.s.GetSubTree(req, &qosServerWrapper[*GetSubTreeResponse]{
+		sender:       srv,
+		ServerStream: srv,
+		ctxF:         func() context.Context { return ctx },
+	})
+}
+
+func (i *ioTagAdjust) Healthcheck(ctx context.Context, req *HealthcheckRequest) (*HealthcheckResponse, error) {
+	ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
+	return i.s.Healthcheck(ctx, req)
+}
+
+func (i *ioTagAdjust) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
+	ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
+	return i.s.Move(ctx, req)
+}
+
+func (i *ioTagAdjust) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
+	ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
+	return i.s.Remove(ctx, req)
+}
+
+func (i *ioTagAdjust) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
+	ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
+	return i.s.TreeList(ctx, req)
+}
+
+type qosSend[T any] interface {
+	Send(T) error
+}
+
+type qosServerWrapper[T any] struct {
+	grpc.ServerStream
+	sender qosSend[T]
+	ctxF   func() context.Context
+}
+
+func (w *qosServerWrapper[T]) Send(resp T) error {
+	return w.sender.Send(resp)
+}
+
+func (w *qosServerWrapper[T]) Context() context.Context {
+	return w.ctxF()
+}
diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go
index fab85fbc4..169b7551c 100644
--- a/pkg/services/tree/sync.go
+++ b/pkg/services/tree/sync.go
@@ -387,6 +387,7 @@ func (s *Service) SynchronizeAll() error {
 }
 
 func (s *Service) syncLoop(ctx context.Context) {
+	ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
 	for {
 		select {
 		case <-s.closeCh: