[#165] pool: distinguish init step and run

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-03-11 17:55:02 +03:00 committed by Alex Vanin
parent d03523a3bc
commit 7811d8eefc
3 changed files with 114 additions and 79 deletions

View file

@ -330,6 +330,9 @@ type Pool struct {
cache *sessionCache cache *sessionCache
stokenDuration uint64 stokenDuration uint64
stokenThreshold time.Duration stokenThreshold time.Duration
rebalanceParams rebalanceParameters
clientBuilder func(endpoint string) (Client, error)
logger *zap.Logger
} }
type innerPool struct { type innerPool struct {
@ -347,7 +350,7 @@ const (
) )
// NewPool create connection pool using parameters. // NewPool create connection pool using parameters.
func NewPool(ctx context.Context, options InitParameters) (*Pool, error) { func NewPool(options InitParameters) (*Pool, error) {
if options.Key == nil { if options.Key == nil {
return nil, fmt.Errorf("missed required parameter 'Key'") return nil, fmt.Errorf("missed required parameter 'Key'")
} }
@ -364,65 +367,72 @@ func NewPool(ctx context.Context, options InitParameters) (*Pool, error) {
return nil, fmt.Errorf("couldn't create cache: %w", err) return nil, fmt.Errorf("couldn't create cache: %w", err)
} }
ownerID := owner.NewIDFromPublicKey(&options.Key.PublicKey) pool := &Pool{
key: options.Key,
owner: owner.NewIDFromPublicKey(&options.Key.PublicKey),
cache: cache,
logger: options.Logger,
stokenDuration: options.SessionExpirationDuration,
stokenThreshold: options.SessionTokenThreshold,
rebalanceParams: rebalanceParameters{
nodesParams: nodesParams,
nodeRequestTimeout: options.NodeRequestTimeout,
clientRebalanceInterval: options.ClientRebalanceInterval,
sessionExpirationDuration: options.SessionExpirationDuration,
},
clientBuilder: options.clientBuilder,
}
inner := make([]*innerPool, len(nodesParams)) return pool, nil
}
// Dial establishes a connection to the servers from the NeoFS network.
// Returns an error describing failure reason. If failed, the Pool
// SHOULD NOT be used.
func (p *Pool) Dial(ctx context.Context) error {
inner := make([]*innerPool, len(p.rebalanceParams.nodesParams))
var atLeastOneHealthy bool var atLeastOneHealthy bool
for i, params := range nodesParams { for i, params := range p.rebalanceParams.nodesParams {
clientPacks := make([]*clientPack, len(params.weights)) clientPacks := make([]*clientPack, len(params.weights))
for j, addr := range params.addresses { for j, addr := range params.addresses {
c, err := options.clientBuilder(addr) c, err := p.clientBuilder(addr)
if err != nil { if err != nil {
return nil, err return err
} }
var healthy bool var healthy bool
cliRes, err := createSessionTokenForDuration(ctx, c, options.SessionExpirationDuration) cliRes, err := createSessionTokenForDuration(ctx, c, p.rebalanceParams.sessionExpirationDuration)
if err != nil && options.Logger != nil { if err != nil && p.logger != nil {
options.Logger.Warn("failed to create neofs session token for client", p.logger.Warn("failed to create neofs session token for client",
zap.String("Address", addr), zap.String("Address", addr),
zap.Error(err)) zap.Error(err))
} else if err == nil { } else if err == nil {
healthy, atLeastOneHealthy = true, true healthy, atLeastOneHealthy = true, true
st := sessionTokenForOwner(ownerID, cliRes) st := sessionTokenForOwner(p.owner, cliRes)
_ = cache.Put(formCacheKey(addr, options.Key), st) _ = p.cache.Put(formCacheKey(addr, p.key), st)
} }
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr} clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr}
} }
source := rand.NewSource(time.Now().UnixNano()) source := rand.NewSource(time.Now().UnixNano())
sampler := newSampler(params.weights, source) sampl := newSampler(params.weights, source)
inner[i] = &innerPool{ inner[i] = &innerPool{
sampler: sampler, sampler: sampl,
clientPacks: clientPacks, clientPacks: clientPacks,
} }
} }
if !atLeastOneHealthy { if !atLeastOneHealthy {
return nil, fmt.Errorf("at least one node must be healthy") return fmt.Errorf("at least one node must be healthy")
} }
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
pool := &Pool{ p.cancel = cancel
innerPools: inner, p.closedCh = make(chan struct{})
key: options.Key, p.innerPools = inner
owner: ownerID,
cancel: cancel,
closedCh: make(chan struct{}),
cache: cache,
stokenDuration: options.SessionExpirationDuration,
stokenThreshold: options.SessionTokenThreshold,
}
rebalanceParams := rebalanceParameters{ go p.startRebalance(ctx)
nodesParams: nodesParams, return nil
nodeRequestTimeout: options.NodeRequestTimeout,
clientRebalanceInterval: options.ClientRebalanceInterval,
sessionExpirationDuration: options.SessionExpirationDuration,
}
go startRebalance(ctx, pool, rebalanceParams)
return pool, nil
} }
func fillDefaultInitParams(params *InitParameters) { func fillDefaultInitParams(params *InitParameters) {
@ -490,10 +500,10 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
return nodesParams, nil return nodesParams, nil
} }
func startRebalance(ctx context.Context, p *Pool, options rebalanceParameters) { func (p *Pool) startRebalance(ctx context.Context) {
ticker := time.NewTimer(options.clientRebalanceInterval) ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
buffers := make([][]float64, len(options.nodesParams)) buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
for i, params := range options.nodesParams { for i, params := range p.rebalanceParams.nodesParams {
buffers[i] = make([]float64, len(params.weights)) buffers[i] = make([]float64, len(params.weights))
} }
@ -503,13 +513,13 @@ func startRebalance(ctx context.Context, p *Pool, options rebalanceParameters) {
close(p.closedCh) close(p.closedCh)
return return
case <-ticker.C: case <-ticker.C:
updateNodesHealth(ctx, p, options, buffers) p.updateNodesHealth(ctx, buffers)
ticker.Reset(options.clientRebalanceInterval) ticker.Reset(p.rebalanceParams.clientRebalanceInterval)
} }
} }
} }
func updateNodesHealth(ctx context.Context, p *Pool, options rebalanceParameters, buffers [][]float64) { func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for i, inner := range p.innerPools { for i, inner := range p.innerPools {
wg.Add(1) wg.Add(1)
@ -517,24 +527,25 @@ func updateNodesHealth(ctx context.Context, p *Pool, options rebalanceParameters
bufferWeights := buffers[i] bufferWeights := buffers[i]
go func(i int, innerPool *innerPool) { go func(i int, innerPool *innerPool) {
defer wg.Done() defer wg.Done()
updateInnerNodesHealth(ctx, p, i, options, bufferWeights) p.updateInnerNodesHealth(ctx, i, bufferWeights)
}(i, inner) }(i, inner)
} }
wg.Wait() wg.Wait()
} }
func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options rebalanceParameters, bufferWeights []float64) { func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
if i > len(pool.innerPools)-1 { if i > len(p.innerPools)-1 {
return return
} }
p := pool.innerPools[i] pool := p.innerPools[i]
options := p.rebalanceParams
healthyChanged := false healthyChanged := false
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
var prmEndpoint client.PrmEndpointInfo var prmEndpoint client.PrmEndpointInfo
for j, cPack := range p.clientPacks { for j, cPack := range pool.clientPacks {
wg.Add(1) wg.Add(1)
go func(j int, cli Client) { go func(j int, cli Client) {
defer wg.Done() defer wg.Done()
@ -546,9 +557,9 @@ func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options reba
ok = false ok = false
bufferWeights[j] = 0 bufferWeights[j] = 0
} }
p.lock.RLock() pool.lock.RLock()
cp := *p.clientPacks[j] cp := *pool.clientPacks[j]
p.lock.RUnlock() pool.lock.RUnlock()
if ok { if ok {
bufferWeights[j] = options.nodesParams[i].weights[j] bufferWeights[j] = options.nodesParams[i].weights[j]
@ -558,21 +569,21 @@ func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options reba
ok = false ok = false
bufferWeights[j] = 0 bufferWeights[j] = 0
} else { } else {
tkn := pool.newSessionToken(cliRes) tkn := p.newSessionToken(cliRes)
_ = pool.cache.Put(formCacheKey(cp.address, pool.key), tkn) _ = p.cache.Put(formCacheKey(cp.address, p.key), tkn)
} }
} }
} else { } else {
pool.cache.DeleteByPrefix(cp.address) p.cache.DeleteByPrefix(cp.address)
} }
p.lock.Lock() pool.lock.Lock()
if p.clientPacks[j].healthy != ok { if pool.clientPacks[j].healthy != ok {
p.clientPacks[j].healthy = ok pool.clientPacks[j].healthy = ok
healthyChanged = true healthyChanged = true
} }
p.lock.Unlock() pool.lock.Unlock()
}(j, cPack.client) }(j, cPack.client)
} }
wg.Wait() wg.Wait()
@ -580,9 +591,9 @@ func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options reba
if healthyChanged { if healthyChanged {
probabilities := adjustWeights(bufferWeights) probabilities := adjustWeights(bufferWeights)
source := rand.NewSource(time.Now().UnixNano()) source := rand.NewSource(time.Now().UnixNano())
p.lock.Lock() pool.lock.Lock()
p.sampler = newSampler(probabilities, source) pool.sampler = newSampler(probabilities, source)
p.lock.Unlock() pool.lock.Unlock()
} }
} }

View file

@ -34,7 +34,9 @@ func TestBuildPoolClientFailed(t *testing.T) {
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
} }
_, err := NewPool(context.TODO(), opts) pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.Error(t, err) require.Error(t, err)
} }
@ -58,7 +60,9 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) {
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
} }
_, err := NewPool(context.TODO(), opts) pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.Error(t, err) require.Error(t, err)
} }
@ -119,7 +123,9 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
}, },
} }
clientPool, err := NewPool(context.TODO(), opts) clientPool, err := NewPool(opts)
require.NoError(t, err)
err = clientPool.Dial(context.Background())
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(clientPool.Close) t.Cleanup(clientPool.Close)
@ -135,7 +141,7 @@ func TestBuildPoolZeroNodes(t *testing.T) {
opts := InitParameters{ opts := InitParameters{
Key: newPrivateKey(t), Key: newPrivateKey(t),
} }
_, err := NewPool(context.TODO(), opts) _, err := NewPool(opts)
require.Error(t, err) require.Error(t, err)
} }
@ -163,7 +169,9 @@ func TestOneNode(t *testing.T) {
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
} }
pool, err := NewPool(context.Background(), opts) pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
@ -202,7 +210,9 @@ func TestTwoNodes(t *testing.T) {
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
} }
pool, err := NewPool(context.Background(), opts) pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
@ -259,7 +269,10 @@ func TestOneOfTwoFailed(t *testing.T) {
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
} }
pool, err := NewPool(context.Background(), opts) pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
@ -296,8 +309,11 @@ func TestTwoFailed(t *testing.T) {
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
} }
pool, err := NewPool(context.Background(), opts) pool, err := NewPool(opts)
require.NoError(t, err) require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
@ -342,7 +358,9 @@ func TestSessionCache(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
pool, err := NewPool(ctx, opts) pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(ctx)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
@ -421,7 +439,9 @@ func TestPriority(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
pool, err := NewPool(ctx, opts) pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(ctx)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
@ -475,7 +495,9 @@ func TestSessionCacheWithKey(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
pool, err := NewPool(ctx, opts) pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(ctx)
require.NoError(t, err) require.NoError(t, err)
// cache must contain session token // cache must contain session token
@ -522,7 +544,9 @@ func TestSessionTokenOwner(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
p, err := NewPool(ctx, opts) p, err := NewPool(opts)
require.NoError(t, err)
err = p.Dial(ctx)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(p.Close) t.Cleanup(p.Close)

View file

@ -68,7 +68,6 @@ func TestHealthyReweight(t *testing.T) {
var ( var (
weights = []float64{0.9, 0.1} weights = []float64{0.9, 0.1}
names = []string{"node0", "node1"} names = []string{"node0", "node1"}
options = rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}}
buffer = make([]float64, len(weights)) buffer = make([]float64, len(weights))
) )
@ -85,6 +84,7 @@ func TestHealthyReweight(t *testing.T) {
innerPools: []*innerPool{inner}, innerPools: []*innerPool{inner},
cache: cache, cache: cache,
key: newPrivateKey(t), key: newPrivateKey(t),
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
} }
// check getting first node connection before rebalance happened // check getting first node connection before rebalance happened
@ -93,7 +93,7 @@ func TestHealthyReweight(t *testing.T) {
mock0 := connection0.(*clientMock) mock0 := connection0.(*clientMock)
require.Equal(t, names[0], mock0.name) require.Equal(t, names[0], mock0.name)
updateInnerNodesHealth(context.TODO(), p, 0, options, buffer) p.updateInnerNodesHealth(context.TODO(), 0, buffer)
connection1, _, err := p.Connection() connection1, _, err := p.Connection()
require.NoError(t, err) require.NoError(t, err)
@ -105,7 +105,7 @@ func TestHealthyReweight(t *testing.T) {
inner.clientPacks[0].client = newNetmapMock(names[0], false) inner.clientPacks[0].client = newNetmapMock(names[0], false)
inner.lock.Unlock() inner.lock.Unlock()
updateInnerNodesHealth(context.TODO(), p, 0, options, buffer) p.updateInnerNodesHealth(context.TODO(), 0, buffer)
inner.sampler = newSampler(weights, rand.NewSource(0)) inner.sampler = newSampler(weights, rand.NewSource(0))
connection0, _, err = p.Connection() connection0, _, err = p.Connection()
@ -118,7 +118,6 @@ func TestHealthyNoReweight(t *testing.T) {
var ( var (
weights = []float64{0.9, 0.1} weights = []float64{0.9, 0.1}
names = []string{"node0", "node1"} names = []string{"node0", "node1"}
options = rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}}
buffer = make([]float64, len(weights)) buffer = make([]float64, len(weights))
) )
@ -131,9 +130,10 @@ func TestHealthyNoReweight(t *testing.T) {
} }
p := &Pool{ p := &Pool{
innerPools: []*innerPool{inner}, innerPools: []*innerPool{inner},
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
} }
updateInnerNodesHealth(context.TODO(), p, 0, options, buffer) p.updateInnerNodesHealth(context.TODO(), 0, buffer)
inner.lock.RLock() inner.lock.RLock()
defer inner.lock.RUnlock() defer inner.lock.RUnlock()