package pool

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

type innerPool struct {
	lock    sync.RWMutex
	sampler *sampler
	clients []client
}

type connectionManager struct {
	innerPools      []*innerPool
	rebalanceParams rebalanceParameters
	clientBuilder   clientBuilder
	logger          *zap.Logger
	healthChecker   *healthCheck
}

// newConnectionManager returns an instance of connectionManager configured according to the parameters.
//
// Before using connectionManager, you MUST call Dial.
func newConnectionManager(options InitParameters) (*connectionManager, error) {
	if options.key == nil {
		return nil, fmt.Errorf("missing required parameter 'Key'")
	}

	nodesParams, err := adjustNodeParams(options.nodeParams)
	if err != nil {
		return nil, err
	}

	manager := &connectionManager{
		logger: options.logger,
		rebalanceParams: rebalanceParameters{
			nodesParams:               nodesParams,
			nodeRequestTimeout:        options.healthcheckTimeout,
			clientRebalanceInterval:   options.clientRebalanceInterval,
			sessionExpirationDuration: options.sessionExpirationDuration,
		},
		clientBuilder: options.clientBuilder,
	}

	return manager, nil
}

// dial connects to every configured node, groups the clients by priority and starts the
// background rebalance routine. It fails unless at least one node is reachable.
func (cm *connectionManager) dial(ctx context.Context) error {
	inner := make([]*innerPool, len(cm.rebalanceParams.nodesParams))
	var atLeastOneHealthy bool

	for i, params := range cm.rebalanceParams.nodesParams {
		clients := make([]client, len(params.weights))
		for j, addr := range params.addresses {
			clients[j] = cm.clientBuilder(addr)
			if err := clients[j].dial(ctx); err != nil {
				cm.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
				continue
			}

			atLeastOneHealthy = true
		}

		source := rand.NewSource(time.Now().UnixNano())
		sampl := newSampler(params.weights, source)

		inner[i] = &innerPool{
			sampler: sampl,
			clients: clients,
		}
	}

	if !atLeastOneHealthy {
		return fmt.Errorf("at least one node must be healthy")
	}

	cm.innerPools = inner

	cm.healthChecker = newHealthCheck(cm.rebalanceParams.clientRebalanceInterval)
	cm.healthChecker.startRebalance(ctx, cm.rebalance)

	return nil
}

// rebalance re-evaluates the health of every client and updates the samplers accordingly.
func (cm *connectionManager) rebalance(ctx context.Context) {
	buffers := make([][]float64, len(cm.rebalanceParams.nodesParams))
	for i, params := range cm.rebalanceParams.nodesParams {
		buffers[i] = make([]float64, len(params.weights))
	}

	cm.updateNodesHealth(ctx, buffers)
}

// log writes msg to the configured logger, if any.
func (cm *connectionManager) log(level zapcore.Level, msg string, fields ...zap.Field) {
	if cm.logger == nil {
		return
	}

	cm.logger.Log(level, msg, fields...)
}
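// newManagerAndDial is a hedged usage sketch added for illustration; it is not part of the
// original pool implementation. It only uses identifiers defined in this file and shows the
// contract documented on newConnectionManager: construct the manager first, then dial it
// before requesting any connection.
func newManagerAndDial(ctx context.Context, options InitParameters) (*connectionManager, error) {
	manager, err := newConnectionManager(options)
	if err != nil {
		return nil, err
	}

	// dial connects to the configured nodes and starts the background rebalance loop;
	// it fails if not a single node is reachable.
	if err := manager.dial(ctx); err != nil {
		return nil, err
	}

	return manager, nil
}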
// adjustNodeParams groups node parameters by priority, normalizes the weights within each
// group and returns the groups sorted by ascending priority.
func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
	if len(nodeParams) == 0 {
		return nil, errors.New("no FrostFS peers configured")
	}

	nodesParamsMap := make(map[int]*nodesParam)
	for _, param := range nodeParams {
		nodes, ok := nodesParamsMap[param.priority]
		if !ok {
			nodes = &nodesParam{priority: param.priority}
		}
		nodes.addresses = append(nodes.addresses, param.address)
		nodes.weights = append(nodes.weights, param.weight)
		nodesParamsMap[param.priority] = nodes
	}

	nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
	for _, nodes := range nodesParamsMap {
		nodes.weights = adjustWeights(nodes.weights)
		nodesParams = append(nodesParams, nodes)
	}

	sort.Slice(nodesParams, func(i, j int) bool {
		return nodesParams[i].priority < nodesParams[j].priority
	})

	return nodesParams, nil
}

// updateNodesHealth health-checks every inner pool concurrently.
func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) {
	wg := sync.WaitGroup{}
	for i, inner := range cm.innerPools {
		wg.Add(1)

		bufferWeights := buffers[i]
		go func(i int, _ *innerPool) {
			defer wg.Done()
			cm.updateInnerNodesHealth(ctx, i, bufferWeights)
		}(i, inner)
	}
	wg.Wait()
}

// updateInnerNodesHealth health-checks every client of the i-th inner pool and, if any
// health status changed, rebuilds the pool's sampler from the updated weights.
func (cm *connectionManager) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
	if i > len(cm.innerPools)-1 {
		return
	}
	pool := cm.innerPools[i]
	options := cm.rebalanceParams

	healthyChanged := new(atomic.Bool)
	wg := sync.WaitGroup{}

	for j, cli := range pool.clients {
		wg.Add(1)
		go func(j int, cli client) {
			defer wg.Done()

			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
			defer c()

			changed, err := restartIfUnhealthy(tctx, cli)
			healthy := err == nil
			if healthy {
				bufferWeights[j] = options.nodesParams[i].weights[j]
			} else {
				bufferWeights[j] = 0
			}

			if changed {
				fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)}
				if err != nil {
					fields = append(fields, zap.String("reason", err.Error()))
				}

				cm.log(zap.DebugLevel, "health has changed", fields...)
				healthyChanged.Store(true)
			}
		}(j, cli)
	}
	wg.Wait()

	if healthyChanged.Load() {
		probabilities := adjustWeights(bufferWeights)
		source := rand.NewSource(time.Now().UnixNano())
		pool.lock.Lock()
		pool.sampler = newSampler(probabilities, source)
		pool.lock.Unlock()
	}
}

// restartIfUnhealthy checks the health status of the client and recreates it if it is unhealthy.
// It reports whether the status was changed by this call and returns the error that caused the
// unhealthy status, if any.
func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
	defer func() {
		if err != nil {
			c.setUnhealthy()
		} else {
			c.setHealthy()
		}
	}()

	wasHealthy := c.isHealthy()

	if res, err := c.healthcheck(ctx); err == nil {
		if res.Status().IsMaintenance() {
			return wasHealthy, new(apistatus.NodeUnderMaintenance)
		}

		return !wasHealthy, nil
	}

	if err = c.restart(ctx); err != nil {
		return wasHealthy, err
	}

	res, err := c.healthcheck(ctx)
	if err != nil {
		return wasHealthy, err
	}

	if res.Status().IsMaintenance() {
		return wasHealthy, new(apistatus.NodeUnderMaintenance)
	}

	return !wasHealthy, nil
}

// adjustWeights normalizes weights so that they sum to 1. If the sum is zero, all adjusted
// weights stay zero.
func adjustWeights(weights []float64) []float64 {
	adjusted := make([]float64, len(weights))
	sum := 0.0
	for _, weight := range weights {
		sum += weight
	}
	if sum > 0 {
		for i, weight := range weights {
			adjusted[i] = weight / sum
		}
	}

	return adjusted
}

// connection returns a healthy client from the highest-priority pool that has one.
func (cm *connectionManager) connection() (client, error) {
	for _, inner := range cm.innerPools {
		cp, err := inner.connection()
		if err == nil {
			return cp, nil
		}
	}

	return nil, errors.New("no healthy client")
}

// iterate calls cb for every healthy client in all innerPools.
func (cm *connectionManager) iterate(cb func(client)) {
	for _, inner := range cm.innerPools {
		for _, cl := range inner.clients {
			if cl.isHealthy() {
				cb(cl)
			}
		}
	}
}

// connection returns a healthy client of the inner pool, chosen by the sampler when more
// than one client is configured.
func (p *innerPool) connection() (client, error) {
	p.lock.RLock() // need lock because of using p.sampler
	defer p.lock.RUnlock()
	if len(p.clients) == 1 {
		cp := p.clients[0]
		if cp.isHealthy() {
			return cp, nil
		}
		return nil, errors.New("no healthy client")
	}
	attempts := 3 * len(p.clients)
	for range attempts {
		i := p.sampler.Next()
		if cp := p.clients[i]; cp.isHealthy() {
			return cp, nil
		}
	}

	return nil, errors.New("no healthy client")
}

// Statistic returns aggregated statistics for all clients of the connection manager.
func (cm connectionManager) Statistic() Statistic {
	stat := Statistic{}
	for _, inner := range cm.innerPools {
		nodes := make([]string, 0, len(inner.clients))

		inner.lock.RLock()
		for _, cl := range inner.clients {
			if cl.isHealthy() {
				nodes = append(nodes, cl.address())
			}
			node := NodeStatistic{
				address:       cl.address(),
				methods:       cl.methodsStatus(),
				overallErrors: cl.overallErrorRate(),
				currentErrors: cl.currentErrorRate(),
			}

			stat.nodes = append(stat.nodes, node)
			stat.overallErrors += node.overallErrors
		}
		inner.lock.RUnlock()

		if len(stat.currentNodes) == 0 {
			stat.currentNodes = nodes
		}
	}

	return stat
}

// close stops the rebalance routine and closes all clients.
func (cm *connectionManager) close() {
	cm.healthChecker.stopRebalance()

	// close all clients
	for _, pools := range cm.innerPools {
		for _, cli := range pools.clients {
			_ = cli.close()
		}
	}
}