diff --git a/pool/connection_manager.go b/pool/connection_manager.go
new file mode 100644
index 00000000..5b9a08de
--- /dev/null
+++ b/pool/connection_manager.go
@@ -0,0 +1,332 @@
+package pool
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+type innerPool struct {
+	lock    sync.RWMutex
+	sampler *sampler
+	clients []client
+}
+
+type connectionManager struct {
+	innerPools      []*innerPool
+	rebalanceParams rebalanceParameters
+	clientBuilder   clientBuilder
+	logger          *zap.Logger
+	healthChecker   *healthCheck
+}
+
+// newConnectionManager returns an instance of connectionManager configured according to the parameters.
+//
+// Before using connectionManager, you MUST call Dial.
+func newConnectionManager(options InitParameters) (*connectionManager, error) {
+	if options.key == nil {
+		return nil, fmt.Errorf("missed required parameter 'Key'")
+	}
+
+	nodesParams, err := adjustNodeParams(options.nodeParams)
+	if err != nil {
+		return nil, err
+	}
+
+	manager := &connectionManager{
+		logger: options.logger,
+		rebalanceParams: rebalanceParameters{
+			nodesParams:               nodesParams,
+			nodeRequestTimeout:        options.healthcheckTimeout,
+			clientRebalanceInterval:   options.clientRebalanceInterval,
+			sessionExpirationDuration: options.sessionExpirationDuration,
+		},
+		clientBuilder: options.clientBuilder,
+	}
+
+	return manager, nil
+}
+
+func (cm *connectionManager) dial(ctx context.Context) error {
+	inner := make([]*innerPool, len(cm.rebalanceParams.nodesParams))
+	var atLeastOneHealthy bool
+
+	for i, params := range cm.rebalanceParams.nodesParams {
+		clients := make([]client, len(params.weights))
+		for j, addr := range params.addresses {
+			clients[j] = cm.clientBuilder(addr)
+			if err := clients[j].dial(ctx); err != nil {
+				cm.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
+				continue
+			}
+			atLeastOneHealthy = true
+		}
+		source := rand.NewSource(time.Now().UnixNano())
+		sampl := newSampler(params.weights, source)
+
+		inner[i] = &innerPool{
+			sampler: sampl,
+			clients: clients,
+		}
+	}
+
+	if !atLeastOneHealthy {
+		return fmt.Errorf("at least one node must be healthy")
+	}
+
+	cm.innerPools = inner
+
+	cm.healthChecker = newHealthCheck(cm.rebalanceParams.clientRebalanceInterval)
+	cm.healthChecker.startRebalance(ctx, cm.rebalance)
+	return nil
+}
+
+func (cm *connectionManager) rebalance(ctx context.Context) {
+	buffers := make([][]float64, len(cm.rebalanceParams.nodesParams))
+	for i, params := range cm.rebalanceParams.nodesParams {
+		buffers[i] = make([]float64, len(params.weights))
+	}
+
+	cm.updateNodesHealth(ctx, buffers)
+}
+
+func (cm *connectionManager) log(level zapcore.Level, msg string, fields ...zap.Field) {
+	if cm.logger == nil {
+		return
+	}
+
+	cm.logger.Log(level, msg, fields...)
+}
+
+func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
+	if len(nodeParams) == 0 {
+		return nil, errors.New("no FrostFS peers configured")
+	}
+
+	nodesParamsMap := make(map[int]*nodesParam)
+	for _, param := range nodeParams {
+		nodes, ok := nodesParamsMap[param.priority]
+		if !ok {
+			nodes = &nodesParam{priority: param.priority}
+		}
+		nodes.addresses = append(nodes.addresses, param.address)
+		nodes.weights = append(nodes.weights, param.weight)
+		nodesParamsMap[param.priority] = nodes
+	}
+
+	nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
+	for _, nodes := range nodesParamsMap {
+		nodes.weights = adjustWeights(nodes.weights)
+		nodesParams = append(nodesParams, nodes)
+	}
+
+	sort.Slice(nodesParams, func(i, j int) bool {
+		return nodesParams[i].priority < nodesParams[j].priority
+	})
+
+	return nodesParams, nil
+}
+
+func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) {
+	wg := sync.WaitGroup{}
+	for i, inner := range cm.innerPools {
+		wg.Add(1)
+
+		bufferWeights := buffers[i]
+		go func(i int, _ *innerPool) {
+			defer wg.Done()
+			cm.updateInnerNodesHealth(ctx, i, bufferWeights)
+		}(i, inner)
+	}
+	wg.Wait()
+}
+
+func (cm *connectionManager) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
+	if i > len(cm.innerPools)-1 {
+		return
+	}
+	pool := cm.innerPools[i]
+	options := cm.rebalanceParams
+
+	healthyChanged := new(atomic.Bool)
+	wg := sync.WaitGroup{}
+
+	for j, cli := range pool.clients {
+		wg.Add(1)
+		go func(j int, cli client) {
+			defer wg.Done()
+
+			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
+			defer c()
+
+			changed, err := restartIfUnhealthy(tctx, cli)
+			healthy := err == nil
+			if healthy {
+				bufferWeights[j] = options.nodesParams[i].weights[j]
+			} else {
+				bufferWeights[j] = 0
+			}
+
+			if changed {
+				fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)}
+				if err != nil {
+					fields = append(fields, zap.String("reason", err.Error()))
+				}
+
+				cm.log(zap.DebugLevel, "health has changed", fields...)
+				healthyChanged.Store(true)
+			}
+		}(j, cli)
+	}
+	wg.Wait()
+
+	if healthyChanged.Load() {
+		probabilities := adjustWeights(bufferWeights)
+		source := rand.NewSource(time.Now().UnixNano())
+		pool.lock.Lock()
+		pool.sampler = newSampler(probabilities, source)
+		pool.lock.Unlock()
+	}
+}
+
+// restartIfUnhealthy checks the health status of the client and restarts it if the status is unhealthy.
+// It reports whether the status was changed by this call and returns the error that caused the unhealthy status.
+func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
+	defer func() {
+		if err != nil {
+			c.setUnhealthy()
+		} else {
+			c.setHealthy()
+		}
+	}()
+
+	wasHealthy := c.isHealthy()
+
+	if res, err := c.healthcheck(ctx); err == nil {
+		if res.Status().IsMaintenance() {
+			return wasHealthy, new(apistatus.NodeUnderMaintenance)
+		}
+
+		return !wasHealthy, nil
+	}
+
+	if err = c.restart(ctx); err != nil {
+		return wasHealthy, err
+	}
+
+	res, err := c.healthcheck(ctx)
+	if err != nil {
+		return wasHealthy, err
+	}
+
+	if res.Status().IsMaintenance() {
+		return wasHealthy, new(apistatus.NodeUnderMaintenance)
+	}
+
+	return !wasHealthy, nil
+}
+
+func adjustWeights(weights []float64) []float64 {
+	adjusted := make([]float64, len(weights))
+	sum := 0.0
+	for _, weight := range weights {
+		sum += weight
+	}
+	if sum > 0 {
+		for i, weight := range weights {
+			adjusted[i] = weight / sum
+		}
+	}
+
+	return adjusted
+}
+
+func (cm *connectionManager) connection() (client, error) {
+	for _, inner := range cm.innerPools {
+		cp, err := inner.connection()
+		if err == nil {
+			return cp, nil
+		}
+	}
+
+	return nil, errors.New("no healthy client")
+}
+
+// iterate calls cb on every healthy client in all innerPools.
+func (cm *connectionManager) iterate(cb func(client)) {
+	for _, inner := range cm.innerPools {
+		for _, cl := range inner.clients {
+			if cl.isHealthy() {
+				cb(cl)
+			}
+		}
+	}
+}
+
+func (p *innerPool) connection() (client, error) {
+	p.lock.RLock() // need lock because of using p.sampler
+	defer p.lock.RUnlock()
+	if len(p.clients) == 1 {
+		cp := p.clients[0]
+		if cp.isHealthy() {
+			return cp, nil
+		}
+		return nil, errors.New("no healthy client")
+	}
+	attempts := 3 * len(p.clients)
+	for range attempts {
+		i := p.sampler.Next()
+		if cp := p.clients[i]; cp.isHealthy() {
+			return cp, nil
+		}
+	}
+
+	return nil, errors.New("no healthy client")
+}
+
+func (cm connectionManager) Statistic() Statistic {
+	stat := Statistic{}
+	for _, inner := range cm.innerPools {
+		nodes := make([]string, 0, len(inner.clients))
+		inner.lock.RLock()
+		for _, cl := range inner.clients {
+			if cl.isHealthy() {
+				nodes = append(nodes, cl.address())
+			}
+			node := NodeStatistic{
+				address:       cl.address(),
+				methods:       cl.methodsStatus(),
+				overallErrors: cl.overallErrorRate(),
+				currentErrors: cl.currentErrorRate(),
+			}
+			stat.nodes = append(stat.nodes, node)
+			stat.overallErrors += node.overallErrors
+		}
+		inner.lock.RUnlock()
+		if len(stat.currentNodes) == 0 {
+			stat.currentNodes = nodes
+		}
+	}
+
+	return stat
+}
+
+func (cm *connectionManager) close() {
+	cm.healthChecker.stopRebalance()
+
+	// close all clients
+	for _, pools := range cm.innerPools {
+		for _, cli := range pools.clients {
+			_ = cli.close()
+		}
+	}
+}
diff --git a/pool/pool.go b/pool/pool.go
index 8810c159..bbbebbae 100644
--- a/pool/pool.go
+++ b/pool/pool.go
@@ -8,8 +8,6 @@ import (
 	"fmt"
 	"io"
 	"math"
-	"math/rand"
-	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -2015,23 +2013,6 @@ type Pool struct {
 	maxObjectSize uint64
 }
 
-type connectionManager struct {
-	innerPools []*innerPool
-
-	cancel          context.CancelFunc
-	closedCh        chan struct{}
-	rebalanceParams rebalanceParameters
-	clientBuilder   clientBuilder
-	logger          *zap.Logger
-	healthChecker   *healthCheck
-}
-
-type innerPool struct {
-	lock    sync.RWMutex
-	sampler *sampler
-	clients []client
-}
-
 const (
 	defaultSessionTokenExpirationDuration = 100 // in epochs
 	defaultErrorThreshold                 = 100
@@ -2045,33 +2026,6 @@ const (
 	defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
 )
 
-// newConnectionManager returns an instance of connectionManager configured according to the parameters.
-//
-// Before using connectionManager, you MUST call Dial.
-func newConnectionManager(options InitParameters) (*connectionManager, error) {
-	if options.key == nil {
-		return nil, fmt.Errorf("missed required parameter 'Key'")
-	}
-
-	nodesParams, err := adjustNodeParams(options.nodeParams)
-	if err != nil {
-		return nil, err
-	}
-
-	manager := &connectionManager{
-		logger: options.logger,
-		rebalanceParams: rebalanceParameters{
-			nodesParams:               nodesParams,
-			nodeRequestTimeout:        options.healthcheckTimeout,
-			clientRebalanceInterval:   options.clientRebalanceInterval,
-			sessionExpirationDuration: options.sessionExpirationDuration,
-		},
-		clientBuilder: options.clientBuilder,
-	}
-
-	return manager, nil
-}
-
 // NewPool returns an instance of Pool configured according to the parameters.
 //
 // Before using Pool, you MUST call Dial.
@@ -2141,57 +2095,6 @@ func (p *Pool) Dial(ctx context.Context) error {
 	return nil
 }
 
-func (cm *connectionManager) dial(ctx context.Context) error {
-	inner := make([]*innerPool, len(cm.rebalanceParams.nodesParams))
-	var atLeastOneHealthy bool
-
-	for i, params := range cm.rebalanceParams.nodesParams {
-		clients := make([]client, len(params.weights))
-		for j, addr := range params.addresses {
-			clients[j] = cm.clientBuilder(addr)
-			if err := clients[j].dial(ctx); err != nil {
-				cm.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
-				continue
-			}
-			atLeastOneHealthy = true
-		}
-		source := rand.NewSource(time.Now().UnixNano())
-		sampl := newSampler(params.weights, source)
-
-		inner[i] = &innerPool{
-			sampler: sampl,
-			clients: clients,
-		}
-	}
-
-	if !atLeastOneHealthy {
-		return fmt.Errorf("at least one node must be healthy")
-	}
-
-	cm.innerPools = inner
-
-	cm.healthChecker = newHealthCheck(cm.rebalanceParams.clientRebalanceInterval)
-	cm.healthChecker.startRebalance(ctx, cm.rebalance)
-	return nil
-}
-
-func (cm *connectionManager) rebalance(ctx context.Context) {
-	buffers := make([][]float64, len(cm.rebalanceParams.nodesParams))
-	for i, params := range cm.rebalanceParams.nodesParams {
-		buffers[i] = make([]float64, len(params.weights))
-	}
-
-	cm.updateNodesHealth(ctx, buffers)
-}
-
-func (cm *connectionManager) log(level zapcore.Level, msg string, fields ...zap.Field) {
-	if cm.logger == nil {
-		return
-	}
-
-	cm.logger.Log(level, msg, fields...)
-}
-
 func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
 	if params.sessionExpirationDuration == 0 {
 		params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
@@ -2246,192 +2149,6 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
 	}
 }
 
-func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
-	if len(nodeParams) == 0 {
-		return nil, errors.New("no FrostFS peers configured")
-	}
-
-	nodesParamsMap := make(map[int]*nodesParam)
-	for _, param := range nodeParams {
-		nodes, ok := nodesParamsMap[param.priority]
-		if !ok {
-			nodes = &nodesParam{priority: param.priority}
-		}
-		nodes.addresses = append(nodes.addresses, param.address)
-		nodes.weights = append(nodes.weights, param.weight)
-		nodesParamsMap[param.priority] = nodes
-	}
-
-	nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
-	for _, nodes := range nodesParamsMap {
-		nodes.weights = adjustWeights(nodes.weights)
-		nodesParams = append(nodesParams, nodes)
-	}
-
-	sort.Slice(nodesParams, func(i, j int) bool {
-		return nodesParams[i].priority < nodesParams[j].priority
-	})
-
-	return nodesParams, nil
-}
-
-func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) {
-	wg := sync.WaitGroup{}
-	for i, inner := range cm.innerPools {
-		wg.Add(1)
-
-		bufferWeights := buffers[i]
-		go func(i int, _ *innerPool) {
-			defer wg.Done()
-			cm.updateInnerNodesHealth(ctx, i, bufferWeights)
-		}(i, inner)
-	}
-	wg.Wait()
-}
-
-func (cm *connectionManager) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
-	if i > len(cm.innerPools)-1 {
-		return
-	}
-	pool := cm.innerPools[i]
-	options := cm.rebalanceParams
-
-	healthyChanged := new(atomic.Bool)
-	wg := sync.WaitGroup{}
-
-	for j, cli := range pool.clients {
-		wg.Add(1)
-		go func(j int, cli client) {
-			defer wg.Done()
-
-			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
-			defer c()
-
-			changed, err := restartIfUnhealthy(tctx, cli)
-			healthy := err == nil
-			if healthy {
-				bufferWeights[j] = options.nodesParams[i].weights[j]
-			} else {
-				bufferWeights[j] = 0
-			}
-
-			if changed {
-				fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)}
-				if err != nil {
-					fields = append(fields, zap.String("reason", err.Error()))
-				}
-
-				cm.log(zap.DebugLevel, "health has changed", fields...)
-				healthyChanged.Store(true)
-			}
-		}(j, cli)
-	}
-	wg.Wait()
-
-	if healthyChanged.Load() {
-		probabilities := adjustWeights(bufferWeights)
-		source := rand.NewSource(time.Now().UnixNano())
-		pool.lock.Lock()
-		pool.sampler = newSampler(probabilities, source)
-		pool.lock.Unlock()
-	}
-}
-
-// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
-// Indicating if status was changed by this function call and returns error that caused unhealthy status.
-func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
-	defer func() {
-		if err != nil {
-			c.setUnhealthy()
-		} else {
-			c.setHealthy()
-		}
-	}()
-
-	wasHealthy := c.isHealthy()
-
-	if res, err := c.healthcheck(ctx); err == nil {
-		if res.Status().IsMaintenance() {
-			return wasHealthy, new(apistatus.NodeUnderMaintenance)
-		}
-
-		return !wasHealthy, nil
-	}
-
-	if err = c.restart(ctx); err != nil {
-		return wasHealthy, err
-	}
-
-	res, err := c.healthcheck(ctx)
-	if err != nil {
-		return wasHealthy, err
-	}
-
-	if res.Status().IsMaintenance() {
-		return wasHealthy, new(apistatus.NodeUnderMaintenance)
-	}
-
-	return !wasHealthy, nil
-}
-
-func adjustWeights(weights []float64) []float64 {
-	adjusted := make([]float64, len(weights))
-	sum := 0.0
-	for _, weight := range weights {
-		sum += weight
-	}
-	if sum > 0 {
-		for i, weight := range weights {
-			adjusted[i] = weight / sum
-		}
-	}
-
-	return adjusted
-}
-
-func (cm *connectionManager) connection() (client, error) {
-	for _, inner := range cm.innerPools {
-		cp, err := inner.connection()
-		if err == nil {
-			return cp, nil
-		}
-	}
-
-	return nil, errors.New("no healthy client")
-}
-
-// iterate iterates over all clients in all innerPools.
-func (cm *connectionManager) iterate(cb func(client)) {
-	for _, inner := range cm.innerPools {
-		for _, cl := range inner.clients {
-			if cl.isHealthy() {
-				cb(cl)
-			}
-		}
-	}
-}
-
-func (p *innerPool) connection() (client, error) {
-	p.lock.RLock() // need lock because of using p.sampler
-	defer p.lock.RUnlock()
-	if len(p.clients) == 1 {
-		cp := p.clients[0]
-		if cp.isHealthy() {
-			return cp, nil
-		}
-		return nil, errors.New("no healthy client")
-	}
-	attempts := 3 * len(p.clients)
-	for range attempts {
-		i := p.sampler.Next()
-		if cp := p.clients[i]; cp.isHealthy() {
-			return cp, nil
-		}
-	}
-
-	return nil, errors.New("no healthy client")
-}
-
 func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
 	k := keys.PrivateKey{PrivateKey: *key}
 
@@ -3089,33 +2806,6 @@ func (p Pool) Statistic() Statistic {
 	return p.manager.Statistic()
 }
 
-func (cm connectionManager) Statistic() Statistic {
-	stat := Statistic{}
-	for _, inner := range cm.innerPools {
-		nodes := make([]string, 0, len(inner.clients))
-		inner.lock.RLock()
-		for _, cl := range inner.clients {
-			if cl.isHealthy() {
-				nodes = append(nodes, cl.address())
-			}
-			node := NodeStatistic{
-				address:       cl.address(),
-				methods:       cl.methodsStatus(),
-				overallErrors: cl.overallErrorRate(),
-				currentErrors: cl.currentErrorRate(),
-			}
-			stat.nodes = append(stat.nodes, node)
-			stat.overallErrors += node.overallErrors
-		}
-		inner.lock.RUnlock()
-		if len(stat.currentNodes) == 0 {
-			stat.currentNodes = nodes
-		}
-	}
-
-	return stat
-}
-
 // waitForContainerPresence waits until the container is found on the FrostFS network.
 func waitForContainerPresence(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error {
 	return waitFor(ctx, waitParams, func(ctx context.Context) bool {
@@ -3194,17 +2884,6 @@ func (p *Pool) Close() {
 	p.manager.close()
 }
 
-func (cm *connectionManager) close() {
-	cm.healthChecker.stopRebalance()
-
-	// close all clients
-	for _, pools := range cm.innerPools {
-		for _, cli := range pools.clients {
-			_ = cli.close()
-		}
-	}
-}
-
 // SyncContainerWithNetwork applies network configuration received via
 // the Pool to the container. Changes the container if it does not satisfy
 // network configuration.
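
For reviewers who want to sanity-check the relocated helpers, here is a minimal in-package sketch (a hypothetical connection_manager_example_test.go; the test name, addresses, and weight values are illustrative only and are not part of this change). It shows how adjustNodeParams groups NodeParam entries by priority and how adjustWeights normalizes each group's weights into the probabilities that newSampler consumes during dial and rebalance.

package pool

import "testing"

// TestAdjustNodeParamsSketch is a hypothetical example, not part of the change:
// it exercises adjustNodeParams and adjustWeights exactly as they are moved into
// connection_manager.go, using made-up addresses and weights.
func TestAdjustNodeParamsSketch(t *testing.T) {
	nodes := []NodeParam{
		{priority: 2, address: "grpc://node-b:8080", weight: 1},
		{priority: 1, address: "grpc://node-a1:8080", weight: 1},
		{priority: 1, address: "grpc://node-a2:8080", weight: 3},
	}

	params, err := adjustNodeParams(nodes)
	if err != nil {
		t.Fatal(err)
	}

	// Groups are sorted by ascending priority, so the priority-1 group comes first.
	if params[0].priority != 1 || params[1].priority != 2 {
		t.Fatalf("unexpected priority order: %d, %d", params[0].priority, params[1].priority)
	}

	// Within a group, weights are normalized to probabilities: 1 and 3 become 0.25 and 0.75.
	// These are the values newSampler receives when a sampler is (re)built.
	if params[0].weights[0] != 0.25 || params[0].weights[1] != 0.75 {
		t.Fatalf("unexpected normalized weights: %v", params[0].weights)
	}
}

Because adjustWeights normalizes within each priority group, only the relative weights of nodes sharing a priority affect sampling; absolute magnitudes do not matter.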