forked from TrueCloudLab/frostfs-sdk-go
[#28] Add recreating session token
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
cb0a844cae
commit
d95d722d61
2 changed files with 29 additions and 9 deletions
|
@ -169,22 +169,35 @@ func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, bu
|
|||
wg := sync.WaitGroup{}
|
||||
for i, cPack := range p.clientPacks {
|
||||
wg.Add(1)
|
||||
go func(i int, netmap client.Netmap) {
|
||||
go func(i int, client client.Client) {
|
||||
defer wg.Done()
|
||||
var (
|
||||
tkn *session.Token
|
||||
err error
|
||||
)
|
||||
ok := true
|
||||
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
|
||||
defer c()
|
||||
if _, err := netmap.EndpointInfo(tctx); err != nil {
|
||||
if _, err = client.EndpointInfo(tctx); err != nil {
|
||||
ok = false
|
||||
bufferWeights[i] = 0
|
||||
}
|
||||
if ok {
|
||||
bufferWeights[i] = options.weights[i]
|
||||
p.lock.RLock()
|
||||
if !p.clientPacks[i].healthy {
|
||||
if tkn, err = client.CreateSession(ctx, options.SessionExpirationEpoch); err != nil {
|
||||
ok = false
|
||||
bufferWeights[i] = 0
|
||||
}
|
||||
}
|
||||
p.lock.RUnlock()
|
||||
}
|
||||
|
||||
p.lock.Lock()
|
||||
if p.clientPacks[i].healthy != ok {
|
||||
p.clientPacks[i].healthy = ok
|
||||
p.clientPacks[i].sessionToken = tkn
|
||||
healthyChanged = true
|
||||
}
|
||||
p.lock.Unlock()
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/session"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -42,24 +43,28 @@ func TestSamplerStability(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type netmapMock struct {
|
||||
type clientMock struct {
|
||||
client.Client
|
||||
name string
|
||||
err error
|
||||
}
|
||||
|
||||
func newNetmapMock(name string, needErr bool) netmapMock {
|
||||
func newNetmapMock(name string, needErr bool) clientMock {
|
||||
var err error
|
||||
if needErr {
|
||||
err = fmt.Errorf("not available")
|
||||
}
|
||||
return netmapMock{name: name, err: err}
|
||||
return clientMock{name: name, err: err}
|
||||
}
|
||||
|
||||
func (n netmapMock) EndpointInfo(_ context.Context, _ ...client.CallOption) (*client.EndpointInfo, error) {
|
||||
func (n clientMock) EndpointInfo(_ context.Context, _ ...client.CallOption) (*client.EndpointInfo, error) {
|
||||
return nil, n.err
|
||||
}
|
||||
|
||||
func (n clientMock) CreateSession(_ context.Context, _ uint64, _ ...client.CallOption) (*session.Token, error) {
|
||||
return session.NewToken(), n.err
|
||||
}
|
||||
|
||||
func TestHealthyReweight(t *testing.T) {
|
||||
var (
|
||||
weights = []float64{0.9, 0.1}
|
||||
|
@ -78,14 +83,14 @@ func TestHealthyReweight(t *testing.T) {
|
|||
// check getting first node connection before rebalance happened
|
||||
connection0, _, err := p.Connection()
|
||||
require.NoError(t, err)
|
||||
mock0 := connection0.(netmapMock)
|
||||
mock0 := connection0.(clientMock)
|
||||
require.Equal(t, names[0], mock0.name)
|
||||
|
||||
updateNodesHealth(context.TODO(), p, options, buffer)
|
||||
|
||||
connection1, _, err := p.Connection()
|
||||
require.NoError(t, err)
|
||||
mock1 := connection1.(netmapMock)
|
||||
mock1 := connection1.(clientMock)
|
||||
require.Equal(t, names[1], mock1.name)
|
||||
|
||||
// enabled first node again
|
||||
|
@ -98,8 +103,10 @@ func TestHealthyReweight(t *testing.T) {
|
|||
|
||||
connection0, _, err = p.Connection()
|
||||
require.NoError(t, err)
|
||||
mock0 = connection0.(netmapMock)
|
||||
mock0 = connection0.(clientMock)
|
||||
require.Equal(t, names[0], mock0.name)
|
||||
|
||||
require.NotNil(t, p.clientPacks[0].sessionToken)
|
||||
}
|
||||
|
||||
func TestHealthyNoReweight(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue