From 04d8c16d3a5f87de033d12f52ac40fc62af22db7 Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Fri, 7 Feb 2025 17:23:10 +0300 Subject: [PATCH] [#1639] qos: Add interceptors for limiting active RPCs Signed-off-by: Aleksey Savchuk --- internal/qos/grpc.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/internal/qos/grpc.go b/internal/qos/grpc.go index c253f1e9d..6134e24e5 100644 --- a/internal/qos/grpc.go +++ b/internal/qos/grpc.go @@ -3,7 +3,9 @@ package qos import ( "context" + "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting" "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "google.golang.org/grpc" ) @@ -49,3 +51,36 @@ func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientIntercepto return streamer(ctx, desc, cc, method, opts...) } } + +func NewMaxActiveRPCLimiterUnaryServerInterceptor(lr *limiting.Limiter) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == IOTagCritical.String() { + return handler(ctx, req) + } + + release, ok := lr.TryAcquire(info.FullMethod) + if !ok { + return nil, new(apistatus.ResourceExhausted) + } + defer release() + + return handler(ctx, req) + } +} + +//nolint:contextcheck +func NewMaxActiveRPCLimiterStreamServerInterceptor(lr *limiting.Limiter) grpc.StreamServerInterceptor { + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if tag, ok := tagging.IOTagFromContext(ss.Context()); ok && tag == IOTagCritical.String() { + return handler(srv, ss) + } + + release, ok := lr.TryAcquire(info.FullMethod) + if !ok { + return new(apistatus.ResourceExhausted) + } + defer release() + + return handler(srv, ss) + } +}