frostfs-node/internal/qos/grpc_test.go
Ekaterina Lebedeva c16788f9c6 [#1656] qos: Add tests for MaxActiveRPCLimiter Interceptors
Change-Id: Ib65890ae5aec34c34e15d4ec1f05952f74f1ad26
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-03-26 09:24:19 +03:00

119 lines
3.3 KiB
Go

package qos_test
import (
"context"
"errors"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
// okKey is the only key for which the mock limiter grants a slot; the tests
// pass it as the gRPC method name to exercise the "acquired" path.
const okKey = "ok"
// errTest is the error returned by the mock handlers, letting the tests
// recognise that the handler's result was propagated by the interceptor.
var errTest = errors.New("mock")

// errResExhausted is a reference ResourceExhausted status; the tests compare
// its message against the error produced when the limiter refuses a request.
var errResExhausted = new(apistatus.ResourceExhausted)
// mockGRPCServerStream is a minimal grpc.ServerStream stand-in that only
// carries a context. The embedded interface is never populated by the tests
// in this file, so Context is the only method that is safe to call.
type mockGRPCServerStream struct {
	grpc.ServerStream

	ctx context.Context
}

// Context returns the context stored in the stub, overriding the embedded
// ServerStream's method.
func (s *mockGRPCServerStream) Context() context.Context {
	return s.ctx
}
// limiter is a test double used in place of a limiting.Limiter. It records
// whether the release callback it handed out has been invoked.
type limiter struct {
	released bool
}

// Acquire grants a slot only for okKey. On success it returns a release
// function that marks the limiter as released, together with true; for any
// other key it returns a nil release function and false.
func (l *limiter) Acquire(key string) (limiting.ReleaseFunc, bool) {
	if key == okKey {
		release := func() { l.released = true }
		return release, true
	}
	return nil, false
}
// unaryMaxActiveRPCLimiter runs the max-active-RPC unary server interceptor
// once, backed by lim, for a call whose full method name is methodName.
//
// The wrapped handler records that it ran and always fails with errTest.
// The function reports whether the handler was reached and the error the
// interceptor returned.
func unaryMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
	var handlerCalled bool
	handler := func(context.Context, any) (any, error) {
		handlerCalled = true
		return nil, errTest
	}

	getLimiter := func() limiting.Limiter { return lim }
	info := &grpc.UnaryServerInfo{FullMethod: methodName}

	// The request payload is irrelevant to the limiter, hence nil.
	_, err := qos.NewMaxActiveRPCLimiterUnaryServerInterceptor(getLimiter)(ctx, nil, info, handler)
	return handlerCalled, err
}
// streamMaxActiveRPCLimiter runs the max-active-RPC stream server interceptor
// once, backed by lim, for a stream whose full method name is methodName.
// ctx is exposed to the interceptor through a mock server stream.
//
// The wrapped handler records that it ran and always fails with errTest.
// The function reports whether the handler was reached and the error the
// interceptor returned.
func streamMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
	var handlerCalled bool
	handler := func(any, grpc.ServerStream) error {
		handlerCalled = true
		return errTest
	}

	getLimiter := func() limiting.Limiter { return lim }
	stream := &mockGRPCServerStream{ctx: ctx}
	info := &grpc.StreamServerInfo{FullMethod: methodName}

	// The service implementation is irrelevant to the limiter, hence nil.
	err := qos.NewMaxActiveRPCLimiterStreamServerInterceptor(getLimiter)(nil, stream, info, handler)
	return handlerCalled, err
}
// Test_MaxActiveRPCLimiter checks the unary and the stream variants of the
// max-active-RPC limiter interceptor against the same three scenarios:
//
//   - "fail": the limiter refuses the method, the handler is not called and
//     the error message matches errResExhausted;
//   - "pass critical": the context carries the critical IO tag, the handler
//     runs and the limiter's release callback is never invoked;
//   - "pass": the limiter grants the method, the handler runs and the
//     release callback is invoked.
func Test_MaxActiveRPCLimiter(t *testing.T) {
	interceptors := []struct {
		name string
		run  func(ctx context.Context, lim *limiter, methodName string) (bool, error)
	}{
		{name: "unary", run: unaryMaxActiveRPCLimiter},
		{name: "stream", run: streamMaxActiveRPCLimiter},
	}

	for _, ic := range interceptors {
		t.Run(ic.name+" fail", func(t *testing.T) {
			lim := new(limiter)
			// An empty method name is not okKey, so the mock limiter rejects it.
			called, err := ic.run(context.Background(), lim, "")
			require.EqualError(t, err, errResExhausted.Error())
			require.False(t, called)
		})

		t.Run(ic.name+" pass critical", func(t *testing.T) {
			lim := new(limiter)
			criticalCtx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
			// The method name would be rejected by the mock limiter, yet the
			// handler is still expected to run for the critical tag.
			called, err := ic.run(criticalCtx, lim, "")
			require.EqualError(t, err, errTest.Error())
			require.True(t, called)
			require.False(t, lim.released)
		})

		t.Run(ic.name+" pass", func(t *testing.T) {
			lim := new(limiter)
			called, err := ic.run(context.Background(), lim, okKey)
			require.EqualError(t, err, errTest.Error())
			require.True(t, called)
			require.True(t, lim.released)
		})
	}
}