diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index 49dd124..5518c6f 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -169,22 +169,35 @@ func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, bu wg := sync.WaitGroup{} for i, cPack := range p.clientPacks { wg.Add(1) - go func(i int, netmap client.Netmap) { + go func(i int, client client.Client) { defer wg.Done() + var ( + tkn *session.Token + err error + ) ok := true tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout) defer c() - if _, err := netmap.EndpointInfo(tctx); err != nil { + if _, err = client.EndpointInfo(tctx); err != nil { ok = false bufferWeights[i] = 0 } if ok { bufferWeights[i] = options.weights[i] + p.lock.RLock() + if !p.clientPacks[i].healthy { + if tkn, err = client.CreateSession(ctx, options.SessionExpirationEpoch); err != nil { + ok = false + bufferWeights[i] = 0 + } + } + p.lock.RUnlock() } p.lock.Lock() if p.clientPacks[i].healthy != ok { p.clientPacks[i].healthy = ok + p.clientPacks[i].sessionToken = tkn healthyChanged = true } p.lock.Unlock() diff --git a/pkg/pool/sampler_test.go b/pkg/pool/sampler_test.go index 2738e46..6fefb36 100644 --- a/pkg/pool/sampler_test.go +++ b/pkg/pool/sampler_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/session" "github.com/stretchr/testify/require" ) @@ -42,24 +43,28 @@ func TestSamplerStability(t *testing.T) { } } -type netmapMock struct { +type clientMock struct { client.Client name string err error } -func newNetmapMock(name string, needErr bool) netmapMock { +func newNetmapMock(name string, needErr bool) clientMock { var err error if needErr { err = fmt.Errorf("not available") } - return netmapMock{name: name, err: err} + return clientMock{name: name, err: err} } -func (n netmapMock) EndpointInfo(_ context.Context, _ ...client.CallOption) (*client.EndpointInfo, error) { +func (n clientMock) EndpointInfo(_ context.Context, _ ...client.CallOption) (*client.EndpointInfo, error) { return nil, n.err } +func (n clientMock) CreateSession(_ context.Context, _ uint64, _ ...client.CallOption) (*session.Token, error) { + return session.NewToken(), n.err +} + func TestHealthyReweight(t *testing.T) { var ( weights = []float64{0.9, 0.1} @@ -78,14 +83,14 @@ func TestHealthyReweight(t *testing.T) { // check getting first node connection before rebalance happened connection0, _, err := p.Connection() require.NoError(t, err) - mock0 := connection0.(netmapMock) + mock0 := connection0.(clientMock) require.Equal(t, names[0], mock0.name) updateNodesHealth(context.TODO(), p, options, buffer) connection1, _, err := p.Connection() require.NoError(t, err) - mock1 := connection1.(netmapMock) + mock1 := connection1.(clientMock) require.Equal(t, names[1], mock1.name) // enabled first node again @@ -98,8 +103,10 @@ func TestHealthyReweight(t *testing.T) { connection0, _, err = p.Connection() require.NoError(t, err) - mock0 = connection0.(netmapMock) + mock0 = connection0.(clientMock) require.Equal(t, names[0], mock0.name) + + require.NotNil(t, p.clientPacks[0].sessionToken) } func TestHealthyNoReweight(t *testing.T) {