package pool

import (
	"context"
	"crypto/ecdsa"
	"errors"
	"math/rand"
	"testing"
	"time"

	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	"go.uber.org/zap/zaptest"
	"go.uber.org/zap/zaptest/observer"
)

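// TestBuildPoolClientFailed checks that Dial fails when the only configured node cannot be dialed.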
func TestBuildPoolClientFailed(t *testing.T) {
	mockClientBuilder := func(addr string) client {
		mockCli := newMockClient(addr, *newPrivateKey(t))
		mockCli.errOnDial()
		return mockCli
	}

	opts := InitParameters{
		key:        newPrivateKey(t),
		nodeParams: []NodeParam{{1, "peer0", 1}},
	}
	opts.setClientBuilder(mockClientBuilder)

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(context.Background())
	require.Error(t, err)
}

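// TestBuildPoolCreateSessionFailed checks that Dial fails when session creation errors on the only node.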
func TestBuildPoolCreateSessionFailed(t *testing.T) {
	clientMockBuilder := func(addr string) client {
		mockCli := newMockClient(addr, *newPrivateKey(t))
		mockCli.errOnCreateSession()
		return mockCli
	}

	opts := InitParameters{
		key:        newPrivateKey(t),
		nodeParams: []NodeParam{{1, "peer0", 1}},
	}
	opts.setClientBuilder(clientMockBuilder)

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(context.Background())
	require.Error(t, err)
}

func newPrivateKey(t *testing.T) *ecdsa.PrivateKey {
	p, err := keys.NewPrivateKey()
	require.NoError(t, err)
	return &p.PrivateKey
}

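// TestBuildPoolOneNodeFailed checks that Dial succeeds when one of two nodes fails its
// endpoint-info health check, and that after rebalancing connections are served by the
// healthy node.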
func TestBuildPoolOneNodeFailed(t *testing.T) {
	nodes := []NodeParam{
		{1, "peer0", 1},
		{2, "peer1", 1},
	}

	var clientKeys []*ecdsa.PrivateKey
	mockClientBuilder := func(addr string) client {
		key := newPrivateKey(t)
		clientKeys = append(clientKeys, key)

		if addr == nodes[0].address {
			mockCli := newMockClient(addr, *key)
			mockCli.errOnEndpointInfo()
			return mockCli
		}

		return newMockClient(addr, *key)
	}

	log, err := zap.NewProduction()
	require.NoError(t, err)
	opts := InitParameters{
		key:                     newPrivateKey(t),
		clientRebalanceInterval: 1000 * time.Millisecond,
		logger:                  log,
		nodeParams:              nodes,
	}
	opts.setClientBuilder(mockClientBuilder)

	clientPool, err := NewPool(opts)
	require.NoError(t, err)
	err = clientPool.Dial(context.Background())
	require.NoError(t, err)
	t.Cleanup(clientPool.Close)

	expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
	condition := func() bool {
		cp, err := clientPool.connection()
		if err != nil {
			return false
		}
		st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false))
		return st.AssertAuthKey(&expectedAuthKey)
	}
	require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
	require.Eventually(t, condition, 3*time.Second, 300*time.Millisecond)
}

func TestBuildPoolZeroNodes(t *testing.T) {
	opts := InitParameters{
		key: newPrivateKey(t),
	}
	_, err := NewPool(opts)
	require.Error(t, err)
}

func TestOneNode(t *testing.T) {
	key1 := newPrivateKey(t)
	mockClientBuilder := func(addr string) client {
		return newMockClient(addr, *key1)
	}

	opts := InitParameters{
		key:        newPrivateKey(t),
		nodeParams: []NodeParam{{1, "peer0", 1}},
	}
	opts.setClientBuilder(mockClientBuilder)

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(context.Background())
	require.NoError(t, err)
	t.Cleanup(pool.Close)

	cp, err := pool.connection()
	require.NoError(t, err)
	st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
	expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
	require.True(t, st.AssertAuthKey(&expectedAuthKey))
}

func TestTwoNodes(t *testing.T) {
	var clientKeys []*ecdsa.PrivateKey
	mockClientBuilder := func(addr string) client {
		key := newPrivateKey(t)
		clientKeys = append(clientKeys, key)
		return newMockClient(addr, *key)
	}

	opts := InitParameters{
		key: newPrivateKey(t),
		nodeParams: []NodeParam{
			{1, "peer0", 1},
			{1, "peer1", 1},
		},
	}
	opts.setClientBuilder(mockClientBuilder)

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(context.Background())
	require.NoError(t, err)
	t.Cleanup(pool.Close)

	cp, err := pool.connection()
	require.NoError(t, err)
	st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
	require.True(t, assertAuthKeyForAny(st, clientKeys))
}

func assertAuthKeyForAny(st session.Object, clientKeys []*ecdsa.PrivateKey) bool {
	for _, key := range clientKeys {
		expectedAuthKey := frostfsecdsa.PublicKey(key.PublicKey)
		if st.AssertAuthKey(&expectedAuthKey) {
			return true
		}
	}
	return false
}

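// TestOneOfTwoFailed checks that the pool still provides working connections when one of
// two nodes fails its endpoint-info and network-info health checks.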
func TestOneOfTwoFailed(t *testing.T) {
	nodes := []NodeParam{
		{1, "peer0", 1},
		{9, "peer1", 1},
	}

	var clientKeys []*ecdsa.PrivateKey
	mockClientBuilder := func(addr string) client {
		key := newPrivateKey(t)
		clientKeys = append(clientKeys, key)

		if addr == nodes[0].address {
			return newMockClient(addr, *key)
		}

		mockCli := newMockClient(addr, *key)
		mockCli.errOnEndpointInfo()
		mockCli.errOnNetworkInfo()
		return mockCli
	}

	opts := InitParameters{
		key:                     newPrivateKey(t),
		nodeParams:              nodes,
		clientRebalanceInterval: 200 * time.Millisecond,
	}
	opts.setClientBuilder(mockClientBuilder)

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(context.Background())
	require.NoError(t, err)
	t.Cleanup(pool.Close)

	time.Sleep(2 * time.Second)

	for range 5 {
		cp, err := pool.connection()
		require.NoError(t, err)
		st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
		require.True(t, assertAuthKeyForAny(st, clientKeys))
	}
}

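// TestUpdateNodesHealth verifies health-state transitions of a single node for combinations
// of the previous health status and the outcome of the next health check.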
func TestUpdateNodesHealth(t *testing.T) {
	ctx := context.Background()
	key := newPrivateKey(t)

	for _, tc := range []struct {
		name        string
		wasHealthy  bool
		willHealthy bool
		prepareCli  func(*mockClient)
	}{
		{
			name:        "healthy, maintenance, unhealthy",
			wasHealthy:  true,
			willHealthy: false,
			prepareCli:  func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Maintenance) },
		},
		{
			name:        "unhealthy, maintenance, unhealthy",
			wasHealthy:  false,
			willHealthy: false,
			prepareCli:  func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Maintenance) },
		},
		{
			name:        "healthy, no error, healthy",
			wasHealthy:  true,
			willHealthy: true,
			prepareCli:  func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Online) },
		},
		{
			name:        "unhealthy, no error, healthy",
			wasHealthy:  false,
			willHealthy: true,
			prepareCli:  func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Online) },
		},
		{
			name:        "healthy, error, failed restart, unhealthy",
			wasHealthy:  true,
			willHealthy: false,
			prepareCli: func(c *mockClient) {
				c.errOnEndpointInfo()
				c.errorOnDial = true
			},
		},
		{
			name:        "unhealthy, error, failed restart, unhealthy",
			wasHealthy:  false,
			willHealthy: false,
			prepareCli: func(c *mockClient) {
				c.errOnEndpointInfo()
				c.errorOnDial = true
			},
		},
		{
			name:        "healthy, error, restart, error, unhealthy",
			wasHealthy:  true,
			willHealthy: false,
			prepareCli:  func(c *mockClient) { c.errOnEndpointInfo() },
		},
		{
			name:        "unhealthy, error, restart, error, unhealthy",
			wasHealthy:  false,
			willHealthy: false,
			prepareCli:  func(c *mockClient) { c.errOnEndpointInfo() },
		},
		{
			name:        "healthy, error, restart, maintenance, unhealthy",
			wasHealthy:  true,
			willHealthy: false,
			prepareCli: func(c *mockClient) {
				healthError := true
				c.healthcheckFn = func() {
					if healthError {
						c.errorOnEndpointInfo = errors.New("error")
					} else {
						c.errorOnEndpointInfo = nil
						c.resOnEndpointInfo.SetStatus(netmap.Maintenance)
					}
					healthError = !healthError
				}
			},
		},
		{
			name:        "unhealthy, error, restart, maintenance, unhealthy",
			wasHealthy:  false,
			willHealthy: false,
			prepareCli: func(c *mockClient) {
				healthError := true
				c.healthcheckFn = func() {
					if healthError {
						c.errorOnEndpointInfo = errors.New("error")
					} else {
						c.errorOnEndpointInfo = nil
						c.resOnEndpointInfo.SetStatus(netmap.Maintenance)
					}
					healthError = !healthError
				}
			},
		},
		{
			name:        "healthy, error, restart, healthy",
			wasHealthy:  true,
			willHealthy: true,
			prepareCli: func(c *mockClient) {
				healthError := true
				c.healthcheckFn = func() {
					if healthError {
						c.errorOnEndpointInfo = errors.New("error")
					} else {
						c.errorOnEndpointInfo = nil
					}
					healthError = !healthError
				}
			},
		},
		{
			name:        "unhealthy, error, restart, healthy",
			wasHealthy:  false,
			willHealthy: true,
			prepareCli: func(c *mockClient) {
				healthError := true
				c.healthcheckFn = func() {
					if healthError {
						c.errorOnEndpointInfo = errors.New("error")
					} else {
						c.errorOnEndpointInfo = nil
					}
					healthError = !healthError
				}
			},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			cli := newMockClientHealthy("peer0", *key, tc.wasHealthy)
			tc.prepareCli(cli)
			p, log := newPool(t, cli)

			p.updateNodesHealth(ctx, [][]float64{{1}})

			changed := tc.wasHealthy != tc.willHealthy
			require.Equalf(t, tc.willHealthy, cli.isHealthy(), "healthy status should be: %v", tc.willHealthy)
			require.Equalf(t, changed, log.Len() == 1, "healthy status should be changed: %v", changed)
		})
	}
}

func newPool(t *testing.T, cli *mockClient) (*Pool, *observer.ObservedLogs) {
	log, observedLog := getObservedLogger()

	cache, err := newCache(0)
	require.NoError(t, err)

	return &Pool{
		innerPools: []*innerPool{{
			sampler: newSampler([]float64{1}, rand.NewSource(0)),
			clients: []client{cli},
		}},
		cache:    cache,
		key:      newPrivateKey(t),
		closedCh: make(chan struct{}),
		rebalanceParams: rebalanceParameters{
			nodesParams:             []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
			nodeRequestTimeout:      time.Second,
			clientRebalanceInterval: 200 * time.Millisecond,
		},
		logger: log,
	}, observedLog
}

func getObservedLogger() (*zap.Logger, *observer.ObservedLogs) {
	loggerCore, observedLog := observer.New(zap.DebugLevel)
	return zap.New(loggerCore), observedLog
}

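// TestTwoFailed checks that connection() reports an error when every node is unhealthy.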
func TestTwoFailed(t *testing.T) {
	var clientKeys []*ecdsa.PrivateKey
	mockClientBuilder := func(addr string) client {
		key := newPrivateKey(t)
		clientKeys = append(clientKeys, key)
		mockCli := newMockClient(addr, *key)
		mockCli.errOnEndpointInfo()
		return mockCli
	}

	opts := InitParameters{
		key: newPrivateKey(t),
		nodeParams: []NodeParam{
			{1, "peer0", 1},
			{1, "peer1", 1},
		},
		clientRebalanceInterval: 200 * time.Millisecond,
	}
	opts.setClientBuilder(mockClientBuilder)

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(context.Background())
	require.NoError(t, err)

	t.Cleanup(pool.Close)

	time.Sleep(2 * time.Second)

	_, err = pool.connection()
	require.Error(t, err)
	require.Contains(t, err.Error(), "no healthy")
}

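// TestSessionCache checks that the cached session token is dropped after an operation fails
// with a session token error and is re-created by the next successful operation.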
func TestSessionCache(t *testing.T) {
	key := newPrivateKey(t)
	expectedAuthKey := frostfsecdsa.PublicKey(key.PublicKey)

	mockClientBuilder := func(addr string) client {
		mockCli := newMockClient(addr, *key)
		mockCli.statusOnGetObject(new(apistatus.SessionTokenNotFound))
		return mockCli
	}

	opts := InitParameters{
		key: newPrivateKey(t),
		nodeParams: []NodeParam{
			{1, "peer0", 1},
		},
		clientRebalanceInterval: 30 * time.Second,
	}
	opts.setClientBuilder(mockClientBuilder)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(ctx)
	require.NoError(t, err)
	t.Cleanup(pool.Close)

	// cache must contain session token
	cp, err := pool.connection()
	require.NoError(t, err)
	st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
	require.True(t, st.AssertAuthKey(&expectedAuthKey))

	var prm PrmObjectGet
	prm.SetAddress(oid.Address{})
	prm.UseSession(session.Object{})

	_, err = pool.GetObject(ctx, prm)
	require.Error(t, err)

	// cache must not contain session token
	cp, err = pool.connection()
	require.NoError(t, err)
	_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
	require.False(t, ok)

	var prm2 PrmObjectPut
	prm2.SetHeader(object.Object{})

	_, err = pool.PutObject(ctx, prm2)
	require.NoError(t, err)

	// cache must contain session token
	cp, err = pool.connection()
	require.NoError(t, err)
	st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
	require.True(t, st.AssertAuthKey(&expectedAuthKey))
}

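// TestPriority checks that rebalancing switches connections from the higher-priority node
// to the lower-priority one after the former starts failing health checks.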
func TestPriority(t *testing.T) {
	nodes := []NodeParam{
		{1, "peer0", 1},
		{2, "peer1", 100},
	}

	var clientKeys []*ecdsa.PrivateKey
	mockClientBuilder := func(addr string) client {
		key := newPrivateKey(t)
		clientKeys = append(clientKeys, key)

		if addr == nodes[0].address {
			mockCli := newMockClient(addr, *key)
			mockCli.errOnEndpointInfo()
			return mockCli
		}

		return newMockClient(addr, *key)
	}

	opts := InitParameters{
		key:                     newPrivateKey(t),
		nodeParams:              nodes,
		clientRebalanceInterval: 1500 * time.Millisecond,
	}
	opts.setClientBuilder(mockClientBuilder)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(ctx)
	require.NoError(t, err)
	t.Cleanup(pool.Close)

	expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey)
	firstNode := func() bool {
		cp, err := pool.connection()
		require.NoError(t, err)
		st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
		return st.AssertAuthKey(&expectedAuthKey1)
	}

	expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
	secondNode := func() bool {
		cp, err := pool.connection()
		require.NoError(t, err)
		st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
		return st.AssertAuthKey(&expectedAuthKey2)
	}
	require.Never(t, secondNode, time.Second, 200*time.Millisecond)

	require.Eventually(t, secondNode, time.Second, 200*time.Millisecond)
	require.Never(t, firstNode, time.Second, 200*time.Millisecond)
}

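// TestSessionCacheWithKey checks that an operation performed with an explicitly provided key
// gets its own session cache entry.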
func TestSessionCacheWithKey(t *testing.T) {
	key := newPrivateKey(t)
	expectedAuthKey := frostfsecdsa.PublicKey(key.PublicKey)

	mockClientBuilder := func(addr string) client {
		return newMockClient(addr, *key)
	}

	opts := InitParameters{
		key: newPrivateKey(t),
		nodeParams: []NodeParam{
			{1, "peer0", 1},
		},
		clientRebalanceInterval: 30 * time.Second,
	}
	opts.setClientBuilder(mockClientBuilder)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(ctx)
	require.NoError(t, err)

	// cache must contain session token
	cp, err := pool.connection()
	require.NoError(t, err)
	st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
	require.True(t, st.AssertAuthKey(&expectedAuthKey))

	var prm PrmObjectDelete
	prm.SetAddress(oid.Address{})
	anonKey := newPrivateKey(t)
	prm.UseKey(anonKey)

	err = pool.DeleteObject(ctx, prm)
	require.NoError(t, err)
	st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false))
	require.True(t, st.AssertAuthKey(&expectedAuthKey))
}

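// TestSessionTokenOwner checks that a default session opened with an overridden key is issued
// on behalf of that key's owner and carries a valid signature.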
func TestSessionTokenOwner(t *testing.T) {
	mockClientBuilder := func(addr string) client {
		key := newPrivateKey(t)
		return newMockClient(addr, *key)
	}

	opts := InitParameters{
		key: newPrivateKey(t),
		nodeParams: []NodeParam{
			{1, "peer0", 1},
		},
	}
	opts.setClientBuilder(mockClientBuilder)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	p, err := NewPool(opts)
	require.NoError(t, err)
	err = p.Dial(ctx)
	require.NoError(t, err)
	t.Cleanup(p.Close)

	anonKey := newPrivateKey(t)
	var anonOwner user.ID
	user.IDFromKey(&anonOwner, anonKey.PublicKey)

	var prm prmCommon
	prm.UseKey(anonKey)
	var prmCtx prmContext
	prmCtx.useDefaultSession()

	var tkn session.Object
	var cc callContext
	cc.sessionTarget = func(tok session.Object) {
		tkn = tok
	}
	err = p.initCallContext(&cc, prm, prmCtx)
	require.NoError(t, err)

	err = p.openDefaultSession(ctx, &cc)
	require.NoError(t, err)
	require.True(t, tkn.VerifySignature())
	require.True(t, tkn.Issuer().Equals(anonOwner))
}

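// TestWaitPresence covers waitForContainerPresence behaviour on context cancellation,
// timeout expiration, and the successful path.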
func TestWaitPresence(t *testing.T) {
	mockCli := newMockClient("", *newPrivateKey(t))

	t.Run("context canceled", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		go func() {
			time.Sleep(500 * time.Millisecond)
			cancel()
		}()

		err := waitForContainerPresence(ctx, mockCli, PrmContainerGet{}, &WaitParams{
			Timeout:      120 * time.Second,
			PollInterval: 5 * time.Second,
		})
		require.Error(t, err)
		require.Contains(t, err.Error(), "context canceled")
	})

	t.Run("context deadline exceeded", func(t *testing.T) {
		ctx := context.Background()

		err := waitForContainerPresence(ctx, mockCli, PrmContainerGet{}, &WaitParams{
			Timeout:      500 * time.Millisecond,
			PollInterval: 5 * time.Second,
		})
		require.Error(t, err)
		require.Contains(t, err.Error(), "context deadline exceeded")
	})

	t.Run("ok", func(t *testing.T) {
		ctx := context.Background()

		err := waitForContainerPresence(ctx, mockCli, PrmContainerGet{}, &WaitParams{
			Timeout:      10 * time.Second,
			PollInterval: 500 * time.Millisecond,
		})
		require.NoError(t, err)
	})
}

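// TestStatusMonitor checks error accounting and manual health-status switches of clientStatusMonitor.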
func TestStatusMonitor(t *testing.T) {
	monitor := newClientStatusMonitor(zap.NewExample(), "", 10)
	monitor.errorThreshold = 3

	count := 10
	for range count {
		monitor.incErrorRate()
	}

	require.Equal(t, uint64(count), monitor.overallErrorRate())
	require.Equal(t, uint32(1), monitor.currentErrorRate())

	t.Run("healthy status", func(t *testing.T) {
		cases := []struct {
			action      func(*clientStatusMonitor)
			status      uint32
			isDialed    bool
			isHealthy   bool
			description string
		}{
			{
				action:      func(m *clientStatusMonitor) { m.setUnhealthy() },
				status:      statusUnhealthyOnRequest,
				isDialed:    true,
				isHealthy:   false,
				description: "set unhealthy on request",
			},
			{
				action:      func(m *clientStatusMonitor) { m.setHealthy() },
				status:      statusHealthy,
				isDialed:    true,
				isHealthy:   true,
				description: "set healthy",
			},
		}
		for _, tc := range cases {
			tc.action(&monitor)
			require.Equal(t, tc.status, monitor.healthy.Load())
			require.Equal(t, tc.isHealthy, monitor.isHealthy())
		}
	})
}

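// TestHandleError checks how handleError maps API statuses and transport errors to the returned
// error, whether the error counter is incremented, and whether the node is marked unhealthy.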
func TestHandleError(t *testing.T) {
	ctx := context.Background()
	log := zaptest.NewLogger(t)

	canceledCtx, cancel := context.WithCancel(context.Background())
	cancel()

	for _, tc := range []struct {
		name            string
		ctx             context.Context
		status          apistatus.Status
		err             error
		expectedError   bool
		countError      bool
		markedUnhealthy bool
	}{
		{
			name:          "no error, no status",
			ctx:           ctx,
			status:        nil,
			err:           nil,
			expectedError: false,
			countError:    false,
		},
		{
			name:          "no error, success status",
			ctx:           ctx,
			status:        new(apistatus.SuccessDefaultV2),
			err:           nil,
			expectedError: false,
			countError:    false,
		},
		{
			name:          "error, success status",
			ctx:           ctx,
			status:        new(apistatus.SuccessDefaultV2),
			err:           errors.New("error"),
			expectedError: true,
			countError:    true,
		},
		{
			name:          "error, no status",
			ctx:           ctx,
			status:        nil,
			err:           errors.New("error"),
			expectedError: true,
			countError:    true,
		},
		{
			name:          "no error, object not found status",
			ctx:           ctx,
			status:        new(apistatus.ObjectNotFound),
			err:           nil,
			expectedError: true,
			countError:    false,
		},
		{
			name:          "object not found error, object not found status",
			ctx:           ctx,
			status:        new(apistatus.ObjectNotFound),
			err:           &apistatus.ObjectNotFound{},
			expectedError: true,
			countError:    false,
		},
		{
			name:          "eacl not found error, no status",
			ctx:           ctx,
			status:        nil,
			err:           &apistatus.EACLNotFound{},
			expectedError: true,
			// The error is expected to be counted because the status is nil:
			// currently DisableFrostFSErrorResolution is assumed to always be false for the pool,
			// and the status is checked first in handleError.
			countError: true,
		},
		{
			name:          "no error, internal status",
			ctx:           ctx,
			status:        new(apistatus.ServerInternal),
			err:           nil,
			expectedError: true,
			countError:    true,
		},
		{
			name:          "no error, wrong magic status",
			ctx:           ctx,
			status:        new(apistatus.WrongMagicNumber),
			err:           nil,
			expectedError: true,
			countError:    true,
		},
		{
			name:          "no error, signature verification status",
			ctx:           ctx,
			status:        new(apistatus.SignatureVerification),
			err:           nil,
			expectedError: true,
			countError:    true,
		},
		{
			name:            "no error, maintenance status",
			ctx:             ctx,
			status:          new(apistatus.NodeUnderMaintenance),
			err:             nil,
			expectedError:   true,
			countError:      true,
			markedUnhealthy: true,
		},
		{
			name:            "maintenance error, no status",
			ctx:             ctx,
			status:          nil,
			err:             &apistatus.NodeUnderMaintenance{},
			expectedError:   true,
			countError:      true,
			markedUnhealthy: true,
		},
		{
			name:          "no error, invalid argument status",
			ctx:           ctx,
			status:        new(apistatus.InvalidArgument),
			err:           nil,
			expectedError: true,
			countError:    false,
		},
		{
			name:          "context canceled error, no status",
			ctx:           canceledCtx,
			status:        nil,
			err:           errors.New("error"),
			expectedError: true,
			countError:    false,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			monitor := newClientStatusMonitor(log, "", 10)
			errCount := monitor.overallErrorRate()
			err := monitor.handleError(tc.ctx, tc.status, tc.err)
			if tc.expectedError {
				require.Error(t, err)
			} else {
				require.NoError(t, err)
			}
			if tc.countError {
				errCount++
			}
			require.Equal(t, errCount, monitor.overallErrorRate())
			if tc.markedUnhealthy {
				require.False(t, monitor.isHealthy())
			}
		})
	}
}

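// TestSwitchAfterErrorThreshold checks that the pool switches to the next node once the error
// threshold is reached on the current one.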
func TestSwitchAfterErrorThreshold(t *testing.T) {
	nodes := []NodeParam{
		{1, "peer0", 1},
		{2, "peer1", 100},
	}

	errorThreshold := 5

	var clientKeys []*ecdsa.PrivateKey
	mockClientBuilder := func(addr string) client {
		key := newPrivateKey(t)
		clientKeys = append(clientKeys, key)

		if addr == nodes[0].address {
			mockCli := newMockClient(addr, *key)
			mockCli.setThreshold(uint32(errorThreshold))
			mockCli.statusOnGetObject(new(apistatus.ServerInternal))
			return mockCli
		}

		return newMockClient(addr, *key)
	}

	opts := InitParameters{
		key:                     newPrivateKey(t),
		nodeParams:              nodes,
		clientRebalanceInterval: 30 * time.Second,
	}
	opts.setClientBuilder(mockClientBuilder)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(ctx)
	require.NoError(t, err)
	t.Cleanup(pool.Close)

	for range errorThreshold {
		conn, err := pool.connection()
		require.NoError(t, err)
		require.Equal(t, nodes[0].address, conn.address())
		_, err = conn.objectGet(ctx, PrmObjectGet{})
		require.Error(t, err)
	}

	conn, err := pool.connection()
	require.NoError(t, err)
	require.Equal(t, nodes[1].address, conn.address())
	_, err = conn.objectGet(ctx, PrmObjectGet{})
	require.NoError(t, err)
}