From 1d4b0defa5afac2b63c67a90d352c761e0d8a446 Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Fri, 7 Feb 2025 14:55:46 +0300 Subject: [PATCH] [#1639] qos: Add interceptors for limiting active RPCs Signed-off-by: Aleksey Savchuk --- go.mod | 3 +++ internal/qos/grpc.go | 43 +++++++++++++++++++++++++++++++++++++++++++ internal/qos/tags.go | 40 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+) create mode 100644 internal/qos/grpc.go create mode 100644 internal/qos/tags.go diff --git a/go.mod b/go.mod index 8283207a7..734d4cd8e 100644 --- a/go.mod +++ b/go.mod @@ -2,12 +2,15 @@ module git.frostfs.info/TrueCloudLab/frostfs-node go 1.22 +replace git.frostfs.info/TrueCloudLab/frostfs-qos => ../qos + require ( code.gitea.io/sdk/gitea v0.17.1 git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1-0.20241205083807-762d7f9f9f08 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 + git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-00010101000000-000000000000 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250206083857-7bdc78f2b5c2 git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 diff --git a/internal/qos/grpc.go b/internal/qos/grpc.go new file mode 100644 index 000000000..1aeaa20ff --- /dev/null +++ b/internal/qos/grpc.go @@ -0,0 +1,43 @@ +package qos + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting" + "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "google.golang.org/grpc" +) + +func NewMaxActiveRPCLimiterUnaryServerInterceptor(lr *limiting.Limiter) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == IOTagCritical.String() { + return handler(ctx, req) + } + + release, ok := lr.TryAcquire(info.FullMethod) + if !ok { + return nil, new(apistatus.ResourceExhausted) + } + defer release() + + return handler(ctx, req) + } +} + +//nolint:contextcheck +func NewMaxActiveRPCLimiterStreamServerInterceptor(lr *limiting.Limiter) grpc.StreamServerInterceptor { + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if tag, ok := tagging.IOTagFromContext(ss.Context()); ok && tag == IOTagCritical.String() { + return handler(srv, ss) + } + + release, ok := lr.TryAcquire(info.FullMethod) + if !ok { + return new(apistatus.ResourceExhausted) + } + defer release() + + return handler(srv, ss) + } +} diff --git a/internal/qos/tags.go b/internal/qos/tags.go new file mode 100644 index 000000000..6037bb093 --- /dev/null +++ b/internal/qos/tags.go @@ -0,0 +1,40 @@ +// delete after #1608 merged +package qos + +import "fmt" + +type IOTag string + +const ( + IOTagClient IOTag = "client" + IOTagInternal IOTag = "internal" + IOTagBackground IOTag = "background" + IOTagWritecache IOTag = "writecache" + IOTagPolicer IOTag = "policer" + IOTagCritical IOTag = "critical" + + ioTagUnknown IOTag = "" +) + +func FromRawString(s string) (IOTag, error) { + switch s { + case string(IOTagCritical): + return IOTagCritical, nil + case string(IOTagClient): + return IOTagClient, nil + case string(IOTagInternal): + return IOTagInternal, nil + case string(IOTagBackground): + return IOTagBackground, nil + case string(IOTagWritecache): + return IOTagWritecache, nil + case string(IOTagPolicer): + return IOTagPolicer, nil + default: + return ioTagUnknown, fmt.Errorf("unknown tag %s", s) + } +} + +func (t IOTag) String() string { + return string(t) +}