[#32] Require at least one healthy node
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
234373f249
commit
880f3a61e5
2 changed files with 82 additions and 6 deletions
19
pool/pool.go
19
pool/pool.go
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/session"
|
"github.com/nspcc-dev/neofs-api-go/pkg/session"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Client is a wrapper for client.Client to generate mock.
|
// Client is a wrapper for client.Client to generate mock.
|
||||||
|
@ -26,6 +27,7 @@ type Client interface {
|
||||||
// BuilderOptions contains options used to build connection pool.
|
// BuilderOptions contains options used to build connection pool.
|
||||||
type BuilderOptions struct {
|
type BuilderOptions struct {
|
||||||
Key *ecdsa.PrivateKey
|
Key *ecdsa.PrivateKey
|
||||||
|
Logger *zap.Logger
|
||||||
NodeConnectionTimeout time.Duration
|
NodeConnectionTimeout time.Duration
|
||||||
NodeRequestTimeout time.Duration
|
NodeRequestTimeout time.Duration
|
||||||
ClientRebalanceInterval time.Duration
|
ClientRebalanceInterval time.Duration
|
||||||
|
@ -105,6 +107,7 @@ type pool struct {
|
||||||
|
|
||||||
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
clientPacks := make([]*clientPack, len(options.weights))
|
clientPacks := make([]*clientPack, len(options.weights))
|
||||||
|
var atLeastOneHealthy bool
|
||||||
for i, address := range options.addresses {
|
for i, address := range options.addresses {
|
||||||
c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key),
|
c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key),
|
||||||
client.WithURIAddress(address, nil),
|
client.WithURIAddress(address, nil),
|
||||||
|
@ -112,12 +115,22 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
var healthy bool
|
||||||
st, err := c.CreateSession(ctx, options.SessionExpirationEpoch)
|
st, err := c.CreateSession(ctx, options.SessionExpirationEpoch)
|
||||||
if err != nil {
|
if err != nil && options.Logger != nil {
|
||||||
return nil, fmt.Errorf("failed to create neofs session token for client %v: %w", address, err)
|
options.Logger.Warn("failed to create neofs session token for client",
|
||||||
|
zap.String("address", address),
|
||||||
|
zap.Error(err))
|
||||||
|
} else if err == nil {
|
||||||
|
healthy, atLeastOneHealthy = true, true
|
||||||
}
|
}
|
||||||
clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: true}
|
clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: healthy}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !atLeastOneHealthy {
|
||||||
|
return nil, fmt.Errorf("at least one node must be healthy")
|
||||||
|
}
|
||||||
|
|
||||||
source := rand.NewSource(time.Now().UnixNano())
|
source := rand.NewSource(time.Now().UnixNano())
|
||||||
sampler := NewSampler(options.weights, source)
|
sampler := NewSampler(options.weights, source)
|
||||||
wallet, err := owner.NEO3WalletFromPublicKey(&options.Key.PublicKey)
|
wallet, err := owner.NEO3WalletFromPublicKey(&options.Key.PublicKey)
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/session"
|
"github.com/nspcc-dev/neofs-api-go/pkg/session"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestBuildPoolClientFailed(t *testing.T) {
|
func TestBuildPoolClientFailed(t *testing.T) {
|
||||||
|
@ -48,8 +49,8 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) {
|
||||||
|
|
||||||
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
||||||
mockClient := NewMockClient(ctrl)
|
mockClient := NewMockClient(ctrl)
|
||||||
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error session"))
|
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error session")).AnyTimes()
|
||||||
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfo{}, nil)
|
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfo{}, nil).AnyTimes()
|
||||||
return mockClient, nil
|
return mockClient, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,7 +64,69 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) {
|
||||||
|
|
||||||
_, err = pb.Build(context.TODO(), opts)
|
_, err = pb.Build(context.TODO(), opts)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Contains(t, err.Error(), "client []: error session")
|
}
|
||||||
|
|
||||||
|
func TestBuildPoolOneNodeFailed(t *testing.T) {
|
||||||
|
ctrl := gomock.NewController(t)
|
||||||
|
defer ctrl.Finish()
|
||||||
|
ctrl2 := gomock.NewController(t)
|
||||||
|
defer ctrl2.Finish()
|
||||||
|
|
||||||
|
key, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ni := &netmap.NodeInfo{}
|
||||||
|
ni.SetAddresses("addr1", "addr2")
|
||||||
|
|
||||||
|
var expectedToken *session.Token
|
||||||
|
clientCount := -1
|
||||||
|
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
||||||
|
clientCount++
|
||||||
|
mockClient := NewMockClient(ctrl)
|
||||||
|
mockInvokes := 0
|
||||||
|
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
|
||||||
|
mockInvokes++
|
||||||
|
if mockInvokes == 1 {
|
||||||
|
expectedToken = newToken(t)
|
||||||
|
return nil, fmt.Errorf("error session")
|
||||||
|
}
|
||||||
|
return expectedToken, nil
|
||||||
|
}).AnyTimes()
|
||||||
|
|
||||||
|
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
||||||
|
|
||||||
|
mockClient2 := NewMockClient(ctrl2)
|
||||||
|
mockClient2.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
||||||
|
mockClient2.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
||||||
|
|
||||||
|
if clientCount == 0 {
|
||||||
|
return mockClient, nil
|
||||||
|
}
|
||||||
|
return mockClient2, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
pb := new(Builder)
|
||||||
|
pb.AddNode("peer0", 9)
|
||||||
|
pb.AddNode("peer1", 1)
|
||||||
|
|
||||||
|
log, err := zap.NewProduction()
|
||||||
|
require.NoError(t, err)
|
||||||
|
opts := &BuilderOptions{
|
||||||
|
Key: &key.PrivateKey,
|
||||||
|
clientBuilder: clientBuilder,
|
||||||
|
ClientRebalanceInterval: 1000 * time.Millisecond,
|
||||||
|
Logger: log,
|
||||||
|
}
|
||||||
|
|
||||||
|
clientPool, err := pb.Build(context.TODO(), opts)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
condition := func() bool {
|
||||||
|
_, st, err := clientPool.Connection()
|
||||||
|
return err == nil && st == expectedToken
|
||||||
|
}
|
||||||
|
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
|
||||||
|
require.Eventually(t, condition, 3*time.Second, 300*time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBuildPoolZeroNodes(t *testing.T) {
|
func TestBuildPoolZeroNodes(t *testing.T) {
|
||||||
|
|
Loading…
Reference in a new issue