forked from TrueCloudLab/frostfs-node
Change-Id: I4a55fcb84e6f65408a1c0120ac917e49e23354a1 Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
132 lines
3.8 KiB
Go
132 lines
3.8 KiB
Go
package qos_test
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"testing"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
"github.com/stretchr/testify/require"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// okKey is the only key for which the mock limiter's Acquire succeeds;
// every other key is rejected.
const okKey = "ok"
|
|
|
|
var (
|
|
errTest = errors.New("mock")
|
|
errWrongTag = errors.New("wrong tag")
|
|
errResExhausted = new(apistatus.ResourceExhausted)
|
|
)
|
|
|
|
type mockGRPCServerStream struct {
|
|
grpc.ServerStream
|
|
|
|
ctx context.Context
|
|
}
|
|
|
|
func (m *mockGRPCServerStream) Context() context.Context {
|
|
return m.ctx
|
|
}
|
|
|
|
// limiter is a mock limiter that admits only okKey and remembers whether the
// release callback it handed out has been invoked.
type limiter struct {
	// released becomes true when the callback returned by Acquire is called.
	released bool
}
|
|
|
|
func (l *limiter) Acquire(key string) (limiting.ReleaseFunc, bool) {
|
|
if key != okKey {
|
|
return nil, false
|
|
}
|
|
return func() { l.released = true }, true
|
|
}
|
|
|
|
func unaryMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
|
|
interceptor := qos.NewMaxActiveRPCLimiterUnaryServerInterceptor(func() limiting.Limiter { return lim })
|
|
called := false
|
|
handler := func(ctx context.Context, req any) (any, error) {
|
|
called = true
|
|
return nil, errTest
|
|
}
|
|
_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{FullMethod: methodName}, handler)
|
|
return called, err
|
|
}
|
|
|
|
func streamMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
|
|
interceptor := qos.NewMaxActiveRPCLimiterStreamServerInterceptor(func() limiting.Limiter { return lim })
|
|
called := false
|
|
handler := func(srv any, stream grpc.ServerStream) error {
|
|
called = true
|
|
return errTest
|
|
}
|
|
err := interceptor(nil, &mockGRPCServerStream{ctx: ctx}, &grpc.StreamServerInfo{
|
|
FullMethod: methodName,
|
|
}, handler)
|
|
return called, err
|
|
}
|
|
|
|
func Test_MaxActiveRPCLimiter(t *testing.T) {
|
|
// UnaryServerInterceptor
|
|
t.Run("unary fail", func(t *testing.T) {
|
|
var lim limiter
|
|
|
|
called, err := unaryMaxActiveRPCLimiter(context.Background(), &lim, "")
|
|
require.EqualError(t, err, errResExhausted.Error())
|
|
require.False(t, called)
|
|
})
|
|
t.Run("unary pass critical", func(t *testing.T) {
|
|
var lim limiter
|
|
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
|
|
|
|
called, err := unaryMaxActiveRPCLimiter(ctx, &lim, "")
|
|
require.EqualError(t, err, errTest.Error())
|
|
require.True(t, called)
|
|
require.False(t, lim.released)
|
|
})
|
|
t.Run("unary pass", func(t *testing.T) {
|
|
var lim limiter
|
|
|
|
called, err := unaryMaxActiveRPCLimiter(context.Background(), &lim, okKey)
|
|
require.EqualError(t, err, errTest.Error())
|
|
require.True(t, called && lim.released)
|
|
})
|
|
// StreamServerInterceptor
|
|
t.Run("stream fail", func(t *testing.T) {
|
|
var lim limiter
|
|
|
|
called, err := streamMaxActiveRPCLimiter(context.Background(), &lim, "")
|
|
require.EqualError(t, err, errResExhausted.Error())
|
|
require.False(t, called)
|
|
})
|
|
t.Run("stream pass critical", func(t *testing.T) {
|
|
var lim limiter
|
|
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
|
|
|
|
called, err := streamMaxActiveRPCLimiter(ctx, &lim, "")
|
|
require.EqualError(t, err, errTest.Error())
|
|
require.True(t, called)
|
|
require.False(t, lim.released)
|
|
})
|
|
t.Run("stream pass", func(t *testing.T) {
|
|
var lim limiter
|
|
|
|
called, err := streamMaxActiveRPCLimiter(context.Background(), &lim, okKey)
|
|
require.EqualError(t, err, errTest.Error())
|
|
require.True(t, called && lim.released)
|
|
})
|
|
}
|
|
|
|
func TestSetCriticalIOTagUnaryServerInterceptor_Pass(t *testing.T) {
|
|
interceptor := qos.NewSetCriticalIOTagUnaryServerInterceptor()
|
|
handler := func(ctx context.Context, req any) (any, error) {
|
|
if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == qos.IOTagCritical.String() {
|
|
return nil, nil
|
|
}
|
|
return nil, errWrongTag
|
|
}
|
|
_, err := interceptor(context.Background(), nil, nil, handler)
|
|
require.NoError(t, err)
|
|
}
|