From 880f3a61e553ed7fe8f76b35a1c3b7b77dfb4599 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 7 Sep 2021 17:34:20 +0300 Subject: [PATCH] [#32] Require at least one healthy node Signed-off-by: Denis Kirillov --- pool/pool.go | 19 ++++++++++--- pool/pool_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 82 insertions(+), 6 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index 1c89ca6..ba71fa1 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -16,6 +16,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/pkg/session" + "go.uber.org/zap" ) // Client is a wrapper for client.Client to generate mock. @@ -26,6 +27,7 @@ type Client interface { // BuilderOptions contains options used to build connection pool. type BuilderOptions struct { Key *ecdsa.PrivateKey + Logger *zap.Logger NodeConnectionTimeout time.Duration NodeRequestTimeout time.Duration ClientRebalanceInterval time.Duration @@ -105,6 +107,7 @@ type pool struct { func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { clientPacks := make([]*clientPack, len(options.weights)) + var atLeastOneHealthy bool for i, address := range options.addresses { c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key), client.WithURIAddress(address, nil), @@ -112,12 +115,22 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { if err != nil { return nil, err } + var healthy bool st, err := c.CreateSession(ctx, options.SessionExpirationEpoch) - if err != nil { - return nil, fmt.Errorf("failed to create neofs session token for client %v: %w", address, err) + if err != nil && options.Logger != nil { + options.Logger.Warn("failed to create neofs session token for client", + zap.String("address", address), + zap.Error(err)) + } else if err == nil { + healthy, atLeastOneHealthy = true, true } - clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: true} + clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: healthy} } + + if !atLeastOneHealthy { + return nil, fmt.Errorf("at least one node must be healthy") + } + source := rand.NewSource(time.Now().UnixNano()) sampler := NewSampler(options.weights, source) wallet, err := owner.NEO3WalletFromPublicKey(&options.Key.PublicKey) diff --git a/pool/pool_test.go b/pool/pool_test.go index 5d690d9..a6e9358 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -14,6 +14,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/netmap" "github.com/nspcc-dev/neofs-api-go/pkg/session" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestBuildPoolClientFailed(t *testing.T) { @@ -48,8 +49,8 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) { clientBuilder := func(opts ...client.Option) (client.Client, error) { mockClient := NewMockClient(ctrl) - mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error session")) - mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfo{}, nil) + mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error session")).AnyTimes() + mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfo{}, nil).AnyTimes() return mockClient, nil } @@ -63,7 +64,69 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) { _, err = pb.Build(context.TODO(), opts) require.Error(t, err) - require.Contains(t, err.Error(), "client []: error session") +} + +func TestBuildPoolOneNodeFailed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctrl2 := gomock.NewController(t) + defer ctrl2.Finish() + + key, err := keys.NewPrivateKey() + require.NoError(t, err) + + ni := &netmap.NodeInfo{} + ni.SetAddresses("addr1", "addr2") + + var expectedToken *session.Token + clientCount := -1 + clientBuilder := func(opts ...client.Option) (client.Client, error) { + clientCount++ + mockClient := NewMockClient(ctrl) + mockInvokes := 0 + mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) { + mockInvokes++ + if mockInvokes == 1 { + expectedToken = newToken(t) + return nil, fmt.Errorf("error session") + } + return expectedToken, nil + }).AnyTimes() + + mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + + mockClient2 := NewMockClient(ctrl2) + mockClient2.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockClient2.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + + if clientCount == 0 { + return mockClient, nil + } + return mockClient2, nil + } + + pb := new(Builder) + pb.AddNode("peer0", 9) + pb.AddNode("peer1", 1) + + log, err := zap.NewProduction() + require.NoError(t, err) + opts := &BuilderOptions{ + Key: &key.PrivateKey, + clientBuilder: clientBuilder, + ClientRebalanceInterval: 1000 * time.Millisecond, + Logger: log, + } + + clientPool, err := pb.Build(context.TODO(), opts) + require.NoError(t, err) + + condition := func() bool { + _, st, err := clientPool.Connection() + return err == nil && st == expectedToken + } + require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond) + require.Eventually(t, condition, 3*time.Second, 300*time.Millisecond) } func TestBuildPoolZeroNodes(t *testing.T) {