frostfs-sdk-go/pool/pool_test.go
Leonard Lyubich 213d20e3fb [#92] client: Accept structured parameters in non-object operations
Define `XPrm` type for each `X` client operation which structures
parameters. Export setters of each parameterized value. Emphasize that
some parameters are required. Make the client panic when the parameters
are incorrectly set. Get rid of vadiadic call options and `CallOption`
type. Improve documentation of client behavior.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2022-02-01 17:02:43 +03:00

579 lines
16 KiB
Go

package pool
//go:generate mockgen -destination mock_test.go -package pool . Client
import (
"context"
"crypto/ecdsa"
"fmt"
"math/rand"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/owner"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestBuildPoolClientFailed(t *testing.T) {
clientBuilder := func(opts ...client.Option) (Client, error) {
return nil, fmt.Errorf("error")
}
pb := new(Builder)
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t),
clientBuilder: clientBuilder,
}
_, err := pb.Build(context.TODO(), opts)
require.Error(t, err)
}
func TestBuildPoolCreateSessionFailed(t *testing.T) {
ctrl := gomock.NewController(t)
ni := &netmap.NodeInfo{}
ni.SetAddresses("addr1", "addr2")
clientBuilder := func(opts ...client.Option) (Client, error) {
mockClient := NewMockClient(ctrl)
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error session")).AnyTimes()
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfoRes{}, nil).AnyTimes()
return mockClient, nil
}
pb := new(Builder)
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t),
clientBuilder: clientBuilder,
}
_, err := pb.Build(context.TODO(), opts)
require.Error(t, err)
}
func newPrivateKey(t *testing.T) *ecdsa.PrivateKey {
p, err := keys.NewPrivateKey()
require.NoError(t, err)
return &p.PrivateKey
}
func TestBuildPoolOneNodeFailed(t *testing.T) {
t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85
ctrl := gomock.NewController(t)
ctrl2 := gomock.NewController(t)
ni := &netmap.NodeInfo{}
ni.SetAddresses("addr1", "addr2")
var expectedToken *session.Token
clientCount := -1
clientBuilder := func(opts ...client.Option) (Client, error) {
clientCount++
mockClient := NewMockClient(ctrl)
mockInvokes := 0
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
mockInvokes++
if mockInvokes == 1 {
expectedToken = newToken(t)
return nil, fmt.Errorf("error session")
}
return expectedToken, nil
}).AnyTimes()
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
mockClient2 := NewMockClient(ctrl2)
mockClient2.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
mockClient2.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
if clientCount == 0 {
return mockClient, nil
}
return mockClient2, nil
}
pb := new(Builder)
pb.AddNode("peer0", 9, 1)
pb.AddNode("peer1", 1, 1)
log, err := zap.NewProduction()
require.NoError(t, err)
opts := &BuilderOptions{
Key: newPrivateKey(t),
clientBuilder: clientBuilder,
ClientRebalanceInterval: 1000 * time.Millisecond,
Logger: log,
}
clientPool, err := pb.Build(context.TODO(), opts)
require.NoError(t, err)
t.Cleanup(clientPool.Close)
condition := func() bool {
_, st, err := clientPool.Connection()
return err == nil && st == expectedToken
}
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
require.Eventually(t, condition, 3*time.Second, 300*time.Millisecond)
}
func TestBuildPoolZeroNodes(t *testing.T) {
pb := new(Builder)
opts := &BuilderOptions{
Key: newPrivateKey(t),
}
_, err := pb.Build(context.TODO(), opts)
require.Error(t, err)
}
func TestOneNode(t *testing.T) {
t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85
ctrl := gomock.NewController(t)
tok := session.NewToken()
uid, err := uuid.New().MarshalBinary()
require.NoError(t, err)
tok.SetID(uid)
clientBuilder := func(opts ...client.Option) (Client, error) {
mockClient := NewMockClient(ctrl)
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(tok, nil)
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfoRes{}, nil).AnyTimes()
return mockClient, nil
}
pb := new(Builder)
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t),
clientBuilder: clientBuilder,
}
pool, err := pb.Build(context.Background(), opts)
require.NoError(t, err)
t.Cleanup(pool.Close)
_, st, err := pool.Connection()
require.NoError(t, err)
require.Equal(t, tok, st)
}
func TestTwoNodes(t *testing.T) {
t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85
ctrl := gomock.NewController(t)
var tokens []*session.Token
clientBuilder := func(opts ...client.Option) (Client, error) {
mockClient := NewMockClient(ctrl)
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
tok := session.NewToken()
uid, err := uuid.New().MarshalBinary()
require.NoError(t, err)
tok.SetID(uid)
tokens = append(tokens, tok)
return tok, err
})
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfoRes{}, nil).AnyTimes()
return mockClient, nil
}
pb := new(Builder)
pb.AddNode("peer0", 1, 1)
pb.AddNode("peer1", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t),
clientBuilder: clientBuilder,
}
pool, err := pb.Build(context.Background(), opts)
require.NoError(t, err)
t.Cleanup(pool.Close)
_, st, err := pool.Connection()
require.NoError(t, err)
require.Contains(t, tokens, st)
}
func TestOneOfTwoFailed(t *testing.T) {
t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85
ctrl := gomock.NewController(t)
ctrl2 := gomock.NewController(t)
var tokens []*session.Token
clientCount := -1
clientBuilder := func(opts ...client.Option) (Client, error) {
clientCount++
mockClient := NewMockClient(ctrl)
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
tok := newToken(t)
tokens = append(tokens, tok)
return tok, nil
}).AnyTimes()
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
mockClient2 := NewMockClient(ctrl2)
mockClient2.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
tok := newToken(t)
tokens = append(tokens, tok)
return tok, nil
}).AnyTimes()
mockClient2.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).DoAndReturn(func(_ interface{}, _ ...interface{}) (*client.EndpointInfoRes, error) {
return nil, fmt.Errorf("error")
}).AnyTimes()
if clientCount == 0 {
return mockClient, nil
}
return mockClient2, nil
}
pb := new(Builder)
pb.AddNode("peer0", 1, 1)
pb.AddNode("peer1", 9, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t),
clientBuilder: clientBuilder,
ClientRebalanceInterval: 200 * time.Millisecond,
}
pool, err := pb.Build(context.Background(), opts)
require.NoError(t, err)
t.Cleanup(pool.Close)
time.Sleep(2 * time.Second)
for i := 0; i < 5; i++ {
_, st, err := pool.Connection()
require.NoError(t, err)
require.Equal(t, tokens[0], st)
}
}
func TestTwoFailed(t *testing.T) {
t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85
ctrl := gomock.NewController(t)
clientBuilder := func(opts ...client.Option) (Client, error) {
mockClient := NewMockClient(ctrl)
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).AnyTimes()
return mockClient, nil
}
pb := new(Builder)
pb.AddNode("peer0", 1, 1)
pb.AddNode("peer1", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t),
clientBuilder: clientBuilder,
ClientRebalanceInterval: 200 * time.Millisecond,
}
pool, err := pb.Build(context.Background(), opts)
require.NoError(t, err)
t.Cleanup(pool.Close)
time.Sleep(2 * time.Second)
_, _, err = pool.Connection()
require.Error(t, err)
require.Contains(t, err.Error(), "no healthy")
}
func TestSessionCache(t *testing.T) {
t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85
ctrl := gomock.NewController(t)
var tokens []*session.Token
clientBuilder := func(opts ...client.Option) (Client, error) {
mockClient := NewMockClient(ctrl)
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
tok := session.NewToken()
uid, err := uuid.New().MarshalBinary()
require.NoError(t, err)
tok.SetID(uid)
tokens = append(tokens, tok)
return tok, err
}).MaxTimes(3)
mockClient.EXPECT().GetObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("session token does not exist"))
mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
return mockClient, nil
}
pb := new(Builder)
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t),
clientBuilder: clientBuilder,
ClientRebalanceInterval: 30 * time.Second,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, err := pb.Build(ctx, opts)
require.NoError(t, err)
t.Cleanup(pool.Close)
// cache must contain session token
_, st, err := pool.Connection()
require.NoError(t, err)
require.Contains(t, tokens, st)
_, err = pool.GetObject(ctx, nil, retry())
require.Error(t, err)
// cache must not contain session token
_, st, err = pool.Connection()
require.NoError(t, err)
require.Nil(t, st)
_, err = pool.PutObject(ctx, nil)
require.NoError(t, err)
// cache must contain session token
_, st, err = pool.Connection()
require.NoError(t, err)
require.Contains(t, tokens, st)
}
func TestPriority(t *testing.T) {
t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85
ctrl := gomock.NewController(t)
ctrl2 := gomock.NewController(t)
var tokens []*session.Token
clientCount := -1
clientBuilder := func(opts ...client.Option) (Client, error) {
clientCount++
mockClient := NewMockClient(ctrl)
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
tok := newToken(t)
tokens = append(tokens, tok)
return tok, nil
}).AnyTimes()
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).AnyTimes()
mockClient2 := NewMockClient(ctrl2)
mockClient2.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
tok := newToken(t)
tokens = append(tokens, tok)
return tok, nil
}).AnyTimes()
mockClient2.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
if clientCount == 0 {
return mockClient, nil
}
return mockClient2, nil
}
pb := new(Builder)
pb.AddNode("peer0", 1, 1)
pb.AddNode("peer1", 2, 100)
opts := &BuilderOptions{
Key: newPrivateKey(t),
clientBuilder: clientBuilder,
ClientRebalanceInterval: 1500 * time.Millisecond,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, err := pb.Build(ctx, opts)
require.NoError(t, err)
t.Cleanup(pool.Close)
firstNode := func() bool {
_, st, err := pool.Connection()
require.NoError(t, err)
return st == tokens[0]
}
secondNode := func() bool {
_, st, err := pool.Connection()
require.NoError(t, err)
return st == tokens[1]
}
require.Never(t, secondNode, time.Second, 200*time.Millisecond)
require.Eventually(t, secondNode, time.Second, 200*time.Millisecond)
require.Never(t, firstNode, time.Second, 200*time.Millisecond)
}
func TestSessionCacheWithKey(t *testing.T) {
t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85
ctrl := gomock.NewController(t)
var tokens []*session.Token
clientBuilder := func(opts ...client.Option) (Client, error) {
mockClient := NewMockClient(ctrl)
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
tok := session.NewToken()
uid, err := uuid.New().MarshalBinary()
require.NoError(t, err)
tok.SetID(uid)
tokens = append(tokens, tok)
return tok, err
}).MaxTimes(2)
mockClient.EXPECT().GetObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
return mockClient, nil
}
pb := new(Builder)
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t),
clientBuilder: clientBuilder,
ClientRebalanceInterval: 30 * time.Second,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, err := pb.Build(ctx, opts)
require.NoError(t, err)
// cache must contain session token
_, st, err := pool.Connection()
require.NoError(t, err)
require.Contains(t, tokens, st)
_, err = pool.GetObject(ctx, nil, WithKey(newPrivateKey(t)))
require.NoError(t, err)
require.Len(t, tokens, 2)
}
func newToken(t *testing.T) *session.Token {
tok := session.NewToken()
uid, err := uuid.New().MarshalBinary()
require.NoError(t, err)
tok.SetID(uid)
return tok
}
func TestSessionTokenOwner(t *testing.T) {
ctrl := gomock.NewController(t)
clientBuilder := func(opts ...client.Option) (Client, error) {
mockClient := NewMockClient(ctrl)
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&client.CreateSessionRes{}, nil).AnyTimes()
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfoRes{}, nil).AnyTimes()
return mockClient, nil
}
pb := new(Builder)
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t),
clientBuilder: clientBuilder,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pp, err := pb.Build(ctx, opts)
require.NoError(t, err)
t.Cleanup(pp.Close)
p, ok := pp.(*pool)
require.True(t, ok)
anonKey := newPrivateKey(t)
anonOwner := owner.NewIDFromPublicKey(&anonKey.PublicKey)
cfg := cfgFromOpts(WithKey(anonKey), useDefaultSession())
cp, _, err := p.conn(ctx, cfg)
require.NoError(t, err)
tkn := p.cache.Get(formCacheKey(cp.address, anonKey))
require.True(t, anonOwner.Equal(tkn.OwnerID()))
}
func TestWaitPresence(t *testing.T) {
ctrl := gomock.NewController(t)
mockClient := NewMockClient(ctrl)
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
mockClient.EXPECT().GetContainer(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
cache, err := NewCache()
require.NoError(t, err)
inner := &innerPool{
sampler: NewSampler([]float64{1}, rand.NewSource(0)),
clientPacks: []*clientPack{{
client: mockClient,
healthy: true,
}},
}
p := &pool{
innerPools: []*innerPool{inner},
key: newPrivateKey(t),
cache: cache,
}
t.Run("context canceled", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(500 * time.Millisecond)
cancel()
}()
err := p.WaitForContainerPresence(ctx, nil, DefaultPollingParams())
require.Error(t, err)
require.Contains(t, err.Error(), "context canceled")
})
t.Run("context deadline exceeded", func(t *testing.T) {
ctx := context.Background()
err := p.WaitForContainerPresence(ctx, nil, &ContainerPollingParams{
CreationTimeout: 500 * time.Millisecond,
PollInterval: 5 * time.Second,
})
require.Error(t, err)
require.Contains(t, err.Error(), "context deadline exceeded")
})
t.Run("ok", func(t *testing.T) {
ctx := context.Background()
err := p.WaitForContainerPresence(ctx, nil, &ContainerPollingParams{
CreationTimeout: 10 * time.Second,
PollInterval: 500 * time.Millisecond,
})
require.NoError(t, err)
})
}