[#1639] qos: Add interceptors for limiting active RPCs

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
Aleksey Savchuk 2025-02-07 14:55:46 +03:00
parent 92b0059989
commit 1d4b0defa5
Signed by: a-savchuk
GPG key ID: 70C0A7FF6F9C4639
3 changed files with 86 additions and 0 deletions

3
go.mod
View file

@ -2,12 +2,15 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
go 1.22
replace git.frostfs.info/TrueCloudLab/frostfs-qos => ../qos
require (
code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1-0.20241205083807-762d7f9f9f08
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-00010101000000-000000000000
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250206083857-7bdc78f2b5c2
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972

43
internal/qos/grpc.go Normal file
View file

@ -0,0 +1,43 @@
package qos
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"google.golang.org/grpc"
)
func NewMaxActiveRPCLimiterUnaryServerInterceptor(lr *limiting.Limiter) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == IOTagCritical.String() {
return handler(ctx, req)
}
release, ok := lr.TryAcquire(info.FullMethod)
if !ok {
return nil, new(apistatus.ResourceExhausted)
}
defer release()
return handler(ctx, req)
}
}
//nolint:contextcheck
func NewMaxActiveRPCLimiterStreamServerInterceptor(lr *limiting.Limiter) grpc.StreamServerInterceptor {
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if tag, ok := tagging.IOTagFromContext(ss.Context()); ok && tag == IOTagCritical.String() {
return handler(srv, ss)
}
release, ok := lr.TryAcquire(info.FullMethod)
if !ok {
return new(apistatus.ResourceExhausted)
}
defer release()
return handler(srv, ss)
}
}

40
internal/qos/tags.go Normal file
View file

@ -0,0 +1,40 @@
// delete after #1608 merged
package qos
import "fmt"
type IOTag string
const (
IOTagClient IOTag = "client"
IOTagInternal IOTag = "internal"
IOTagBackground IOTag = "background"
IOTagWritecache IOTag = "writecache"
IOTagPolicer IOTag = "policer"
IOTagCritical IOTag = "critical"
ioTagUnknown IOTag = ""
)
func FromRawString(s string) (IOTag, error) {
switch s {
case string(IOTagCritical):
return IOTagCritical, nil
case string(IOTagClient):
return IOTagClient, nil
case string(IOTagInternal):
return IOTagInternal, nil
case string(IOTagBackground):
return IOTagBackground, nil
case string(IOTagWritecache):
return IOTagWritecache, nil
case string(IOTagPolicer):
return IOTagPolicer, nil
default:
return ioTagUnknown, fmt.Errorf("unknown tag %s", s)
}
}
func (t IOTag) String() string {
return string(t)
}