diff --git a/internal/qos/grpc.go b/internal/qos/grpc.go index c253f1e9d..534a1f74b 100644 --- a/internal/qos/grpc.go +++ b/internal/qos/grpc.go @@ -3,7 +3,9 @@ package qos import ( "context" + "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting" "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "google.golang.org/grpc" ) @@ -49,3 +51,36 @@ func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientIntercepto return streamer(ctx, desc, cc, method, opts...) } } + +func NewMaxActiveRPCLimiterUnaryServerInterceptor(getLimiter func() limiting.Limiter) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == IOTagCritical.String() { + return handler(ctx, req) + } + + release, ok := getLimiter().Acquire(info.FullMethod) + if !ok { + return nil, new(apistatus.ResourceExhausted) + } + defer release() + + return handler(ctx, req) + } +} + +//nolint:contextcheck (grpc.ServerStream manages the context itself) +func NewMaxActiveRPCLimiterStreamServerInterceptor(getLimiter func() limiting.Limiter) grpc.StreamServerInterceptor { + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if tag, ok := tagging.IOTagFromContext(ss.Context()); ok && tag == IOTagCritical.String() { + return handler(srv, ss) + } + + release, ok := getLimiter().Acquire(info.FullMethod) + if !ok { + return new(apistatus.ResourceExhausted) + } + defer release() + + return handler(srv, ss) + } +}