From 7811d8eefc2eb1c6455dde7581a0a65d7d376b02 Mon Sep 17 00:00:00 2001
From: Denis Kirillov
Date: Fri, 11 Mar 2022 17:55:02 +0300
Subject: [PATCH] [#165] pool: distinguish init step and run

Signed-off-by: Denis Kirillov
---
 pool/pool.go         | 127 +++++++++++++++++++++++--------------------
 pool/pool_test.go    |  48 ++++++++++++----
 pool/sampler_test.go |  18 +++---
 3 files changed, 114 insertions(+), 79 deletions(-)

diff --git a/pool/pool.go b/pool/pool.go
index 51fffde..e94e1a7 100644
--- a/pool/pool.go
+++ b/pool/pool.go
@@ -330,6 +330,9 @@ type Pool struct {
 	cache           *sessionCache
 	stokenDuration  uint64
 	stokenThreshold time.Duration
+	rebalanceParams rebalanceParameters
+	clientBuilder   func(endpoint string) (Client, error)
+	logger          *zap.Logger
 }
 
 type innerPool struct {
@@ -347,7 +350,7 @@ const (
 )
 
 // NewPool create connection pool using parameters.
-func NewPool(ctx context.Context, options InitParameters) (*Pool, error) {
+func NewPool(options InitParameters) (*Pool, error) {
 	if options.Key == nil {
 		return nil, fmt.Errorf("missed required parameter 'Key'")
 	}
@@ -364,65 +367,72 @@ func NewPool(ctx context.Context, options InitParameters) (*Pool, error) {
 		return nil, fmt.Errorf("couldn't create cache: %w", err)
 	}
 
-	ownerID := owner.NewIDFromPublicKey(&options.Key.PublicKey)
+	pool := &Pool{
+		key:             options.Key,
+		owner:           owner.NewIDFromPublicKey(&options.Key.PublicKey),
+		cache:           cache,
+		logger:          options.Logger,
+		stokenDuration:  options.SessionExpirationDuration,
+		stokenThreshold: options.SessionTokenThreshold,
+		rebalanceParams: rebalanceParameters{
+			nodesParams:               nodesParams,
+			nodeRequestTimeout:        options.NodeRequestTimeout,
+			clientRebalanceInterval:   options.ClientRebalanceInterval,
+			sessionExpirationDuration: options.SessionExpirationDuration,
+		},
+		clientBuilder: options.clientBuilder,
+	}
 
-	inner := make([]*innerPool, len(nodesParams))
+	return pool, nil
+}
+
+// Dial establishes a connection to the servers from the NeoFS network.
+// Returns an error describing failure reason. If failed, the Pool
+// SHOULD NOT be used.
+func (p *Pool) Dial(ctx context.Context) error {
+	inner := make([]*innerPool, len(p.rebalanceParams.nodesParams))
 	var atLeastOneHealthy bool
-	for i, params := range nodesParams {
+	for i, params := range p.rebalanceParams.nodesParams {
 		clientPacks := make([]*clientPack, len(params.weights))
 		for j, addr := range params.addresses {
-			c, err := options.clientBuilder(addr)
+			c, err := p.clientBuilder(addr)
 			if err != nil {
-				return nil, err
+				return err
 			}
 			var healthy bool
-			cliRes, err := createSessionTokenForDuration(ctx, c, options.SessionExpirationDuration)
-			if err != nil && options.Logger != nil {
-				options.Logger.Warn("failed to create neofs session token for client",
+			cliRes, err := createSessionTokenForDuration(ctx, c, p.rebalanceParams.sessionExpirationDuration)
+			if err != nil && p.logger != nil {
+				p.logger.Warn("failed to create neofs session token for client",
 					zap.String("Address", addr),
 					zap.Error(err))
 			} else if err == nil {
 				healthy, atLeastOneHealthy = true, true
-				st := sessionTokenForOwner(ownerID, cliRes)
-				_ = cache.Put(formCacheKey(addr, options.Key), st)
+				st := sessionTokenForOwner(p.owner, cliRes)
+				_ = p.cache.Put(formCacheKey(addr, p.key), st)
 			}
 			clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr}
 		}
 
 		source := rand.NewSource(time.Now().UnixNano())
-		sampler := newSampler(params.weights, source)
+		sampl := newSampler(params.weights, source)
 
 		inner[i] = &innerPool{
-			sampler:     sampler,
+			sampler:     sampl,
 			clientPacks: clientPacks,
 		}
 	}
 
 	if !atLeastOneHealthy {
-		return nil, fmt.Errorf("at least one node must be healthy")
+		return fmt.Errorf("at least one node must be healthy")
 	}
 
 	ctx, cancel := context.WithCancel(ctx)
-	pool := &Pool{
-		innerPools:      inner,
-		key:             options.Key,
-		owner:           ownerID,
-		cancel:          cancel,
-		closedCh:        make(chan struct{}),
-		cache:           cache,
-		stokenDuration:  options.SessionExpirationDuration,
-		stokenThreshold: options.SessionTokenThreshold,
-	}
+	p.cancel = cancel
+	p.closedCh = make(chan struct{})
+	p.innerPools = inner
 
-	rebalanceParams := rebalanceParameters{
-		nodesParams:               nodesParams,
-		nodeRequestTimeout:        options.NodeRequestTimeout,
-		clientRebalanceInterval:   options.ClientRebalanceInterval,
-		sessionExpirationDuration: options.SessionExpirationDuration,
-	}
-
-	go startRebalance(ctx, pool, rebalanceParams)
-	return pool, nil
+	go p.startRebalance(ctx)
+	return nil
 }
 
 func fillDefaultInitParams(params *InitParameters) {
@@ -490,10 +500,10 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
 	return nodesParams, nil
 }
 
-func startRebalance(ctx context.Context, p *Pool, options rebalanceParameters) {
-	ticker := time.NewTimer(options.clientRebalanceInterval)
-	buffers := make([][]float64, len(options.nodesParams))
-	for i, params := range options.nodesParams {
+func (p *Pool) startRebalance(ctx context.Context) {
+	ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
+	buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
+	for i, params := range p.rebalanceParams.nodesParams {
 		buffers[i] = make([]float64, len(params.weights))
 	}
 
@@ -503,13 +513,13 @@ func startRebalance(ctx context.Context, p *Pool, options rebalanceParameters) {
 			close(p.closedCh)
 			return
 		case <-ticker.C:
-			updateNodesHealth(ctx, p, options, buffers)
-			ticker.Reset(options.clientRebalanceInterval)
+			p.updateNodesHealth(ctx, buffers)
+			ticker.Reset(p.rebalanceParams.clientRebalanceInterval)
 		}
 	}
 }
 
-func updateNodesHealth(ctx context.Context, p *Pool, options rebalanceParameters, buffers [][]float64) {
+func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
 	wg := sync.WaitGroup{}
 	for i, inner := range p.innerPools {
 		wg.Add(1)
@@ -517,24 +527,25 @@ func updateNodesHealth(ctx context.Context, p *Pool, options rebalanceParameters
 		bufferWeights := buffers[i]
 		go func(i int, innerPool *innerPool) {
 			defer wg.Done()
-			updateInnerNodesHealth(ctx, p, i, options, bufferWeights)
+			p.updateInnerNodesHealth(ctx, i, bufferWeights)
 		}(i, inner)
 	}
 	wg.Wait()
 }
 
-func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options rebalanceParameters, bufferWeights []float64) {
-	if i > len(pool.innerPools)-1 {
+func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
+	if i > len(p.innerPools)-1 {
 		return
 	}
-	p := pool.innerPools[i]
+	pool := p.innerPools[i]
+	options := p.rebalanceParams
 
 	healthyChanged := false
 	wg := sync.WaitGroup{}
 
 	var prmEndpoint client.PrmEndpointInfo
 
-	for j, cPack := range p.clientPacks {
+	for j, cPack := range pool.clientPacks {
 		wg.Add(1)
 		go func(j int, cli Client) {
 			defer wg.Done()
@@ -546,9 +557,9 @@ func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options reba
 				ok = false
 				bufferWeights[j] = 0
 			}
-			p.lock.RLock()
-			cp := *p.clientPacks[j]
-			p.lock.RUnlock()
+			pool.lock.RLock()
+			cp := *pool.clientPacks[j]
+			pool.lock.RUnlock()
 
 			if ok {
 				bufferWeights[j] = options.nodesParams[i].weights[j]
@@ -558,21 +569,21 @@ func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options reba
 					ok = false
 					bufferWeights[j] = 0
 				} else {
-					tkn := pool.newSessionToken(cliRes)
+					tkn := p.newSessionToken(cliRes)
 
-					_ = pool.cache.Put(formCacheKey(cp.address, pool.key), tkn)
+					_ = p.cache.Put(formCacheKey(cp.address, p.key), tkn)
 				}
 			}
 		} else {
-			pool.cache.DeleteByPrefix(cp.address)
+			p.cache.DeleteByPrefix(cp.address)
 		}
 
-			p.lock.Lock()
-			if p.clientPacks[j].healthy != ok {
-				p.clientPacks[j].healthy = ok
+			pool.lock.Lock()
+			if pool.clientPacks[j].healthy != ok {
+				pool.clientPacks[j].healthy = ok
 				healthyChanged = true
 			}
-			p.lock.Unlock()
+			pool.lock.Unlock()
 		}(j, cPack.client)
 	}
 	wg.Wait()
@@ -580,9 +591,9 @@ func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options reba
 	if healthyChanged {
 		probabilities := adjustWeights(bufferWeights)
 		source := rand.NewSource(time.Now().UnixNano())
-		p.lock.Lock()
-		p.sampler = newSampler(probabilities, source)
-		p.lock.Unlock()
+		pool.lock.Lock()
+		pool.sampler = newSampler(probabilities, source)
+		pool.lock.Unlock()
 	}
 }
 
diff --git a/pool/pool_test.go b/pool/pool_test.go
index c70b7d0..9f468d7 100644
--- a/pool/pool_test.go
+++ b/pool/pool_test.go
@@ -34,7 +34,9 @@ func TestBuildPoolClientFailed(t *testing.T) {
 		clientBuilder: clientBuilder,
 	}
 
-	_, err := NewPool(context.TODO(), opts)
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(context.Background())
 	require.Error(t, err)
 }
 
@@ -58,7 +60,9 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) {
 		clientBuilder: clientBuilder,
 	}
 
-	_, err := NewPool(context.TODO(), opts)
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(context.Background())
 	require.Error(t, err)
 }
 
@@ -119,7 +123,9 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
 		},
 	}
 
-	clientPool, err := NewPool(context.TODO(), opts)
+	clientPool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = clientPool.Dial(context.Background())
 	require.NoError(t, err)
 	t.Cleanup(clientPool.Close)
 
@@ -135,7 +141,7 @@ func TestBuildPoolZeroNodes(t *testing.T) {
 	opts := InitParameters{
 		Key: newPrivateKey(t),
 	}
-	_, err := NewPool(context.TODO(), opts)
+	_, err := NewPool(opts)
 	require.Error(t, err)
 }
 
@@ -163,7 +169,9 @@ func TestOneNode(t *testing.T) {
 		clientBuilder: clientBuilder,
 	}
 
-	pool, err := NewPool(context.Background(), opts)
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(context.Background())
 	require.NoError(t, err)
 	t.Cleanup(pool.Close)
 
@@ -202,7 +210,9 @@ func TestTwoNodes(t *testing.T) {
 		clientBuilder: clientBuilder,
 	}
 
-	pool, err := NewPool(context.Background(), opts)
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(context.Background())
 	require.NoError(t, err)
 	t.Cleanup(pool.Close)
 
@@ -259,7 +269,10 @@ func TestOneOfTwoFailed(t *testing.T) {
 		clientBuilder: clientBuilder,
 	}
 
-	pool, err := NewPool(context.Background(), opts)
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(context.Background())
+	require.NoError(t, err)
 	require.NoError(t, err)
 	t.Cleanup(pool.Close)
 
@@ -296,8 +309,11 @@ func TestTwoFailed(t *testing.T) {
 		clientBuilder: clientBuilder,
 	}
 
-	pool, err := NewPool(context.Background(), opts)
+	pool, err := NewPool(opts)
 	require.NoError(t, err)
+	err = pool.Dial(context.Background())
+	require.NoError(t, err)
+
 	t.Cleanup(pool.Close)
 
 	time.Sleep(2 * time.Second)
@@ -342,7 +358,9 @@ func TestSessionCache(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	pool, err := NewPool(ctx, opts)
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(ctx)
 	require.NoError(t, err)
 	t.Cleanup(pool.Close)
 
@@ -421,7 +439,9 @@ func TestPriority(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	pool, err := NewPool(ctx, opts)
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(ctx)
 	require.NoError(t, err)
 	t.Cleanup(pool.Close)
 
@@ -475,7 +495,9 @@ func TestSessionCacheWithKey(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	pool, err := NewPool(ctx, opts)
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(ctx)
 	require.NoError(t, err)
 
 	// cache must contain session token
@@ -522,7 +544,9 @@ func TestSessionTokenOwner(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	p, err := NewPool(ctx, opts)
+	p, err := NewPool(opts)
+	require.NoError(t, err)
+	err = p.Dial(ctx)
 	require.NoError(t, err)
 	t.Cleanup(p.Close)
 
diff --git a/pool/sampler_test.go b/pool/sampler_test.go
index 00d7de4..77a26d2 100644
--- a/pool/sampler_test.go
+++ b/pool/sampler_test.go
@@ -68,7 +68,6 @@ func TestHealthyReweight(t *testing.T) {
 	var (
 		weights = []float64{0.9, 0.1}
 		names   = []string{"node0", "node1"}
-		options = rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}}
 		buffer  = make([]float64, len(weights))
 	)
 
@@ -82,9 +81,10 @@ func TestHealthyReweight(t *testing.T) {
 		{client: newNetmapMock(names[1], false), healthy: true, address: "address1"}},
 	}
 	p := &Pool{
-		innerPools: []*innerPool{inner},
-		cache:      cache,
-		key:        newPrivateKey(t),
+		innerPools:      []*innerPool{inner},
+		cache:           cache,
+		key:             newPrivateKey(t),
+		rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
 	}
 
 	// check getting first node connection before rebalance happened
@@ -93,7 +93,7 @@ func TestHealthyReweight(t *testing.T) {
 	mock0 := connection0.(*clientMock)
 	require.Equal(t, names[0], mock0.name)
 
-	updateInnerNodesHealth(context.TODO(), p, 0, options, buffer)
+	p.updateInnerNodesHealth(context.TODO(), 0, buffer)
 
 	connection1, _, err := p.Connection()
 	require.NoError(t, err)
@@ -105,7 +105,7 @@ func TestHealthyReweight(t *testing.T) {
 	inner.clientPacks[0].client = newNetmapMock(names[0], false)
 	inner.lock.Unlock()
 
-	updateInnerNodesHealth(context.TODO(), p, 0, options, buffer)
+	p.updateInnerNodesHealth(context.TODO(), 0, buffer)
 	inner.sampler = newSampler(weights, rand.NewSource(0))
 
 	connection0, _, err = p.Connection()
@@ -118,7 +118,6 @@ func TestHealthyNoReweight(t *testing.T) {
 	var (
 		weights = []float64{0.9, 0.1}
 		names   = []string{"node0", "node1"}
-		options = rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}}
 		buffer  = make([]float64, len(weights))
 	)
 
@@ -130,10 +129,11 @@ func TestHealthyNoReweight(t *testing.T) {
 		{client: newNetmapMock(names[1], false), healthy: true}},
 	}
 	p := &Pool{
-		innerPools: []*innerPool{inner},
+		innerPools:      []*innerPool{inner},
+		rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
 	}
 
-	updateInnerNodesHealth(context.TODO(), p, 0, options, buffer)
+	p.updateInnerNodesHealth(context.TODO(), 0, buffer)
 
 	inner.lock.RLock()
 	defer inner.lock.RUnlock()
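
Usage note (a sketch, not part of the patch itself): with this change NewPool only validates the parameters and assembles the Pool, while Dial creates the per-node clients, obtains session tokens and starts the rebalance goroutine. The snippet below shows the intended call order as package-internal code; the function name and the surrounding error handling are illustrative, only NewPool, Dial and Close come from this diff.

func newConnectedPool(ctx context.Context, prm InitParameters) (*Pool, error) {
	// Construction step: no network activity happens here anymore.
	p, err := NewPool(prm)
	if err != nil {
		return nil, err
	}

	// Connection step: builds clients, creates session tokens and starts
	// the rebalance loop; if it fails, the pool should not be used.
	if err := p.Dial(ctx); err != nil {
		return nil, err
	}

	// The rebalance goroutine keeps running until p.Close() is called.
	return p, nil
}

Splitting construction from dialing makes the point where network activity starts explicit to the caller and turns a failed connection attempt into an ordinary error instead of a constructor failure.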