diff --git a/cmd/frostfs-node/control.go b/cmd/frostfs-node/control.go index 1d9ac3df0..1825013c7 100644 --- a/cmd/frostfs-node/control.go +++ b/cmd/frostfs-node/control.go @@ -13,7 +13,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify" metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc" tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" - "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -56,10 +55,7 @@ func initControlService(ctx context.Context, c *cfg) { c.cfgControlService.server = grpc.NewServer( grpc.ChainUnaryInterceptor( - func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { - ctx = tagging.ContextWithIOTag(ctx, qos.IOTagCritical.String()) - return handler(ctx, req) - }, + qos.NewSetCriticalIOTagUnaryServerInterceptor(), metrics.NewUnaryServerInterceptor(), tracing.NewUnaryServerInterceptor(), ), diff --git a/internal/qos/grpc.go b/internal/qos/grpc.go new file mode 100644 index 000000000..c253f1e9d --- /dev/null +++ b/internal/qos/grpc.go @@ -0,0 +1,51 @@ +package qos + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" + "google.golang.org/grpc" +) + +func NewSetCriticalIOTagUnaryServerInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + ctx = tagging.ContextWithIOTag(ctx, IOTagCritical.String()) + return handler(ctx, req) + } +} + +func NewAdjustOutgoingIOTagUnaryClientInterceptor() grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + rawTag, ok := tagging.IOTagFromContext(ctx) + if !ok { + return invoker(ctx, method, req, reply, cc, opts...) + } + tag, err := FromRawString(rawTag) + if err != nil { + tag = IOTagClient + } + if tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache { + tag = IOTagInternal + } + ctx = tagging.ContextWithIOTag(ctx, tag.String()) + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + rawTag, ok := tagging.IOTagFromContext(ctx) + if !ok { + return streamer(ctx, desc, cc, method, opts...) + } + tag, err := FromRawString(rawTag) + if err != nil { + tag = IOTagClient + } + if tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache { + tag = IOTagInternal + } + ctx = tagging.ContextWithIOTag(ctx, tag.String()) + return streamer(ctx, desc, cc, method, opts...) + } +} diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 5c3beb553..e94fa580a 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -7,11 +7,12 @@ import ( "sync" "time" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc" tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" - qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" + "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" rawclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -63,14 +64,16 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address grpcOpts := []grpc.DialOption{ grpc.WithChainUnaryInterceptor( + qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(), metrics.NewUnaryClientInterceptor(), tracing.NewUnaryClientInteceptor(), - qos.NewUnaryClientInteceptor(), + tagging.NewUnaryClientInteceptor(), ), grpc.WithChainStreamInterceptor( + qos.NewAdjustOutgoingIOTagStreamClientInterceptor(), metrics.NewStreamClientInterceptor(), tracing.NewStreamClientInterceptor(), - qos.NewStreamClientInterceptor(), + tagging.NewStreamClientInterceptor(), ), grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()), grpc.WithDefaultCallOptions(grpc.WaitForReady(true)), diff --git a/pkg/services/tree/cache.go b/pkg/services/tree/cache.go index 7f1dcf07c..125871fc4 100644 --- a/pkg/services/tree/cache.go +++ b/pkg/services/tree/cache.go @@ -9,10 +9,11 @@ import ( "time" internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc" tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" - qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" + "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" "github.com/hashicorp/golang-lru/v2/simplelru" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" @@ -96,14 +97,16 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (* opts := []grpc.DialOption{ grpc.WithChainUnaryInterceptor( + qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(), metrics.NewUnaryClientInterceptor(), tracing.NewUnaryClientInteceptor(), - qos.NewUnaryClientInteceptor(), + tagging.NewUnaryClientInteceptor(), ), grpc.WithChainStreamInterceptor( + qos.NewAdjustOutgoingIOTagStreamClientInterceptor(), metrics.NewStreamClientInterceptor(), tracing.NewStreamClientInterceptor(), - qos.NewStreamClientInterceptor(), + tagging.NewStreamClientInterceptor(), ), grpc.WithContextDialer(c.ds.GrpcContextDialer()), grpc.WithDefaultCallOptions(grpc.WaitForReady(true)), diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index a0485d9d8..3e0a45385 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -13,6 +13,7 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" @@ -20,7 +21,7 @@ import ( metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc" tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" - qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" + "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "github.com/panjf2000/ants/v2" @@ -341,14 +342,16 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) { return grpc.NewClient(a.URIAddr(), grpc.WithChainUnaryInterceptor( + qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(), metrics.NewUnaryClientInterceptor(), tracing_grpc.NewUnaryClientInteceptor(), - qos.NewUnaryClientInteceptor(), + tagging.NewUnaryClientInteceptor(), ), grpc.WithChainStreamInterceptor( + qos.NewAdjustOutgoingIOTagStreamClientInterceptor(), metrics.NewStreamClientInterceptor(), tracing_grpc.NewStreamClientInterceptor(), - qos.NewStreamClientInterceptor(), + tagging.NewStreamClientInterceptor(), ), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),