forked from TrueCloudLab/frostfs-sdk-go
[#300] pool: Move 'connectionManager' to separate file
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
Parent: 42b5aacd8b
Commit: 8cce85337b
2 changed files with 555 additions and 538 deletions
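For orientation, the moved type keeps the same lifecycle it had inside pool.go. Below is a minimal, package-internal sketch of that lifecycle, assuming only the calls visible in this diff (newConnectionManager, dial, connection, Statistic, close); the helper name examplePoolLifecycle is hypothetical and not part of the commit.

// Hypothetical sketch inside package pool; mirrors how Pool drives connectionManager in this diff.
func examplePoolLifecycle(ctx context.Context, options InitParameters) error {
	manager, err := newConnectionManager(options) // validates the key, groups nodes by priority, fills defaults
	if err != nil {
		return err
	}

	if err := manager.dial(ctx); err != nil { // dials every configured node and starts the rebalance loop
		return err
	}
	defer manager.close() // stops rebalancing and closes all clients

	conn, err := manager.connection() // picks a healthy client via the weighted sampler
	if err != nil {
		return err
	}
	_ = conn

	_ = manager.Statistic() // per-node error counters aggregated across priority groups
	return nil
}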
pool/connection_manager.go (new file, 555 additions)
@@ -0,0 +1,555 @@
package pool

import (
	"context"
	"crypto/ecdsa"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
	"github.com/google/uuid"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

type innerPool struct {
	lock    sync.RWMutex
	sampler *sampler
	clients []client
}

type connectionManager struct {
	innerPools      []*innerPool
	key             *ecdsa.PrivateKey
	cache           *sessionCache
	stokenDuration  uint64
	rebalanceParams rebalanceParameters
	clientBuilder   clientBuilder
	logger          *zap.Logger
	healthChecker   *healthCheck
}

// newConnectionManager creates connection pool using parameters.
func newConnectionManager(options InitParameters) (*connectionManager, error) {
	if options.key == nil {
		return nil, fmt.Errorf("missed required parameter 'Key'")
	}

	nodesParams, err := adjustNodeParams(options.nodeParams)
	if err != nil {
		return nil, err
	}

	cache, err := newCache(options.sessionExpirationDuration)
	if err != nil {
		return nil, fmt.Errorf("couldn't create cache: %w", err)
	}

	fillDefaultInitParams(&options, cache)

	manager := &connectionManager{
		key:            options.key,
		cache:          cache,
		logger:         options.logger,
		stokenDuration: options.sessionExpirationDuration,
		rebalanceParams: rebalanceParameters{
			nodesParams:               nodesParams,
			nodeRequestTimeout:        options.healthcheckTimeout,
			clientRebalanceInterval:   options.clientRebalanceInterval,
			sessionExpirationDuration: options.sessionExpirationDuration,
		},
		clientBuilder: options.clientBuilder,
	}

	return manager, nil
}

func (cm *connectionManager) dial(ctx context.Context) error {
	inner := make([]*innerPool, len(cm.rebalanceParams.nodesParams))
	var atLeastOneHealthy bool

	for i, params := range cm.rebalanceParams.nodesParams {
		clients := make([]client, len(params.weights))
		for j, addr := range params.addresses {
			clients[j] = cm.clientBuilder(addr)
			if err := clients[j].dial(ctx); err != nil {
				cm.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
				continue
			}

			var st session.Object
			err := initSessionForDuration(ctx, &st, clients[j], cm.rebalanceParams.sessionExpirationDuration, *cm.key, false)
			if err != nil {
				clients[j].setUnhealthy()
				cm.log(zap.WarnLevel, "failed to create frostfs session token for client",
					zap.String("address", addr), zap.Error(err))
				continue
			}

			_ = cm.cache.Put(formCacheKey(addr, cm.key, false), st)
			atLeastOneHealthy = true
		}
		source := rand.NewSource(time.Now().UnixNano())
		sampl := newSampler(params.weights, source)

		inner[i] = &innerPool{
			sampler: sampl,
			clients: clients,
		}
	}

	if !atLeastOneHealthy {
		return fmt.Errorf("at least one node must be healthy")
	}

	cm.innerPools = inner
	cm.healthChecker = newHealthCheck(cm.rebalanceParams.clientRebalanceInterval)
	cm.healthChecker.startRebalance(ctx, cm.updateNodesHealth, cm.rebalanceParams.nodesParams)
	return nil
}

func (cm *connectionManager) log(level zapcore.Level, msg string, fields ...zap.Field) {
	if cm.logger == nil {
		return
	}

	cm.logger.Log(level, msg, fields...)
}

func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
	if params.sessionExpirationDuration == 0 {
		params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
	}

	if params.errorThreshold == 0 {
		params.errorThreshold = defaultErrorThreshold
	}

	if params.clientRebalanceInterval <= 0 {
		params.clientRebalanceInterval = defaultRebalanceInterval
	}

	if params.gracefulCloseOnSwitchTimeout <= 0 {
		params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout
	}

	if params.healthcheckTimeout <= 0 {
		params.healthcheckTimeout = defaultHealthcheckTimeout
	}

	if params.nodeDialTimeout <= 0 {
		params.nodeDialTimeout = defaultDialTimeout
	}

	if params.nodeStreamTimeout <= 0 {
		params.nodeStreamTimeout = defaultStreamTimeout
	}

	if cache.tokenDuration == 0 {
		cache.tokenDuration = defaultSessionTokenExpirationDuration
	}

	if params.isMissingClientBuilder() {
		params.setClientBuilder(func(addr string) client {
			var prm wrapperPrm
			prm.setAddress(addr)
			prm.setKey(*params.key)
			prm.setLogger(params.logger)
			prm.setDialTimeout(params.nodeDialTimeout)
			prm.setStreamTimeout(params.nodeStreamTimeout)
			prm.setErrorThreshold(params.errorThreshold)
			prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout)
			prm.setPoolRequestCallback(params.requestCallback)
			prm.setGRPCDialOptions(params.dialOptions)
			prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
				cache.updateEpoch(info.Epoch())
				return nil
			})
			return newWrapper(prm)
		})
	}
}

func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
	if len(nodeParams) == 0 {
		return nil, errors.New("no FrostFS peers configured")
	}

	nodesParamsMap := make(map[int]*nodesParam)
	for _, param := range nodeParams {
		nodes, ok := nodesParamsMap[param.priority]
		if !ok {
			nodes = &nodesParam{priority: param.priority}
		}
		nodes.addresses = append(nodes.addresses, param.address)
		nodes.weights = append(nodes.weights, param.weight)
		nodesParamsMap[param.priority] = nodes
	}

	nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
	for _, nodes := range nodesParamsMap {
		nodes.weights = adjustWeights(nodes.weights)
		nodesParams = append(nodesParams, nodes)
	}

	sort.Slice(nodesParams, func(i, j int) bool {
		return nodesParams[i].priority < nodesParams[j].priority
	})

	return nodesParams, nil
}

func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) {
	wg := sync.WaitGroup{}
	for i, inner := range cm.innerPools {
		wg.Add(1)

		bufferWeights := buffers[i]
		go func(i int, _ *innerPool) {
			defer wg.Done()
			cm.updateInnerNodesHealth(ctx, i, bufferWeights)
		}(i, inner)
	}
	wg.Wait()
}

func (cm *connectionManager) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
	if i > len(cm.innerPools)-1 {
		return
	}
	pool := cm.innerPools[i]
	options := cm.rebalanceParams

	healthyChanged := new(atomic.Bool)
	wg := sync.WaitGroup{}

	for j, cli := range pool.clients {
		wg.Add(1)
		go func(j int, cli client) {
			defer wg.Done()

			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
			defer c()

			changed, err := restartIfUnhealthy(tctx, cli)
			healthy := err == nil
			if healthy {
				bufferWeights[j] = options.nodesParams[i].weights[j]
			} else {
				bufferWeights[j] = 0
				cm.cache.DeleteByPrefix(cli.address())
			}

			if changed {
				fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)}
				if err != nil {
					fields = append(fields, zap.String("reason", err.Error()))
				}

				cm.log(zap.DebugLevel, "health has changed", fields...)
				healthyChanged.Store(true)
			}
		}(j, cli)
	}
	wg.Wait()

	if healthyChanged.Load() {
		probabilities := adjustWeights(bufferWeights)
		source := rand.NewSource(time.Now().UnixNano())
		pool.lock.Lock()
		pool.sampler = newSampler(probabilities, source)
		pool.lock.Unlock()
	}
}

// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
// Indicating if status was changed by this function call and returns error that caused unhealthy status.
func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
	defer func() {
		if err != nil {
			c.setUnhealthy()
		} else {
			c.setHealthy()
		}
	}()

	wasHealthy := c.isHealthy()

	if res, err := c.healthcheck(ctx); err == nil {
		if res.Status().IsMaintenance() {
			return wasHealthy, new(apistatus.NodeUnderMaintenance)
		}

		return !wasHealthy, nil
	}

	if err = c.restart(ctx); err != nil {
		return wasHealthy, err
	}

	res, err := c.healthcheck(ctx)
	if err != nil {
		return wasHealthy, err
	}

	if res.Status().IsMaintenance() {
		return wasHealthy, new(apistatus.NodeUnderMaintenance)
	}

	return !wasHealthy, nil
}

func adjustWeights(weights []float64) []float64 {
	adjusted := make([]float64, len(weights))
	sum := 0.0
	for _, weight := range weights {
		sum += weight
	}
	if sum > 0 {
		for i, weight := range weights {
			adjusted[i] = weight / sum
		}
	}

	return adjusted
}

func (cm *connectionManager) connection() (client, error) {
	for _, inner := range cm.innerPools {
		cp, err := inner.connection()
		if err == nil {
			return cp, nil
		}
	}

	return nil, errors.New("no healthy client")
}

func (p *innerPool) connection() (client, error) {
	p.lock.RLock() // need lock because of using p.sampler
	defer p.lock.RUnlock()
	if len(p.clients) == 1 {
		cp := p.clients[0]
		if cp.isHealthy() {
			return cp, nil
		}
		return nil, errors.New("no healthy client")
	}
	attempts := 3 * len(p.clients)
	for range attempts {
		i := p.sampler.Next()
		if cp := p.clients[i]; cp.isHealthy() {
			return cp, nil
		}
	}

	return nil, errors.New("no healthy client")
}

func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
	k := keys.PrivateKey{PrivateKey: *key}

	stype := "server"
	if clientCut {
		stype = "client"
	}

	return address + stype + k.String()
}

func (cm *connectionManager) checkSessionTokenErr(err error, address string) bool {
	if err == nil {
		return false
	}

	if sdkClient.IsErrSessionNotFound(err) || sdkClient.IsErrSessionExpired(err) {
		cm.cache.DeleteByPrefix(address)
		return true
	}

	return false
}

func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
	ni, err := c.networkInfo(ctx, prmNetworkInfo{})
	if err != nil {
		return err
	}

	epoch := ni.CurrentEpoch()

	var exp uint64
	if math.MaxUint64-epoch < dur {
		exp = math.MaxUint64
	} else {
		exp = epoch + dur
	}
	var prm prmCreateSession
	prm.setExp(exp)
	prm.useKey(ownerKey)

	var (
		id  uuid.UUID
		key frostfsecdsa.PublicKey
	)

	if clientCut {
		id = uuid.New()
		key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
	} else {
		res, err := c.sessionCreate(ctx, prm)
		if err != nil {
			return err
		}
		if err = id.UnmarshalBinary(res.id); err != nil {
			return fmt.Errorf("invalid session token ID: %w", err)
		}
		if err = key.Decode(res.sessionKey); err != nil {
			return fmt.Errorf("invalid public session key: %w", err)
		}
	}

	dst.SetID(id)
	dst.SetAuthKey(&key)
	dst.SetExp(exp)

	return nil
}

func (cm *connectionManager) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
	cp, err := cm.connection()
	if err != nil {
		return err
	}

	ctx.key = cfg.key
	if ctx.key == nil {
		// use pool key if caller didn't specify its own
		ctx.key = cm.key
	}

	ctx.endpoint = cp.address()
	ctx.client = cp

	if ctx.sessionTarget != nil && cfg.stoken != nil {
		ctx.sessionTarget(*cfg.stoken)
	}

	// note that we don't override session provided by the caller
	ctx.sessionDefault = cfg.stoken == nil && prmCtx.defaultSession
	if ctx.sessionDefault {
		ctx.sessionVerb = prmCtx.verb
		ctx.sessionCnr = prmCtx.cnr
		ctx.sessionObjSet = prmCtx.objSet
		ctx.sessionObjs = prmCtx.objs
	}

	return err
}

// opens new session or uses cached one.
// Must be called only on initialized callContext with set sessionTarget.
func (cm *connectionManager) openDefaultSession(ctx context.Context, cc *callContext) error {
	cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)

	tok, ok := cm.cache.Get(cacheKey)
	if !ok {
		// init new session
		err := initSessionForDuration(ctx, &tok, cc.client, cm.stokenDuration, *cc.key, cc.sessionClientCut)
		if err != nil {
			return fmt.Errorf("session API client: %w", err)
		}

		// cache the opened session
		cm.cache.Put(cacheKey, tok)
	}

	tok.ForVerb(cc.sessionVerb)
	tok.BindContainer(cc.sessionCnr)

	if cc.sessionObjSet {
		tok.LimitByObjects(cc.sessionObjs...)
	}

	// sign the token
	if err := tok.Sign(*cc.key); err != nil {
		return fmt.Errorf("sign token of the opened session: %w", err)
	}

	cc.sessionTarget(tok)

	return nil
}

// opens default session (if sessionDefault is set), and calls f. If f returns
// session-related error then cached token is removed.
func (cm *connectionManager) call(ctx context.Context, cc *callContext, f func() error) error {
	var err error

	if cc.sessionDefault {
		err = cm.openDefaultSession(ctx, cc)
		if err != nil {
			return fmt.Errorf("open default session: %w", err)
		}
	}

	err = f()
	_ = cm.checkSessionTokenErr(err, cc.endpoint)

	return err
}

// fillAppropriateKey use pool key if caller didn't specify its own.
func (cm *connectionManager) fillAppropriateKey(prm *prmCommon) {
	if prm.key == nil {
		prm.key = cm.key
	}
}

func (cm connectionManager) Statistic() Statistic {
	stat := Statistic{}
	for _, inner := range cm.innerPools {
		nodes := make([]string, 0, len(inner.clients))
		inner.lock.RLock()
		for _, cl := range inner.clients {
			if cl.isHealthy() {
				nodes = append(nodes, cl.address())
			}
			node := NodeStatistic{
				address:       cl.address(),
				methods:       cl.methodsStatus(),
				overallErrors: cl.overallErrorRate(),
				currentErrors: cl.currentErrorRate(),
			}
			stat.nodes = append(stat.nodes, node)
			stat.overallErrors += node.overallErrors
		}
		inner.lock.RUnlock()
		if len(stat.currentNodes) == 0 {
			stat.currentNodes = nodes
		}
	}

	return stat
}

func (cm *connectionManager) close() {
	cm.healthChecker.stopRebalance()

	// close all clients
	for _, pools := range cm.innerPools {
		for _, cli := range pools.clients {
			_ = cli.close()
		}
	}
}
pool/pool.go (538 deletions)
@@ -7,9 +7,6 @@ import (
	"errors"
	"fmt"
	"io"
	"math"
	"math/rand"
	"sort"
	"sync"
	"sync/atomic"
	"time"

@@ -21,15 +18,12 @@ import (
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/google/uuid"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"google.golang.org/grpc"

@@ -1940,23 +1934,6 @@ type Pool struct {
	maxObjectSize uint64
}

type connectionManager struct {
	innerPools      []*innerPool
	key             *ecdsa.PrivateKey
	cache           *sessionCache
	stokenDuration  uint64
	rebalanceParams rebalanceParameters
	clientBuilder   clientBuilder
	logger          *zap.Logger
	healthChecker   *healthCheck
}

type innerPool struct {
	lock    sync.RWMutex
	sampler *sampler
	clients []client
}

const (
	defaultSessionTokenExpirationDuration = 100 // in epochs
	defaultErrorThreshold                 = 100

@@ -1970,41 +1947,6 @@ const (
	defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
)

// newConnectionManager creates connection pool using parameters.
func newConnectionManager(options InitParameters) (*connectionManager, error) {
	if options.key == nil {
		return nil, fmt.Errorf("missed required parameter 'Key'")
	}

	nodesParams, err := adjustNodeParams(options.nodeParams)
	if err != nil {
		return nil, err
	}

	cache, err := newCache(options.sessionExpirationDuration)
	if err != nil {
		return nil, fmt.Errorf("couldn't create cache: %w", err)
	}

	fillDefaultInitParams(&options, cache)

	manager := &connectionManager{
		key:            options.key,
		cache:          cache,
		logger:         options.logger,
		stokenDuration: options.sessionExpirationDuration,
		rebalanceParams: rebalanceParameters{
			nodesParams:               nodesParams,
			nodeRequestTimeout:        options.healthcheckTimeout,
			clientRebalanceInterval:   options.clientRebalanceInterval,
			sessionExpirationDuration: options.sessionExpirationDuration,
		},
		clientBuilder: options.clientBuilder,
	}

	return manager, nil
}

// NewPool creates cnnectionManager using parameters.
func NewPool(options InitParameters) (*Pool, error) {
	manager, err := newConnectionManager(options)

@@ -2042,358 +1984,6 @@ func (p *Pool) Dial(ctx context.Context) error {
	return nil
}

func (cm *connectionManager) dial(ctx context.Context) error {
	inner := make([]*innerPool, len(cm.rebalanceParams.nodesParams))
	var atLeastOneHealthy bool

	for i, params := range cm.rebalanceParams.nodesParams {
		clients := make([]client, len(params.weights))
		for j, addr := range params.addresses {
			clients[j] = cm.clientBuilder(addr)
			if err := clients[j].dial(ctx); err != nil {
				cm.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
				continue
			}

			var st session.Object
			err := initSessionForDuration(ctx, &st, clients[j], cm.rebalanceParams.sessionExpirationDuration, *cm.key, false)
			if err != nil {
				clients[j].setUnhealthy()
				cm.log(zap.WarnLevel, "failed to create frostfs session token for client",
					zap.String("address", addr), zap.Error(err))
				continue
			}

			_ = cm.cache.Put(formCacheKey(addr, cm.key, false), st)
			atLeastOneHealthy = true
		}
		source := rand.NewSource(time.Now().UnixNano())
		sampl := newSampler(params.weights, source)

		inner[i] = &innerPool{
			sampler: sampl,
			clients: clients,
		}
	}

	if !atLeastOneHealthy {
		return fmt.Errorf("at least one node must be healthy")
	}

	cm.innerPools = inner
	cm.healthChecker = newHealthCheck(cm.rebalanceParams.clientRebalanceInterval)
	cm.healthChecker.startRebalance(ctx, cm.updateNodesHealth, cm.rebalanceParams.nodesParams)
	return nil
}

func (cm *connectionManager) log(level zapcore.Level, msg string, fields ...zap.Field) {
	if cm.logger == nil {
		return
	}

	cm.logger.Log(level, msg, fields...)
}

func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
	if params.sessionExpirationDuration == 0 {
		params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
	}

	if params.errorThreshold == 0 {
		params.errorThreshold = defaultErrorThreshold
	}

	if params.clientRebalanceInterval <= 0 {
		params.clientRebalanceInterval = defaultRebalanceInterval
	}

	if params.gracefulCloseOnSwitchTimeout <= 0 {
		params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout
	}

	if params.healthcheckTimeout <= 0 {
		params.healthcheckTimeout = defaultHealthcheckTimeout
	}

	if params.nodeDialTimeout <= 0 {
		params.nodeDialTimeout = defaultDialTimeout
	}

	if params.nodeStreamTimeout <= 0 {
		params.nodeStreamTimeout = defaultStreamTimeout
	}

	if cache.tokenDuration == 0 {
		cache.tokenDuration = defaultSessionTokenExpirationDuration
	}

	if params.isMissingClientBuilder() {
		params.setClientBuilder(func(addr string) client {
			var prm wrapperPrm
			prm.setAddress(addr)
			prm.setKey(*params.key)
			prm.setLogger(params.logger)
			prm.setDialTimeout(params.nodeDialTimeout)
			prm.setStreamTimeout(params.nodeStreamTimeout)
			prm.setErrorThreshold(params.errorThreshold)
			prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout)
			prm.setPoolRequestCallback(params.requestCallback)
			prm.setGRPCDialOptions(params.dialOptions)
			prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
				cache.updateEpoch(info.Epoch())
				return nil
			})
			return newWrapper(prm)
		})
	}
}

func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
	if len(nodeParams) == 0 {
		return nil, errors.New("no FrostFS peers configured")
	}

	nodesParamsMap := make(map[int]*nodesParam)
	for _, param := range nodeParams {
		nodes, ok := nodesParamsMap[param.priority]
		if !ok {
			nodes = &nodesParam{priority: param.priority}
		}
		nodes.addresses = append(nodes.addresses, param.address)
		nodes.weights = append(nodes.weights, param.weight)
		nodesParamsMap[param.priority] = nodes
	}

	nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
	for _, nodes := range nodesParamsMap {
		nodes.weights = adjustWeights(nodes.weights)
		nodesParams = append(nodesParams, nodes)
	}

	sort.Slice(nodesParams, func(i, j int) bool {
		return nodesParams[i].priority < nodesParams[j].priority
	})

	return nodesParams, nil
}

func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) {
	wg := sync.WaitGroup{}
	for i, inner := range cm.innerPools {
		wg.Add(1)

		bufferWeights := buffers[i]
		go func(i int, _ *innerPool) {
			defer wg.Done()
			cm.updateInnerNodesHealth(ctx, i, bufferWeights)
		}(i, inner)
	}
	wg.Wait()
}

func (cm *connectionManager) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
	if i > len(cm.innerPools)-1 {
		return
	}
	pool := cm.innerPools[i]
	options := cm.rebalanceParams

	healthyChanged := new(atomic.Bool)
	wg := sync.WaitGroup{}

	for j, cli := range pool.clients {
		wg.Add(1)
		go func(j int, cli client) {
			defer wg.Done()

			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
			defer c()

			changed, err := restartIfUnhealthy(tctx, cli)
			healthy := err == nil
			if healthy {
				bufferWeights[j] = options.nodesParams[i].weights[j]
			} else {
				bufferWeights[j] = 0
				cm.cache.DeleteByPrefix(cli.address())
			}

			if changed {
				fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)}
				if err != nil {
					fields = append(fields, zap.String("reason", err.Error()))
				}

				cm.log(zap.DebugLevel, "health has changed", fields...)
				healthyChanged.Store(true)
			}
		}(j, cli)
	}
	wg.Wait()

	if healthyChanged.Load() {
		probabilities := adjustWeights(bufferWeights)
		source := rand.NewSource(time.Now().UnixNano())
		pool.lock.Lock()
		pool.sampler = newSampler(probabilities, source)
		pool.lock.Unlock()
	}
}

// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
// Indicating if status was changed by this function call and returns error that caused unhealthy status.
func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
	defer func() {
		if err != nil {
			c.setUnhealthy()
		} else {
			c.setHealthy()
		}
	}()

	wasHealthy := c.isHealthy()

	if res, err := c.healthcheck(ctx); err == nil {
		if res.Status().IsMaintenance() {
			return wasHealthy, new(apistatus.NodeUnderMaintenance)
		}

		return !wasHealthy, nil
	}

	if err = c.restart(ctx); err != nil {
		return wasHealthy, err
	}

	res, err := c.healthcheck(ctx)
	if err != nil {
		return wasHealthy, err
	}

	if res.Status().IsMaintenance() {
		return wasHealthy, new(apistatus.NodeUnderMaintenance)
	}

	return !wasHealthy, nil
}

func adjustWeights(weights []float64) []float64 {
	adjusted := make([]float64, len(weights))
	sum := 0.0
	for _, weight := range weights {
		sum += weight
	}
	if sum > 0 {
		for i, weight := range weights {
			adjusted[i] = weight / sum
		}
	}

	return adjusted
}

func (cm *connectionManager) connection() (client, error) {
	for _, inner := range cm.innerPools {
		cp, err := inner.connection()
		if err == nil {
			return cp, nil
		}
	}

	return nil, errors.New("no healthy client")
}

func (p *innerPool) connection() (client, error) {
	p.lock.RLock() // need lock because of using p.sampler
	defer p.lock.RUnlock()
	if len(p.clients) == 1 {
		cp := p.clients[0]
		if cp.isHealthy() {
			return cp, nil
		}
		return nil, errors.New("no healthy client")
	}
	attempts := 3 * len(p.clients)
	for range attempts {
		i := p.sampler.Next()
		if cp := p.clients[i]; cp.isHealthy() {
			return cp, nil
		}
	}

	return nil, errors.New("no healthy client")
}

func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
	k := keys.PrivateKey{PrivateKey: *key}

	stype := "server"
	if clientCut {
		stype = "client"
	}

	return address + stype + k.String()
}

func (cm *connectionManager) checkSessionTokenErr(err error, address string) bool {
	if err == nil {
		return false
	}

	if sdkClient.IsErrSessionNotFound(err) || sdkClient.IsErrSessionExpired(err) {
		cm.cache.DeleteByPrefix(address)
		return true
	}

	return false
}

func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
	ni, err := c.networkInfo(ctx, prmNetworkInfo{})
	if err != nil {
		return err
	}

	epoch := ni.CurrentEpoch()

	var exp uint64
	if math.MaxUint64-epoch < dur {
		exp = math.MaxUint64
	} else {
		exp = epoch + dur
	}
	var prm prmCreateSession
	prm.setExp(exp)
	prm.useKey(ownerKey)

	var (
		id  uuid.UUID
		key frostfsecdsa.PublicKey
	)

	if clientCut {
		id = uuid.New()
		key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
	} else {
		res, err := c.sessionCreate(ctx, prm)
		if err != nil {
			return err
		}
		if err = id.UnmarshalBinary(res.id); err != nil {
			return fmt.Errorf("invalid session token ID: %w", err)
		}
		if err = key.Decode(res.sessionKey); err != nil {
			return fmt.Errorf("invalid public session key: %w", err)
		}
	}

	dst.SetID(id)
	dst.SetAuthKey(&key)
	dst.SetExp(exp)

	return nil
}

type callContext struct {
	client client

@@ -2414,96 +2004,6 @@ type callContext struct {
	sessionClientCut bool
}

func (cm *connectionManager) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
	cp, err := cm.connection()
	if err != nil {
		return err
	}

	ctx.key = cfg.key
	if ctx.key == nil {
		// use pool key if caller didn't specify its own
		ctx.key = cm.key
	}

	ctx.endpoint = cp.address()
	ctx.client = cp

	if ctx.sessionTarget != nil && cfg.stoken != nil {
		ctx.sessionTarget(*cfg.stoken)
	}

	// note that we don't override session provided by the caller
	ctx.sessionDefault = cfg.stoken == nil && prmCtx.defaultSession
	if ctx.sessionDefault {
		ctx.sessionVerb = prmCtx.verb
		ctx.sessionCnr = prmCtx.cnr
		ctx.sessionObjSet = prmCtx.objSet
		ctx.sessionObjs = prmCtx.objs
	}

	return err
}

// opens new session or uses cached one.
// Must be called only on initialized callContext with set sessionTarget.
func (cm *connectionManager) openDefaultSession(ctx context.Context, cc *callContext) error {
	cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)

	tok, ok := cm.cache.Get(cacheKey)
	if !ok {
		// init new session
		err := initSessionForDuration(ctx, &tok, cc.client, cm.stokenDuration, *cc.key, cc.sessionClientCut)
		if err != nil {
			return fmt.Errorf("session API client: %w", err)
		}

		// cache the opened session
		cm.cache.Put(cacheKey, tok)
	}

	tok.ForVerb(cc.sessionVerb)
	tok.BindContainer(cc.sessionCnr)

	if cc.sessionObjSet {
		tok.LimitByObjects(cc.sessionObjs...)
	}

	// sign the token
	if err := tok.Sign(*cc.key); err != nil {
		return fmt.Errorf("sign token of the opened session: %w", err)
	}

	cc.sessionTarget(tok)

	return nil
}

// opens default session (if sessionDefault is set), and calls f. If f returns
// session-related error then cached token is removed.
func (cm *connectionManager) call(ctx context.Context, cc *callContext, f func() error) error {
	var err error

	if cc.sessionDefault {
		err = cm.openDefaultSession(ctx, cc)
		if err != nil {
			return fmt.Errorf("open default session: %w", err)
		}
	}

	err = f()
	_ = cm.checkSessionTokenErr(err, cc.endpoint)

	return err
}

// fillAppropriateKey use pool key if caller didn't specify its own.
func (cm *connectionManager) fillAppropriateKey(prm *prmCommon) {
	if prm.key == nil {
		prm.key = cm.key
	}
}

// ResPutObject is designed to provide identifier and creation epoch of the saved object.
type ResPutObject struct {
	ObjectID oid.ID

@@ -2978,33 +2478,6 @@ func (p Pool) Statistic() Statistic {
	return p.manager.Statistic()
}

func (cm connectionManager) Statistic() Statistic {
	stat := Statistic{}
	for _, inner := range cm.innerPools {
		nodes := make([]string, 0, len(inner.clients))
		inner.lock.RLock()
		for _, cl := range inner.clients {
			if cl.isHealthy() {
				nodes = append(nodes, cl.address())
			}
			node := NodeStatistic{
				address:       cl.address(),
				methods:       cl.methodsStatus(),
				overallErrors: cl.overallErrorRate(),
				currentErrors: cl.currentErrorRate(),
			}
			stat.nodes = append(stat.nodes, node)
			stat.overallErrors += node.overallErrors
		}
		inner.lock.RUnlock()
		if len(stat.currentNodes) == 0 {
			stat.currentNodes = nodes
		}
	}

	return stat
}

// waitForContainerPresence waits until the container is found on the FrostFS network.
func waitForContainerPresence(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error {
	return waitFor(ctx, waitParams, func(ctx context.Context) bool {

@@ -3083,17 +2556,6 @@ func (p *Pool) Close() {
	p.manager.close()
}

func (cm *connectionManager) close() {
	cm.healthChecker.stopRebalance()

	// close all clients
	for _, pools := range cm.innerPools {
		for _, cli := range pools.clients {
			_ = cli.close()
		}
	}
}

// SyncContainerWithNetwork applies network configuration received via
// the Pool to the container. Changes the container if it does not satisfy
// network configuration.