From 8cce85337b157c0fbafa08ef5e0650f984da0817 Mon Sep 17 00:00:00 2001
From: Alexander Chuprov
Date: Wed, 18 Dec 2024 18:38:09 +0300
Subject: [PATCH] [#300] pool: Move 'connectionManager' to separate file

Signed-off-by: Alexander Chuprov
---
 pool/connection_manager.go | 555 +++++++++++++++++++++++++++++++++++++
 pool/pool.go               | 538 -----------------------------------
 2 files changed, 555 insertions(+), 538 deletions(-)
 create mode 100644 pool/connection_manager.go

diff --git a/pool/connection_manager.go b/pool/connection_manager.go
new file mode 100644
index 0000000..66c07c4
--- /dev/null
+++ b/pool/connection_manager.go
@@ -0,0 +1,555 @@
+package pool
+
+import (
+	"context"
+	"crypto/ecdsa"
+	"errors"
+	"fmt"
+	"math"
+	"math/rand"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
+	frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
+	"github.com/google/uuid"
+	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+type innerPool struct {
+	lock    sync.RWMutex
+	sampler *sampler
+	clients []client
+}
+
+type connectionManager struct {
+	innerPools      []*innerPool
+	key             *ecdsa.PrivateKey
+	cache           *sessionCache
+	stokenDuration  uint64
+	rebalanceParams rebalanceParameters
+	clientBuilder   clientBuilder
+	logger          *zap.Logger
+	healthChecker   *healthCheck
+}
+
+// newConnectionManager creates a connection pool using the given parameters.
+func newConnectionManager(options InitParameters) (*connectionManager, error) {
+	if options.key == nil {
+		return nil, fmt.Errorf("missing required parameter 'Key'")
+	}
+
+	nodesParams, err := adjustNodeParams(options.nodeParams)
+	if err != nil {
+		return nil, err
+	}
+
+	cache, err := newCache(options.sessionExpirationDuration)
+	if err != nil {
+		return nil, fmt.Errorf("couldn't create cache: %w", err)
+	}
+
+	fillDefaultInitParams(&options, cache)
+
+	manager := &connectionManager{
+		key:            options.key,
+		cache:          cache,
+		logger:         options.logger,
+		stokenDuration: options.sessionExpirationDuration,
+		rebalanceParams: rebalanceParameters{
+			nodesParams:               nodesParams,
+			nodeRequestTimeout:        options.healthcheckTimeout,
+			clientRebalanceInterval:   options.clientRebalanceInterval,
+			sessionExpirationDuration: options.sessionExpirationDuration,
+		},
+		clientBuilder: options.clientBuilder,
+	}
+
+	return manager, nil
+}
+
+func (cm *connectionManager) dial(ctx context.Context) error {
+	inner := make([]*innerPool, len(cm.rebalanceParams.nodesParams))
+	var atLeastOneHealthy bool
+
+	for i, params := range cm.rebalanceParams.nodesParams {
+		clients := make([]client, len(params.weights))
+		for j, addr := range params.addresses {
+			clients[j] = cm.clientBuilder(addr)
+			if err := clients[j].dial(ctx); err != nil {
+				cm.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
+				continue
+			}
+
+			var st session.Object
+			err := initSessionForDuration(ctx, &st, clients[j], cm.rebalanceParams.sessionExpirationDuration, *cm.key, false)
+			if err != nil {
+				clients[j].setUnhealthy()
+				cm.log(zap.WarnLevel, "failed to create frostfs session token for client",
+					zap.String("address", addr), zap.Error(err))
+				continue
+			}
+
+			_ = cm.cache.Put(formCacheKey(addr, cm.key, false), st)
+			atLeastOneHealthy = true
+		}
+		source := rand.NewSource(time.Now().UnixNano())
+		sampl := newSampler(params.weights, source)
+
+		inner[i] = &innerPool{
+			sampler: sampl,
+			clients: clients,
+		}
+	}
+
+	if !atLeastOneHealthy {
+		return fmt.Errorf("at least one node must be healthy")
+	}
+
+	cm.innerPools = inner
+	cm.healthChecker = newHealthCheck(cm.rebalanceParams.clientRebalanceInterval)
+	cm.healthChecker.startRebalance(ctx, cm.updateNodesHealth, cm.rebalanceParams.nodesParams)
+	return nil
+}
+
+func (cm *connectionManager) log(level zapcore.Level, msg string, fields ...zap.Field) {
+	if cm.logger == nil {
+		return
+	}
+
+	cm.logger.Log(level, msg, fields...)
+}
+
+func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
+	if params.sessionExpirationDuration == 0 {
+		params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
+	}
+
+	if params.errorThreshold == 0 {
+		params.errorThreshold = defaultErrorThreshold
+	}
+
+	if params.clientRebalanceInterval <= 0 {
+		params.clientRebalanceInterval = defaultRebalanceInterval
+	}
+
+	if params.gracefulCloseOnSwitchTimeout <= 0 {
+		params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout
+	}
+
+	if params.healthcheckTimeout <= 0 {
+		params.healthcheckTimeout = defaultHealthcheckTimeout
+	}
+
+	if params.nodeDialTimeout <= 0 {
+		params.nodeDialTimeout = defaultDialTimeout
+	}
+
+	if params.nodeStreamTimeout <= 0 {
+		params.nodeStreamTimeout = defaultStreamTimeout
+	}
+
+	if cache.tokenDuration == 0 {
+		cache.tokenDuration = defaultSessionTokenExpirationDuration
+	}
+
+	if params.isMissingClientBuilder() {
+		params.setClientBuilder(func(addr string) client {
+			var prm wrapperPrm
+			prm.setAddress(addr)
+			prm.setKey(*params.key)
+			prm.setLogger(params.logger)
+			prm.setDialTimeout(params.nodeDialTimeout)
+			prm.setStreamTimeout(params.nodeStreamTimeout)
+			prm.setErrorThreshold(params.errorThreshold)
+			prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout)
+			prm.setPoolRequestCallback(params.requestCallback)
+			prm.setGRPCDialOptions(params.dialOptions)
+			prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
+				cache.updateEpoch(info.Epoch())
+				return nil
+			})
+			return newWrapper(prm)
+		})
+	}
+}
+
+func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
+	if len(nodeParams) == 0 {
+		return nil, errors.New("no FrostFS peers configured")
+	}
+
+	nodesParamsMap := make(map[int]*nodesParam)
+	for _, param := range nodeParams {
+		nodes, ok := nodesParamsMap[param.priority]
+		if !ok {
+			nodes = &nodesParam{priority: param.priority}
+		}
+		nodes.addresses = append(nodes.addresses, param.address)
+		nodes.weights = append(nodes.weights, param.weight)
+		nodesParamsMap[param.priority] = nodes
+	}
+
+	nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
+	for _, nodes := range nodesParamsMap {
+		nodes.weights = adjustWeights(nodes.weights)
+		nodesParams = append(nodesParams, nodes)
+	}
+
+	sort.Slice(nodesParams, func(i, j int) bool {
+		return nodesParams[i].priority < nodesParams[j].priority
+	})
+
+	return nodesParams, nil
+}
+
+func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) {
+	wg := sync.WaitGroup{}
+	for i, inner := range cm.innerPools {
+		wg.Add(1)
+
+		bufferWeights := buffers[i]
+		go func(i int, _ *innerPool) {
+			defer wg.Done()
+			cm.updateInnerNodesHealth(ctx, i, bufferWeights)
+		}(i, inner)
+	}
+	wg.Wait()
+}
+
+func (cm *connectionManager) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
+	if i > len(cm.innerPools)-1 {
+		return
+	}
+	pool := cm.innerPools[i]
+	options := cm.rebalanceParams
+
+	healthyChanged := new(atomic.Bool)
+	wg := sync.WaitGroup{}
+
+	for j, cli := range pool.clients {
+		wg.Add(1)
+		go func(j int, cli client) {
+			defer wg.Done()
+
+			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
+			defer c()
+
+			changed, err := restartIfUnhealthy(tctx, cli)
+			healthy := err == nil
+			if healthy {
+				bufferWeights[j] = options.nodesParams[i].weights[j]
+			} else {
+				bufferWeights[j] = 0
+				cm.cache.DeleteByPrefix(cli.address())
+			}
+
+			if changed {
+				fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)}
+				if err != nil {
+					fields = append(fields, zap.String("reason", err.Error()))
+				}
+
+				cm.log(zap.DebugLevel, "health has changed", fields...)
+				healthyChanged.Store(true)
+			}
+		}(j, cli)
+	}
+	wg.Wait()
+
+	if healthyChanged.Load() {
+		probabilities := adjustWeights(bufferWeights)
+		source := rand.NewSource(time.Now().UnixNano())
+		pool.lock.Lock()
+		pool.sampler = newSampler(probabilities, source)
+		pool.lock.Unlock()
+	}
+}
+
+// restartIfUnhealthy checks the health status of the client and restarts it if it is unhealthy.
+// It reports whether the status was changed by this call and returns the error that caused the unhealthy status.
+func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
+	defer func() {
+		if err != nil {
+			c.setUnhealthy()
+		} else {
+			c.setHealthy()
+		}
+	}()
+
+	wasHealthy := c.isHealthy()
+
+	if res, err := c.healthcheck(ctx); err == nil {
+		if res.Status().IsMaintenance() {
+			return wasHealthy, new(apistatus.NodeUnderMaintenance)
+		}
+
+		return !wasHealthy, nil
+	}
+
+	if err = c.restart(ctx); err != nil {
+		return wasHealthy, err
+	}
+
+	res, err := c.healthcheck(ctx)
+	if err != nil {
+		return wasHealthy, err
+	}
+
+	if res.Status().IsMaintenance() {
+		return wasHealthy, new(apistatus.NodeUnderMaintenance)
+	}
+
+	return !wasHealthy, nil
+}
+
+func adjustWeights(weights []float64) []float64 {
+	adjusted := make([]float64, len(weights))
+	sum := 0.0
+	for _, weight := range weights {
+		sum += weight
+	}
+	if sum > 0 {
+		for i, weight := range weights {
+			adjusted[i] = weight / sum
+		}
+	}
+
+	return adjusted
+}
+
+func (cm *connectionManager) connection() (client, error) {
+	for _, inner := range cm.innerPools {
+		cp, err := inner.connection()
+		if err == nil {
+			return cp, nil
+		}
+	}
+
+	return nil, errors.New("no healthy client")
+}
+
+func (p *innerPool) connection() (client, error) {
+	p.lock.RLock() // need lock because of using p.sampler
+	defer p.lock.RUnlock()
+	if len(p.clients) == 1 {
+		cp := p.clients[0]
+		if cp.isHealthy() {
+			return cp, nil
+		}
+		return nil, errors.New("no healthy client")
+	}
+	attempts := 3 * len(p.clients)
+	for range attempts {
+		i := p.sampler.Next()
+		if cp := p.clients[i]; cp.isHealthy() {
+			return cp, nil
+		}
+	}
+
+	return nil, errors.New("no healthy client")
+}
+
+func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
+	k := keys.PrivateKey{PrivateKey: *key}
+
+	stype := "server"
+	if clientCut {
+		stype = "client"
+	}
+
+	return address + stype + k.String()
+}
+
+func (cm *connectionManager) checkSessionTokenErr(err error, address string) bool {
+	if err == nil {
+		return false
+	}
+
+	if sdkClient.IsErrSessionNotFound(err) || sdkClient.IsErrSessionExpired(err) {
+		cm.cache.DeleteByPrefix(address)
+		return true
+	}
+
+	return false
+}
+
+func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
+	ni, err := c.networkInfo(ctx, prmNetworkInfo{})
+	if err != nil {
+		return err
+	}
+
+	epoch := ni.CurrentEpoch()
+
+	var exp uint64
+	if math.MaxUint64-epoch < dur {
+		exp = math.MaxUint64
+	} else {
+		exp = epoch + dur
+	}
+	var prm prmCreateSession
+	prm.setExp(exp)
+	prm.useKey(ownerKey)
+
+	var (
+		id  uuid.UUID
+		key frostfsecdsa.PublicKey
+	)
+
+	if clientCut {
+		id = uuid.New()
+		key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
+	} else {
+		res, err := c.sessionCreate(ctx, prm)
+		if err != nil {
+			return err
+		}
+		if err = id.UnmarshalBinary(res.id); err != nil {
+			return fmt.Errorf("invalid session token ID: %w", err)
+		}
+		if err = key.Decode(res.sessionKey); err != nil {
+			return fmt.Errorf("invalid public session key: %w", err)
+		}
+	}
+
+	dst.SetID(id)
+	dst.SetAuthKey(&key)
+	dst.SetExp(exp)
+
+	return nil
+}
+
+func (cm *connectionManager) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
+	cp, err := cm.connection()
+	if err != nil {
+		return err
+	}
+
+	ctx.key = cfg.key
+	if ctx.key == nil {
+		// use pool key if caller didn't specify its own
+		ctx.key = cm.key
+	}
+
+	ctx.endpoint = cp.address()
+	ctx.client = cp
+
+	if ctx.sessionTarget != nil && cfg.stoken != nil {
+		ctx.sessionTarget(*cfg.stoken)
+	}
+
+	// note that we don't override session provided by the caller
+	ctx.sessionDefault = cfg.stoken == nil && prmCtx.defaultSession
+	if ctx.sessionDefault {
+		ctx.sessionVerb = prmCtx.verb
+		ctx.sessionCnr = prmCtx.cnr
+		ctx.sessionObjSet = prmCtx.objSet
+		ctx.sessionObjs = prmCtx.objs
+	}
+
+	return err
+}
+
+// openDefaultSession opens a new session or reuses a cached one.
+// Must be called only on an initialized callContext with sessionTarget set.
+func (cm *connectionManager) openDefaultSession(ctx context.Context, cc *callContext) error {
+	cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
+
+	tok, ok := cm.cache.Get(cacheKey)
+	if !ok {
+		// init new session
+		err := initSessionForDuration(ctx, &tok, cc.client, cm.stokenDuration, *cc.key, cc.sessionClientCut)
+		if err != nil {
+			return fmt.Errorf("session API client: %w", err)
+		}
+
+		// cache the opened session
+		cm.cache.Put(cacheKey, tok)
+	}
+
+	tok.ForVerb(cc.sessionVerb)
+	tok.BindContainer(cc.sessionCnr)
+
+	if cc.sessionObjSet {
+		tok.LimitByObjects(cc.sessionObjs...)
+	}
+
+	// sign the token
+	if err := tok.Sign(*cc.key); err != nil {
+		return fmt.Errorf("sign token of the opened session: %w", err)
+	}
+
+	cc.sessionTarget(tok)
+
+	return nil
+}
+
+// call opens the default session (if sessionDefault is set) and calls f. If f returns
+// a session-related error, the cached token is removed.
+func (cm *connectionManager) call(ctx context.Context, cc *callContext, f func() error) error {
+	var err error
+
+	if cc.sessionDefault {
+		err = cm.openDefaultSession(ctx, cc)
+		if err != nil {
+			return fmt.Errorf("open default session: %w", err)
+		}
+	}
+
+	err = f()
+	_ = cm.checkSessionTokenErr(err, cc.endpoint)
+
+	return err
+}
+
+// fillAppropriateKey uses the pool key if the caller didn't specify its own.
+func (cm *connectionManager) fillAppropriateKey(prm *prmCommon) {
+	if prm.key == nil {
+		prm.key = cm.key
+	}
+}
+
+func (cm connectionManager) Statistic() Statistic {
+	stat := Statistic{}
+	for _, inner := range cm.innerPools {
+		nodes := make([]string, 0, len(inner.clients))
+		inner.lock.RLock()
+		for _, cl := range inner.clients {
+			if cl.isHealthy() {
+				nodes = append(nodes, cl.address())
+			}
+			node := NodeStatistic{
+				address:       cl.address(),
+				methods:       cl.methodsStatus(),
+				overallErrors: cl.overallErrorRate(),
+				currentErrors: cl.currentErrorRate(),
+			}
+			stat.nodes = append(stat.nodes, node)
+			stat.overallErrors += node.overallErrors
+		}
+		inner.lock.RUnlock()
+		if len(stat.currentNodes) == 0 {
+			stat.currentNodes = nodes
+		}
+	}
+
+	return stat
+}
+
+func (cm *connectionManager) close() {
+	cm.healthChecker.stopRebalance()
+
+	// close all clients
+	for _, pools := range cm.innerPools {
+		for _, cli := range pools.clients {
+			_ = cli.close()
+		}
+	}
+}
diff --git a/pool/pool.go b/pool/pool.go
index a9c09d1..6a062fa 100644
--- a/pool/pool.go
+++ b/pool/pool.go
@@ -7,9 +7,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"math"
-	"math/rand"
-	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -21,15 +18,12 @@ import (
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
-	frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
-	"github.com/google/uuid"
-	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 	"google.golang.org/grpc"
@@ -1940,23 +1934,6 @@ type Pool struct {
 	maxObjectSize uint64
 }
 
-type connectionManager struct {
-	innerPools      []*innerPool
-	key             *ecdsa.PrivateKey
-	cache           *sessionCache
-	stokenDuration  uint64
-	rebalanceParams rebalanceParameters
-	clientBuilder   clientBuilder
-	logger          *zap.Logger
-	healthChecker   *healthCheck
-}
-
-type innerPool struct {
-	lock    sync.RWMutex
-	sampler *sampler
-	clients []client
-}
-
 const (
 	defaultSessionTokenExpirationDuration = 100 // in epochs
 	defaultErrorThreshold                 = 100
@@ -1970,41 +1947,6 @@ const (
 	defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
 )
 
-// newConnectionManager creates connection pool using parameters.
-func newConnectionManager(options InitParameters) (*connectionManager, error) {
-	if options.key == nil {
-		return nil, fmt.Errorf("missed required parameter 'Key'")
-	}
-
-	nodesParams, err := adjustNodeParams(options.nodeParams)
-	if err != nil {
-		return nil, err
-	}
-
-	cache, err := newCache(options.sessionExpirationDuration)
-	if err != nil {
-		return nil, fmt.Errorf("couldn't create cache: %w", err)
-	}
-
-	fillDefaultInitParams(&options, cache)
-
-	manager := &connectionManager{
-		key:            options.key,
-		cache:          cache,
-		logger:         options.logger,
-		stokenDuration: options.sessionExpirationDuration,
-		rebalanceParams: rebalanceParameters{
-			nodesParams:               nodesParams,
-			nodeRequestTimeout:        options.healthcheckTimeout,
-			clientRebalanceInterval:   options.clientRebalanceInterval,
-			sessionExpirationDuration: options.sessionExpirationDuration,
-		},
-		clientBuilder: options.clientBuilder,
-	}
-
-	return manager, nil
-}
-
 // NewPool creates cnnectionManager using parameters.
 func NewPool(options InitParameters) (*Pool, error) {
 	manager, err := newConnectionManager(options)
@@ -2042,358 +1984,6 @@ func (p *Pool) Dial(ctx context.Context) error {
 	return nil
 }
 
-func (cm *connectionManager) dial(ctx context.Context) error {
-	inner := make([]*innerPool, len(cm.rebalanceParams.nodesParams))
-	var atLeastOneHealthy bool
-
-	for i, params := range cm.rebalanceParams.nodesParams {
-		clients := make([]client, len(params.weights))
-		for j, addr := range params.addresses {
-			clients[j] = cm.clientBuilder(addr)
-			if err := clients[j].dial(ctx); err != nil {
-				cm.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
-				continue
-			}
-
-			var st session.Object
-			err := initSessionForDuration(ctx, &st, clients[j], cm.rebalanceParams.sessionExpirationDuration, *cm.key, false)
-			if err != nil {
-				clients[j].setUnhealthy()
-				cm.log(zap.WarnLevel, "failed to create frostfs session token for client",
-					zap.String("address", addr), zap.Error(err))
-				continue
-			}
-
-			_ = cm.cache.Put(formCacheKey(addr, cm.key, false), st)
-			atLeastOneHealthy = true
-		}
-		source := rand.NewSource(time.Now().UnixNano())
-		sampl := newSampler(params.weights, source)
-
-		inner[i] = &innerPool{
-			sampler: sampl,
-			clients: clients,
-		}
-	}
-
-	if !atLeastOneHealthy {
-		return fmt.Errorf("at least one node must be healthy")
-	}
-
-	cm.innerPools = inner
-	cm.healthChecker = newHealthCheck(cm.rebalanceParams.clientRebalanceInterval)
-	cm.healthChecker.startRebalance(ctx, cm.updateNodesHealth, cm.rebalanceParams.nodesParams)
-	return nil
-}
-
-func (cm *connectionManager) log(level zapcore.Level, msg string, fields ...zap.Field) {
-	if cm.logger == nil {
-		return
-	}
-
-	cm.logger.Log(level, msg, fields...)
-}
-
-func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
-	if params.sessionExpirationDuration == 0 {
-		params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
-	}
-
-	if params.errorThreshold == 0 {
-		params.errorThreshold = defaultErrorThreshold
-	}
-
-	if params.clientRebalanceInterval <= 0 {
-		params.clientRebalanceInterval = defaultRebalanceInterval
-	}
-
-	if params.gracefulCloseOnSwitchTimeout <= 0 {
-		params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout
-	}
-
-	if params.healthcheckTimeout <= 0 {
-		params.healthcheckTimeout = defaultHealthcheckTimeout
-	}
-
-	if params.nodeDialTimeout <= 0 {
-		params.nodeDialTimeout = defaultDialTimeout
-	}
-
-	if params.nodeStreamTimeout <= 0 {
-		params.nodeStreamTimeout = defaultStreamTimeout
-	}
-
-	if cache.tokenDuration == 0 {
-		cache.tokenDuration = defaultSessionTokenExpirationDuration
-	}
-
-	if params.isMissingClientBuilder() {
-		params.setClientBuilder(func(addr string) client {
-			var prm wrapperPrm
-			prm.setAddress(addr)
-			prm.setKey(*params.key)
-			prm.setLogger(params.logger)
-			prm.setDialTimeout(params.nodeDialTimeout)
-			prm.setStreamTimeout(params.nodeStreamTimeout)
-			prm.setErrorThreshold(params.errorThreshold)
-			prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout)
-			prm.setPoolRequestCallback(params.requestCallback)
-			prm.setGRPCDialOptions(params.dialOptions)
-			prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
-				cache.updateEpoch(info.Epoch())
-				return nil
-			})
-			return newWrapper(prm)
-		})
-	}
-}
-
-func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
-	if len(nodeParams) == 0 {
-		return nil, errors.New("no FrostFS peers configured")
-	}
-
-	nodesParamsMap := make(map[int]*nodesParam)
-	for _, param := range nodeParams {
-		nodes, ok := nodesParamsMap[param.priority]
-		if !ok {
-			nodes = &nodesParam{priority: param.priority}
-		}
-		nodes.addresses = append(nodes.addresses, param.address)
-		nodes.weights = append(nodes.weights, param.weight)
-		nodesParamsMap[param.priority] = nodes
-	}
-
-	nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
-	for _, nodes := range nodesParamsMap {
-		nodes.weights = adjustWeights(nodes.weights)
-		nodesParams = append(nodesParams, nodes)
-	}
-
-	sort.Slice(nodesParams, func(i, j int) bool {
-		return nodesParams[i].priority < nodesParams[j].priority
-	})
-
-	return nodesParams, nil
-}
-
-func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) {
-	wg := sync.WaitGroup{}
-	for i, inner := range cm.innerPools {
-		wg.Add(1)
-
-		bufferWeights := buffers[i]
-		go func(i int, _ *innerPool) {
-			defer wg.Done()
-			cm.updateInnerNodesHealth(ctx, i, bufferWeights)
-		}(i, inner)
-	}
-	wg.Wait()
-}
-
-func (cm *connectionManager) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
-	if i > len(cm.innerPools)-1 {
-		return
-	}
-	pool := cm.innerPools[i]
-	options := cm.rebalanceParams
-
-	healthyChanged := new(atomic.Bool)
-	wg := sync.WaitGroup{}
-
-	for j, cli := range pool.clients {
-		wg.Add(1)
-		go func(j int, cli client) {
-			defer wg.Done()
-
-			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
-			defer c()
-
-			changed, err := restartIfUnhealthy(tctx, cli)
-			healthy := err == nil
-			if healthy {
-				bufferWeights[j] = options.nodesParams[i].weights[j]
-			} else {
-				bufferWeights[j] = 0
-				cm.cache.DeleteByPrefix(cli.address())
-			}
-
-			if changed {
-				fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)}
-				if err != nil {
-					fields = append(fields, zap.String("reason", err.Error()))
-				}
-
-				cm.log(zap.DebugLevel, "health has changed", fields...)
-				healthyChanged.Store(true)
-			}
-		}(j, cli)
-	}
-	wg.Wait()
-
-	if healthyChanged.Load() {
-		probabilities := adjustWeights(bufferWeights)
-		source := rand.NewSource(time.Now().UnixNano())
-		pool.lock.Lock()
-		pool.sampler = newSampler(probabilities, source)
-		pool.lock.Unlock()
-	}
-}
-
-// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
-// Indicating if status was changed by this function call and returns error that caused unhealthy status.
-func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
-	defer func() {
-		if err != nil {
-			c.setUnhealthy()
-		} else {
-			c.setHealthy()
-		}
-	}()
-
-	wasHealthy := c.isHealthy()
-
-	if res, err := c.healthcheck(ctx); err == nil {
-		if res.Status().IsMaintenance() {
-			return wasHealthy, new(apistatus.NodeUnderMaintenance)
-		}
-
-		return !wasHealthy, nil
-	}
-
-	if err = c.restart(ctx); err != nil {
-		return wasHealthy, err
-	}
-
-	res, err := c.healthcheck(ctx)
-	if err != nil {
-		return wasHealthy, err
-	}
-
-	if res.Status().IsMaintenance() {
-		return wasHealthy, new(apistatus.NodeUnderMaintenance)
-	}
-
-	return !wasHealthy, nil
-}
-
-func adjustWeights(weights []float64) []float64 {
-	adjusted := make([]float64, len(weights))
-	sum := 0.0
-	for _, weight := range weights {
-		sum += weight
-	}
-	if sum > 0 {
-		for i, weight := range weights {
-			adjusted[i] = weight / sum
-		}
-	}
-
-	return adjusted
-}
-
-func (cm *connectionManager) connection() (client, error) {
-	for _, inner := range cm.innerPools {
-		cp, err := inner.connection()
-		if err == nil {
-			return cp, nil
-		}
-	}
-
-	return nil, errors.New("no healthy client")
-}
-
-func (p *innerPool) connection() (client, error) {
-	p.lock.RLock() // need lock because of using p.sampler
-	defer p.lock.RUnlock()
-	if len(p.clients) == 1 {
-		cp := p.clients[0]
-		if cp.isHealthy() {
-			return cp, nil
-		}
-		return nil, errors.New("no healthy client")
-	}
-	attempts := 3 * len(p.clients)
-	for range attempts {
-		i := p.sampler.Next()
-		if cp := p.clients[i]; cp.isHealthy() {
-			return cp, nil
-		}
-	}
-
-	return nil, errors.New("no healthy client")
-}
-
-func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
-	k := keys.PrivateKey{PrivateKey: *key}
-
-	stype := "server"
-	if clientCut {
-		stype = "client"
-	}
-
-	return address + stype + k.String()
-}
-
-func (cm *connectionManager) checkSessionTokenErr(err error, address string) bool {
-	if err == nil {
-		return false
-	}
-
-	if sdkClient.IsErrSessionNotFound(err) || sdkClient.IsErrSessionExpired(err) {
-		cm.cache.DeleteByPrefix(address)
-		return true
-	}
-
-	return false
-}
-
-func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
-	ni, err := c.networkInfo(ctx, prmNetworkInfo{})
-	if err != nil {
-		return err
-	}
-
-	epoch := ni.CurrentEpoch()
-
-	var exp uint64
-	if math.MaxUint64-epoch < dur {
-		exp = math.MaxUint64
-	} else {
-		exp = epoch + dur
-	}
-	var prm prmCreateSession
-	prm.setExp(exp)
-	prm.useKey(ownerKey)
-
-	var (
-		id  uuid.UUID
-		key frostfsecdsa.PublicKey
-	)
-
-	if clientCut {
-		id = uuid.New()
-		key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
-	} else {
-		res, err := c.sessionCreate(ctx, prm)
-		if err != nil {
-			return err
-		}
-		if err = id.UnmarshalBinary(res.id); err != nil {
-			return fmt.Errorf("invalid session token ID: %w", err)
-		}
-		if err = key.Decode(res.sessionKey); err != nil {
-			return fmt.Errorf("invalid public session key: %w", err)
-		}
-	}
-
-	dst.SetID(id)
-	dst.SetAuthKey(&key)
-	dst.SetExp(exp)
-
-	return nil
-}
-
 type callContext struct {
 	client client
 
@@ -2414,96 +2004,6 @@ type callContext struct {
 	sessionClientCut bool
 }
 
-func (cm *connectionManager) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
-	cp, err := cm.connection()
-	if err != nil {
-		return err
-	}
-
-	ctx.key = cfg.key
-	if ctx.key == nil {
-		// use pool key if caller didn't specify its own
-		ctx.key = cm.key
-	}
-
-	ctx.endpoint = cp.address()
-	ctx.client = cp
-
-	if ctx.sessionTarget != nil && cfg.stoken != nil {
-		ctx.sessionTarget(*cfg.stoken)
-	}
-
-	// note that we don't override session provided by the caller
-	ctx.sessionDefault = cfg.stoken == nil && prmCtx.defaultSession
-	if ctx.sessionDefault {
-		ctx.sessionVerb = prmCtx.verb
-		ctx.sessionCnr = prmCtx.cnr
-		ctx.sessionObjSet = prmCtx.objSet
-		ctx.sessionObjs = prmCtx.objs
-	}
-
-	return err
-}
-
-// opens new session or uses cached one.
-// Must be called only on initialized callContext with set sessionTarget.
-func (cm *connectionManager) openDefaultSession(ctx context.Context, cc *callContext) error {
-	cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
-
-	tok, ok := cm.cache.Get(cacheKey)
-	if !ok {
-		// init new session
-		err := initSessionForDuration(ctx, &tok, cc.client, cm.stokenDuration, *cc.key, cc.sessionClientCut)
-		if err != nil {
-			return fmt.Errorf("session API client: %w", err)
-		}
-
-		// cache the opened session
-		cm.cache.Put(cacheKey, tok)
-	}
-
-	tok.ForVerb(cc.sessionVerb)
-	tok.BindContainer(cc.sessionCnr)
-
-	if cc.sessionObjSet {
-		tok.LimitByObjects(cc.sessionObjs...)
-	}
-
-	// sign the token
-	if err := tok.Sign(*cc.key); err != nil {
-		return fmt.Errorf("sign token of the opened session: %w", err)
-	}
-
-	cc.sessionTarget(tok)
-
-	return nil
-}
-
-// opens default session (if sessionDefault is set), and calls f. If f returns
-// session-related error then cached token is removed.
-func (cm *connectionManager) call(ctx context.Context, cc *callContext, f func() error) error {
-	var err error
-
-	if cc.sessionDefault {
-		err = cm.openDefaultSession(ctx, cc)
-		if err != nil {
-			return fmt.Errorf("open default session: %w", err)
-		}
-	}
-
-	err = f()
-	_ = cm.checkSessionTokenErr(err, cc.endpoint)
-
-	return err
-}
-
-// fillAppropriateKey use pool key if caller didn't specify its own.
-func (cm *connectionManager) fillAppropriateKey(prm *prmCommon) {
-	if prm.key == nil {
-		prm.key = cm.key
-	}
-}
-
 // ResPutObject is designed to provide identifier and creation epoch of the saved object.
 type ResPutObject struct {
 	ObjectID oid.ID
@@ -2978,33 +2478,6 @@ func (p Pool) Statistic() Statistic {
 	return p.manager.Statistic()
 }
 
-func (cm connectionManager) Statistic() Statistic {
-	stat := Statistic{}
-	for _, inner := range cm.innerPools {
-		nodes := make([]string, 0, len(inner.clients))
-		inner.lock.RLock()
-		for _, cl := range inner.clients {
-			if cl.isHealthy() {
-				nodes = append(nodes, cl.address())
-			}
-			node := NodeStatistic{
-				address:       cl.address(),
-				methods:       cl.methodsStatus(),
-				overallErrors: cl.overallErrorRate(),
-				currentErrors: cl.currentErrorRate(),
-			}
-			stat.nodes = append(stat.nodes, node)
-			stat.overallErrors += node.overallErrors
-		}
-		inner.lock.RUnlock()
-		if len(stat.currentNodes) == 0 {
-			stat.currentNodes = nodes
-		}
-	}
-
-	return stat
-}
-
 // waitForContainerPresence waits until the container is found on the FrostFS network.
 func waitForContainerPresence(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error {
 	return waitFor(ctx, waitParams, func(ctx context.Context) bool {
@@ -3083,17 +2556,6 @@ func (p *Pool) Close() {
 	p.manager.close()
 }
 
-func (cm *connectionManager) close() {
-	cm.healthChecker.stopRebalance()
-
-	// close all clients
-	for _, pools := range cm.innerPools {
-		for _, cli := range pools.clients {
-			_ = cli.close()
-		}
-	}
-}
-
 // SyncContainerWithNetwork applies network configuration received via
 // the Pool to the container. Changes the container if it does not satisfy
 // network configuration.
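
---
Reviewer note (not part of the patch): the rebalance path above zeroes the weight of
every node that fails its health check, then renormalizes the remaining weights via
adjustWeights before rebuilding the sampler. A minimal standalone sketch of that
normalization step follows; adjustWeights is copied verbatim from connection_manager.go,
while the package main wrapper and the sample weight values are illustrative
assumptions only.

package main

import "fmt"

// adjustWeights normalizes raw node weights into probabilities that sum to 1.
// If every weight is zero (all nodes unhealthy), the result stays all zeros.
func adjustWeights(weights []float64) []float64 {
	adjusted := make([]float64, len(weights))
	sum := 0.0
	for _, weight := range weights {
		sum += weight
	}
	if sum > 0 {
		for i, weight := range weights {
			adjusted[i] = weight / sum
		}
	}

	return adjusted
}

func main() {
	// Two healthy nodes with raw weights 1 and 3; the third node's weight was
	// zeroed by a failed health check (illustrative values).
	fmt.Println(adjustWeights([]float64{1, 3, 0})) // prints [0.25 0.75 0]
}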