[#300] pool: Extract connection handler functionality to 'connectionManager'
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
This commit is contained in:
parent
69b0711d12
commit
b480df99ca
3 changed files with 201 additions and 160 deletions
281
pool/pool.go
281
pool/pool.go
|
@ -2006,17 +2006,23 @@ type resCreateSession struct {
|
||||||
//
|
//
|
||||||
// See pool package overview to get some examples.
|
// See pool package overview to get some examples.
|
||||||
type Pool struct {
|
type Pool struct {
|
||||||
innerPools []*innerPool
|
manager *connectionManager
|
||||||
key *ecdsa.PrivateKey
|
logger *zap.Logger
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
cache *sessionCache
|
||||||
|
stokenDuration uint64
|
||||||
|
|
||||||
|
maxObjectSize uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type connectionManager struct {
|
||||||
|
innerPools []*innerPool
|
||||||
|
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
closedCh chan struct{}
|
closedCh chan struct{}
|
||||||
cache *sessionCache
|
|
||||||
stokenDuration uint64
|
|
||||||
rebalanceParams rebalanceParameters
|
rebalanceParams rebalanceParameters
|
||||||
clientBuilder clientBuilder
|
clientBuilder clientBuilder
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
|
|
||||||
maxObjectSize uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type innerPool struct {
|
type innerPool struct {
|
||||||
|
@ -2038,8 +2044,10 @@ const (
|
||||||
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
|
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewPool creates connection pool using parameters.
|
// newConnectionManager returns an instance of connectionManager configured according to the parameters.
|
||||||
func NewPool(options InitParameters) (*Pool, error) {
|
//
|
||||||
|
// Before using connectionManager, you MUST call Dial.
|
||||||
|
func newConnectionManager(options InitParameters) (*connectionManager, error) {
|
||||||
if options.key == nil {
|
if options.key == nil {
|
||||||
return nil, fmt.Errorf("missed required parameter 'Key'")
|
return nil, fmt.Errorf("missed required parameter 'Key'")
|
||||||
}
|
}
|
||||||
|
@ -2049,18 +2057,8 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cache, err := newCache(options.sessionExpirationDuration)
|
manager := &connectionManager{
|
||||||
if err != nil {
|
logger: options.logger,
|
||||||
return nil, fmt.Errorf("couldn't create cache: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fillDefaultInitParams(&options, cache)
|
|
||||||
|
|
||||||
pool := &Pool{
|
|
||||||
key: options.key,
|
|
||||||
cache: cache,
|
|
||||||
logger: options.logger,
|
|
||||||
stokenDuration: options.sessionExpirationDuration,
|
|
||||||
rebalanceParams: rebalanceParameters{
|
rebalanceParams: rebalanceParameters{
|
||||||
nodesParams: nodesParams,
|
nodesParams: nodesParams,
|
||||||
nodeRequestTimeout: options.healthcheckTimeout,
|
nodeRequestTimeout: options.healthcheckTimeout,
|
||||||
|
@ -2070,6 +2068,33 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
clientBuilder: options.clientBuilder,
|
clientBuilder: options.clientBuilder,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return manager, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPool returns an instance of Pool configured according to the parameters.
|
||||||
|
//
|
||||||
|
// Before using Pool, you MUST call Dial.
|
||||||
|
func NewPool(options InitParameters) (*Pool, error) {
|
||||||
|
cache, err := newCache(options.sessionExpirationDuration)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't create cache: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fillDefaultInitParams(&options, cache)
|
||||||
|
|
||||||
|
manager, err := newConnectionManager(options)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
pool := &Pool{
|
||||||
|
cache: cache,
|
||||||
|
key: options.key,
|
||||||
|
logger: options.logger,
|
||||||
|
manager: manager,
|
||||||
|
stokenDuration: options.sessionExpirationDuration,
|
||||||
|
}
|
||||||
|
|
||||||
return pool, nil
|
return pool, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2082,28 +2107,51 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
//
|
//
|
||||||
// See also InitParameters.SetClientRebalanceInterval.
|
// See also InitParameters.SetClientRebalanceInterval.
|
||||||
func (p *Pool) Dial(ctx context.Context) error {
|
func (p *Pool) Dial(ctx context.Context) error {
|
||||||
inner := make([]*innerPool, len(p.rebalanceParams.nodesParams))
|
err := p.manager.dial(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var atLeastOneHealthy bool
|
||||||
|
p.manager.iterate(func(cl client) {
|
||||||
|
var st session.Object
|
||||||
|
err := initSessionForDuration(ctx, &st, cl, p.manager.rebalanceParams.sessionExpirationDuration, *p.key, false)
|
||||||
|
if err != nil {
|
||||||
|
if p.logger != nil {
|
||||||
|
p.logger.Log(zap.WarnLevel, "failed to create frostfs session token for client",
|
||||||
|
zap.String("address", cl.address()), zap.Error(err))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = p.cache.Put(formCacheKey(cl.address(), p.key, false), st)
|
||||||
|
atLeastOneHealthy = true
|
||||||
|
})
|
||||||
|
|
||||||
|
if !atLeastOneHealthy {
|
||||||
|
return fmt.Errorf("at least one node must be healthy")
|
||||||
|
}
|
||||||
|
|
||||||
|
ni, err := p.NetworkInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get network info for max object size: %w", err)
|
||||||
|
}
|
||||||
|
p.maxObjectSize = ni.MaxObjectSize()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cm *connectionManager) dial(ctx context.Context) error {
|
||||||
|
inner := make([]*innerPool, len(cm.rebalanceParams.nodesParams))
|
||||||
var atLeastOneHealthy bool
|
var atLeastOneHealthy bool
|
||||||
|
|
||||||
for i, params := range p.rebalanceParams.nodesParams {
|
for i, params := range cm.rebalanceParams.nodesParams {
|
||||||
clients := make([]client, len(params.weights))
|
clients := make([]client, len(params.weights))
|
||||||
for j, addr := range params.addresses {
|
for j, addr := range params.addresses {
|
||||||
clients[j] = p.clientBuilder(addr)
|
clients[j] = cm.clientBuilder(addr)
|
||||||
if err := clients[j].dial(ctx); err != nil {
|
if err := clients[j].dial(ctx); err != nil {
|
||||||
p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
|
cm.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var st session.Object
|
|
||||||
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key, false)
|
|
||||||
if err != nil {
|
|
||||||
clients[j].setUnhealthy()
|
|
||||||
p.log(zap.WarnLevel, "failed to create frostfs session token for client",
|
|
||||||
zap.String("address", addr), zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = p.cache.Put(formCacheKey(addr, p.key, false), st)
|
|
||||||
atLeastOneHealthy = true
|
atLeastOneHealthy = true
|
||||||
}
|
}
|
||||||
source := rand.NewSource(time.Now().UnixNano())
|
source := rand.NewSource(time.Now().UnixNano())
|
||||||
|
@ -2120,26 +2168,20 @@ func (p *Pool) Dial(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
p.cancel = cancel
|
cm.cancel = cancel
|
||||||
p.closedCh = make(chan struct{})
|
cm.closedCh = make(chan struct{})
|
||||||
p.innerPools = inner
|
cm.innerPools = inner
|
||||||
|
|
||||||
ni, err := p.NetworkInfo(ctx)
|
go cm.startRebalance(ctx)
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("get network info for max object size: %w", err)
|
|
||||||
}
|
|
||||||
p.maxObjectSize = ni.MaxObjectSize()
|
|
||||||
|
|
||||||
go p.startRebalance(ctx)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
func (cm *connectionManager) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
||||||
if p.logger == nil {
|
if cm.logger == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
p.logger.Log(level, msg, fields...)
|
cm.logger.Log(level, msg, fields...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
|
func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
|
||||||
|
@ -2226,47 +2268,47 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// startRebalance runs loop to monitor connection healthy status.
|
// startRebalance runs loop to monitor connection healthy status.
|
||||||
func (p *Pool) startRebalance(ctx context.Context) {
|
func (cm *connectionManager) startRebalance(ctx context.Context) {
|
||||||
ticker := time.NewTicker(p.rebalanceParams.clientRebalanceInterval)
|
ticker := time.NewTicker(cm.rebalanceParams.clientRebalanceInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
|
buffers := make([][]float64, len(cm.rebalanceParams.nodesParams))
|
||||||
for i, params := range p.rebalanceParams.nodesParams {
|
for i, params := range cm.rebalanceParams.nodesParams {
|
||||||
buffers[i] = make([]float64, len(params.weights))
|
buffers[i] = make([]float64, len(params.weights))
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
close(p.closedCh)
|
close(cm.closedCh)
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
p.updateNodesHealth(ctx, buffers)
|
cm.updateNodesHealth(ctx, buffers)
|
||||||
ticker.Reset(p.rebalanceParams.clientRebalanceInterval)
|
ticker.Reset(cm.rebalanceParams.clientRebalanceInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
|
func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) {
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for i, inner := range p.innerPools {
|
for i, inner := range cm.innerPools {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
bufferWeights := buffers[i]
|
bufferWeights := buffers[i]
|
||||||
go func(i int, _ *innerPool) {
|
go func(i int, _ *innerPool) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
p.updateInnerNodesHealth(ctx, i, bufferWeights)
|
cm.updateInnerNodesHealth(ctx, i, bufferWeights)
|
||||||
}(i, inner)
|
}(i, inner)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
|
func (cm *connectionManager) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
|
||||||
if i > len(p.innerPools)-1 {
|
if i > len(cm.innerPools)-1 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pool := p.innerPools[i]
|
pool := cm.innerPools[i]
|
||||||
options := p.rebalanceParams
|
options := cm.rebalanceParams
|
||||||
|
|
||||||
healthyChanged := new(atomic.Bool)
|
healthyChanged := new(atomic.Bool)
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
|
@ -2285,7 +2327,6 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
|
||||||
bufferWeights[j] = options.nodesParams[i].weights[j]
|
bufferWeights[j] = options.nodesParams[i].weights[j]
|
||||||
} else {
|
} else {
|
||||||
bufferWeights[j] = 0
|
bufferWeights[j] = 0
|
||||||
p.cache.DeleteByPrefix(cli.address())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if changed {
|
if changed {
|
||||||
|
@ -2294,7 +2335,7 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
|
||||||
fields = append(fields, zap.String("reason", err.Error()))
|
fields = append(fields, zap.String("reason", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
p.log(zap.DebugLevel, "health has changed", fields...)
|
cm.log(zap.DebugLevel, "health has changed", fields...)
|
||||||
healthyChanged.Store(true)
|
healthyChanged.Store(true)
|
||||||
}
|
}
|
||||||
}(j, cli)
|
}(j, cli)
|
||||||
|
@ -2362,8 +2403,8 @@ func adjustWeights(weights []float64) []float64 {
|
||||||
return adjusted
|
return adjusted
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) connection() (client, error) {
|
func (cm *connectionManager) connection() (client, error) {
|
||||||
for _, inner := range p.innerPools {
|
for _, inner := range cm.innerPools {
|
||||||
cp, err := inner.connection()
|
cp, err := inner.connection()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return cp, nil
|
return cp, nil
|
||||||
|
@ -2373,6 +2414,17 @@ func (p *Pool) connection() (client, error) {
|
||||||
return nil, errors.New("no healthy client")
|
return nil, errors.New("no healthy client")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// iterate iterates over all clients in all innerPools.
|
||||||
|
func (cm *connectionManager) iterate(cb func(client)) {
|
||||||
|
for _, inner := range cm.innerPools {
|
||||||
|
for _, cl := range inner.clients {
|
||||||
|
if cl.isHealthy() {
|
||||||
|
cb(cl)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *innerPool) connection() (client, error) {
|
func (p *innerPool) connection() (client, error) {
|
||||||
p.lock.RLock() // need lock because of using p.sampler
|
p.lock.RLock() // need lock because of using p.sampler
|
||||||
defer p.lock.RUnlock()
|
defer p.lock.RUnlock()
|
||||||
|
@ -2484,32 +2536,33 @@ type callContext struct {
|
||||||
sessionClientCut bool
|
sessionClientCut bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
|
func (p *Pool) initCall(ctxCall *callContext, cfg prmCommon, prmCtx prmContext) error {
|
||||||
cp, err := p.connection()
|
p.fillAppropriateKey(&cfg)
|
||||||
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.key = cfg.key
|
ctxCall.key = cfg.key
|
||||||
if ctx.key == nil {
|
if ctxCall.key == nil {
|
||||||
// use pool key if caller didn't specify its own
|
// use pool key if caller didn't specify its own
|
||||||
ctx.key = p.key
|
ctxCall.key = p.key
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.endpoint = cp.address()
|
ctxCall.endpoint = cp.address()
|
||||||
ctx.client = cp
|
ctxCall.client = cp
|
||||||
|
|
||||||
if ctx.sessionTarget != nil && cfg.stoken != nil {
|
if ctxCall.sessionTarget != nil && cfg.stoken != nil {
|
||||||
ctx.sessionTarget(*cfg.stoken)
|
ctxCall.sessionTarget(*cfg.stoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
// note that we don't override session provided by the caller
|
// note that we don't override session provided by the caller
|
||||||
ctx.sessionDefault = cfg.stoken == nil && prmCtx.defaultSession
|
ctxCall.sessionDefault = cfg.stoken == nil && prmCtx.defaultSession
|
||||||
if ctx.sessionDefault {
|
if ctxCall.sessionDefault {
|
||||||
ctx.sessionVerb = prmCtx.verb
|
ctxCall.sessionVerb = prmCtx.verb
|
||||||
ctx.sessionCnr = prmCtx.cnr
|
ctxCall.sessionCnr = prmCtx.cnr
|
||||||
ctx.sessionObjSet = prmCtx.objSet
|
ctxCall.sessionObjSet = prmCtx.objSet
|
||||||
ctx.sessionObjs = prmCtx.objs
|
ctxCall.sessionObjs = prmCtx.objs
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
@ -2594,10 +2647,8 @@ func (p *Pool) PatchObject(ctx context.Context, prm PrmObjectPatch) (ResPatchObj
|
||||||
prmCtx.useVerb(session.VerbObjectPatch)
|
prmCtx.useVerb(session.VerbObjectPatch)
|
||||||
prmCtx.useContainer(prm.addr.Container())
|
prmCtx.useContainer(prm.addr.Container())
|
||||||
|
|
||||||
p.fillAppropriateKey(&prm.prmCommon)
|
|
||||||
|
|
||||||
var ctxCall callContext
|
var ctxCall callContext
|
||||||
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
|
if err := p.initCall(&ctxCall, prm.prmCommon, prmCtx); err != nil {
|
||||||
return ResPatchObject{}, fmt.Errorf("init call context: %w", err)
|
return ResPatchObject{}, fmt.Errorf("init call context: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2629,11 +2680,9 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (ResPutObject, e
|
||||||
prmCtx.useVerb(session.VerbObjectPut)
|
prmCtx.useVerb(session.VerbObjectPut)
|
||||||
prmCtx.useContainer(cnr)
|
prmCtx.useContainer(cnr)
|
||||||
|
|
||||||
p.fillAppropriateKey(&prm.prmCommon)
|
|
||||||
|
|
||||||
var ctxCall callContext
|
var ctxCall callContext
|
||||||
ctxCall.sessionClientCut = prm.clientCut
|
ctxCall.sessionClientCut = prm.clientCut
|
||||||
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
|
if err := p.initCall(&ctxCall, prm.prmCommon, prmCtx); err != nil {
|
||||||
return ResPutObject{}, fmt.Errorf("init call context: %w", err)
|
return ResPutObject{}, fmt.Errorf("init call context: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2686,12 +2735,10 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.fillAppropriateKey(&prm.prmCommon)
|
|
||||||
|
|
||||||
var cc callContext
|
var cc callContext
|
||||||
cc.sessionTarget = prm.UseSession
|
cc.sessionTarget = prm.UseSession
|
||||||
|
|
||||||
err := p.initCallContext(&cc, prm.prmCommon, prmCtx)
|
err := p.initCall(&cc, prm.prmCommon, prmCtx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -2735,14 +2782,12 @@ type ResGetObject struct {
|
||||||
//
|
//
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
|
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
|
||||||
p.fillAppropriateKey(&prm.prmCommon)
|
|
||||||
|
|
||||||
var cc callContext
|
var cc callContext
|
||||||
cc.sessionTarget = prm.UseSession
|
cc.sessionTarget = prm.UseSession
|
||||||
|
|
||||||
var res ResGetObject
|
var res ResGetObject
|
||||||
|
|
||||||
err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
|
err := p.initCall(&cc, prm.prmCommon, prmContext{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
@ -2760,14 +2805,12 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, e
|
||||||
//
|
//
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
|
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
|
||||||
p.fillAppropriateKey(&prm.prmCommon)
|
|
||||||
|
|
||||||
var cc callContext
|
var cc callContext
|
||||||
cc.sessionTarget = prm.UseSession
|
cc.sessionTarget = prm.UseSession
|
||||||
|
|
||||||
var obj object.Object
|
var obj object.Object
|
||||||
|
|
||||||
err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
|
err := p.initCall(&cc, prm.prmCommon, prmContext{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return obj, err
|
return obj, err
|
||||||
}
|
}
|
||||||
|
@ -2811,14 +2854,12 @@ func (x *ResObjectRange) Close() error {
|
||||||
//
|
//
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
|
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
|
||||||
p.fillAppropriateKey(&prm.prmCommon)
|
|
||||||
|
|
||||||
var cc callContext
|
var cc callContext
|
||||||
cc.sessionTarget = prm.UseSession
|
cc.sessionTarget = prm.UseSession
|
||||||
|
|
||||||
var res ResObjectRange
|
var res ResObjectRange
|
||||||
|
|
||||||
err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
|
err := p.initCall(&cc, prm.prmCommon, prmContext{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
@ -2882,14 +2923,12 @@ func (x *ResObjectSearch) Close() {
|
||||||
//
|
//
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
|
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
|
||||||
p.fillAppropriateKey(&prm.prmCommon)
|
|
||||||
|
|
||||||
var cc callContext
|
var cc callContext
|
||||||
cc.sessionTarget = prm.UseSession
|
cc.sessionTarget = prm.UseSession
|
||||||
|
|
||||||
var res ResObjectSearch
|
var res ResObjectSearch
|
||||||
|
|
||||||
err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
|
err := p.initCall(&cc, prm.prmCommon, prmContext{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
@ -2914,7 +2953,7 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjec
|
||||||
//
|
//
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
|
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.ID{}, err
|
return cid.ID{}, err
|
||||||
}
|
}
|
||||||
|
@ -2931,7 +2970,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, e
|
||||||
//
|
//
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
|
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return container.Container{}, err
|
return container.Container{}, err
|
||||||
}
|
}
|
||||||
|
@ -2946,7 +2985,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container
|
||||||
|
|
||||||
// ListContainers requests identifiers of the account-owned containers.
|
// ListContainers requests identifiers of the account-owned containers.
|
||||||
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
|
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -2962,7 +3001,7 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
|
||||||
// ListContainersStream requests identifiers of the account-owned containers.
|
// ListContainersStream requests identifiers of the account-owned containers.
|
||||||
func (p *Pool) ListContainersStream(ctx context.Context, prm PrmListStream) (ResListStream, error) {
|
func (p *Pool) ListContainersStream(ctx context.Context, prm PrmListStream) (ResListStream, error) {
|
||||||
var res ResListStream
|
var res ResListStream
|
||||||
cp, err := p.connection()
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
@ -2984,7 +3023,7 @@ func (p *Pool) ListContainersStream(ctx context.Context, prm PrmListStream) (Res
|
||||||
//
|
//
|
||||||
// Success can be verified by reading by identifier (see GetContainer).
|
// Success can be verified by reading by identifier (see GetContainer).
|
||||||
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
|
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
|
||||||
cp, err := p.connection()
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -2999,7 +3038,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
|
||||||
|
|
||||||
// AddAPEChain sends a request to set APE chain rules for a target (basically, for a container).
|
// AddAPEChain sends a request to set APE chain rules for a target (basically, for a container).
|
||||||
func (p *Pool) AddAPEChain(ctx context.Context, prm PrmAddAPEChain) error {
|
func (p *Pool) AddAPEChain(ctx context.Context, prm PrmAddAPEChain) error {
|
||||||
cp, err := p.connection()
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -3014,7 +3053,7 @@ func (p *Pool) AddAPEChain(ctx context.Context, prm PrmAddAPEChain) error {
|
||||||
|
|
||||||
// RemoveAPEChain sends a request to remove APE chain rules for a target.
|
// RemoveAPEChain sends a request to remove APE chain rules for a target.
|
||||||
func (p *Pool) RemoveAPEChain(ctx context.Context, prm PrmRemoveAPEChain) error {
|
func (p *Pool) RemoveAPEChain(ctx context.Context, prm PrmRemoveAPEChain) error {
|
||||||
cp, err := p.connection()
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -3029,7 +3068,7 @@ func (p *Pool) RemoveAPEChain(ctx context.Context, prm PrmRemoveAPEChain) error
|
||||||
|
|
||||||
// ListAPEChains sends a request to list APE chains rules for a target.
|
// ListAPEChains sends a request to list APE chains rules for a target.
|
||||||
func (p *Pool) ListAPEChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) {
|
func (p *Pool) ListAPEChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -3046,7 +3085,7 @@ func (p *Pool) ListAPEChains(ctx context.Context, prm PrmListAPEChains) ([]ape.C
|
||||||
//
|
//
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
|
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return accounting.Decimal{}, err
|
return accounting.Decimal{}, err
|
||||||
}
|
}
|
||||||
|
@ -3061,8 +3100,12 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decim
|
||||||
|
|
||||||
// Statistic returns connection statistics.
|
// Statistic returns connection statistics.
|
||||||
func (p Pool) Statistic() Statistic {
|
func (p Pool) Statistic() Statistic {
|
||||||
|
return p.manager.Statistic()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cm connectionManager) Statistic() Statistic {
|
||||||
stat := Statistic{}
|
stat := Statistic{}
|
||||||
for _, inner := range p.innerPools {
|
for _, inner := range cm.innerPools {
|
||||||
nodes := make([]string, 0, len(inner.clients))
|
nodes := make([]string, 0, len(inner.clients))
|
||||||
inner.lock.RLock()
|
inner.lock.RLock()
|
||||||
for _, cl := range inner.clients {
|
for _, cl := range inner.clients {
|
||||||
|
@ -3130,7 +3173,7 @@ func waitFor(ctx context.Context, params *WaitParams, condition func(context.Con
|
||||||
//
|
//
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return netmap.NetworkInfo{}, err
|
return netmap.NetworkInfo{}, err
|
||||||
}
|
}
|
||||||
|
@ -3147,7 +3190,7 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
||||||
//
|
//
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
|
func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return netmap.NetMap{}, err
|
return netmap.NetMap{}, err
|
||||||
}
|
}
|
||||||
|
@ -3162,11 +3205,15 @@ func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
|
||||||
|
|
||||||
// Close closes the Pool and releases all the associated resources.
|
// Close closes the Pool and releases all the associated resources.
|
||||||
func (p *Pool) Close() {
|
func (p *Pool) Close() {
|
||||||
p.cancel()
|
p.manager.close()
|
||||||
<-p.closedCh
|
}
|
||||||
|
|
||||||
|
func (cm *connectionManager) close() {
|
||||||
|
cm.cancel()
|
||||||
|
<-cm.closedCh
|
||||||
|
|
||||||
// close all clients
|
// close all clients
|
||||||
for _, pools := range p.innerPools {
|
for _, pools := range cm.innerPools {
|
||||||
for _, cli := range pools.clients {
|
for _, cli := range pools.clients {
|
||||||
_ = cli.close()
|
_ = cli.close()
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,7 +104,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
|
||||||
|
|
||||||
expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
|
expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
|
||||||
condition := func() bool {
|
condition := func() bool {
|
||||||
cp, err := clientPool.connection()
|
cp, err := clientPool.manager.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -141,7 +141,7 @@ func TestOneNode(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
t.Cleanup(pool.Close)
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
cp, err := pool.connection()
|
cp, err := pool.manager.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||||
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
|
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
|
||||||
|
@ -171,7 +171,7 @@ func TestTwoNodes(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
t.Cleanup(pool.Close)
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
cp, err := pool.connection()
|
cp, err := pool.manager.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||||
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
||||||
|
@ -220,13 +220,12 @@ func TestOneOfTwoFailed(t *testing.T) {
|
||||||
err = pool.Dial(context.Background())
|
err = pool.Dial(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.NoError(t, err)
|
|
||||||
t.Cleanup(pool.Close)
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
for range 5 {
|
for range 5 {
|
||||||
cp, err := pool.connection()
|
cp, err := pool.manager.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||||
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
||||||
|
@ -369,7 +368,7 @@ func TestUpdateNodesHealth(t *testing.T) {
|
||||||
tc.prepareCli(cli)
|
tc.prepareCli(cli)
|
||||||
p, log := newPool(t, cli)
|
p, log := newPool(t, cli)
|
||||||
|
|
||||||
p.updateNodesHealth(ctx, [][]float64{{1}})
|
p.manager.updateNodesHealth(ctx, [][]float64{{1}})
|
||||||
|
|
||||||
changed := tc.wasHealthy != tc.willHealthy
|
changed := tc.wasHealthy != tc.willHealthy
|
||||||
require.Equalf(t, tc.willHealthy, cli.isHealthy(), "healthy status should be: %v", tc.willHealthy)
|
require.Equalf(t, tc.willHealthy, cli.isHealthy(), "healthy status should be: %v", tc.willHealthy)
|
||||||
|
@ -385,19 +384,20 @@ func newPool(t *testing.T, cli *mockClient) (*Pool, *observer.ObservedLogs) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
return &Pool{
|
return &Pool{
|
||||||
innerPools: []*innerPool{{
|
cache: cache,
|
||||||
sampler: newSampler([]float64{1}, rand.NewSource(0)),
|
key: newPrivateKey(t),
|
||||||
clients: []client{cli},
|
manager: &connectionManager{
|
||||||
}},
|
innerPools: []*innerPool{{
|
||||||
cache: cache,
|
sampler: newSampler([]float64{1}, rand.NewSource(0)),
|
||||||
key: newPrivateKey(t),
|
clients: []client{cli},
|
||||||
closedCh: make(chan struct{}),
|
}},
|
||||||
rebalanceParams: rebalanceParameters{
|
closedCh: make(chan struct{}),
|
||||||
nodesParams: []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
|
rebalanceParams: rebalanceParameters{
|
||||||
nodeRequestTimeout: time.Second,
|
nodesParams: []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
|
||||||
clientRebalanceInterval: 200 * time.Millisecond,
|
nodeRequestTimeout: time.Second,
|
||||||
},
|
clientRebalanceInterval: 200 * time.Millisecond,
|
||||||
logger: log,
|
},
|
||||||
|
logger: log},
|
||||||
}, observedLog
|
}, observedLog
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -435,7 +435,7 @@ func TestTwoFailed(t *testing.T) {
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
_, err = pool.connection()
|
_, err = pool.manager.connection()
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Contains(t, err.Error(), "no healthy")
|
require.Contains(t, err.Error(), "no healthy")
|
||||||
}
|
}
|
||||||
|
@ -469,7 +469,7 @@ func TestSessionCache(t *testing.T) {
|
||||||
t.Cleanup(pool.Close)
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
// cache must contain session token
|
// cache must contain session token
|
||||||
cp, err := pool.connection()
|
cp, err := pool.manager.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||||
|
@ -482,7 +482,7 @@ func TestSessionCache(t *testing.T) {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
|
||||||
// cache must not contain session token
|
// cache must not contain session token
|
||||||
cp, err = pool.connection()
|
cp, err = pool.manager.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||||
require.False(t, ok)
|
require.False(t, ok)
|
||||||
|
@ -494,7 +494,7 @@ func TestSessionCache(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// cache must contain session token
|
// cache must contain session token
|
||||||
cp, err = pool.connection()
|
cp, err = pool.manager.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||||
|
@ -538,7 +538,7 @@ func TestPriority(t *testing.T) {
|
||||||
|
|
||||||
expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey)
|
expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey)
|
||||||
firstNode := func() bool {
|
firstNode := func() bool {
|
||||||
cp, err := pool.connection()
|
cp, err := pool.manager.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||||
return st.AssertAuthKey(&expectedAuthKey1)
|
return st.AssertAuthKey(&expectedAuthKey1)
|
||||||
|
@ -546,7 +546,7 @@ func TestPriority(t *testing.T) {
|
||||||
|
|
||||||
expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
|
expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
|
||||||
secondNode := func() bool {
|
secondNode := func() bool {
|
||||||
cp, err := pool.connection()
|
cp, err := pool.manager.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||||
return st.AssertAuthKey(&expectedAuthKey2)
|
return st.AssertAuthKey(&expectedAuthKey2)
|
||||||
|
@ -583,7 +583,7 @@ func TestSessionCacheWithKey(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// cache must contain session token
|
// cache must contain session token
|
||||||
cp, err := pool.connection()
|
cp, err := pool.manager.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||||
|
@ -636,9 +636,8 @@ func TestSessionTokenOwner(t *testing.T) {
|
||||||
cc.sessionTarget = func(tok session.Object) {
|
cc.sessionTarget = func(tok session.Object) {
|
||||||
tkn = tok
|
tkn = tok
|
||||||
}
|
}
|
||||||
err = p.initCallContext(&cc, prm, prmCtx)
|
err = p.initCall(&cc, prm, prmCtx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = p.openDefaultSession(ctx, &cc)
|
err = p.openDefaultSession(ctx, &cc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, tkn.VerifySignature())
|
require.True(t, tkn.VerifySignature())
|
||||||
|
@ -922,14 +921,14 @@ func TestSwitchAfterErrorThreshold(t *testing.T) {
|
||||||
t.Cleanup(pool.Close)
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
for range errorThreshold {
|
for range errorThreshold {
|
||||||
conn, err := pool.connection()
|
conn, err := pool.manager.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, nodes[0].address, conn.address())
|
require.Equal(t, nodes[0].address, conn.address())
|
||||||
_, err = conn.objectGet(ctx, PrmObjectGet{})
|
_, err = conn.objectGet(ctx, PrmObjectGet{})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := pool.connection()
|
conn, err := pool.manager.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, nodes[1].address, conn.address())
|
require.Equal(t, nodes[1].address, conn.address())
|
||||||
_, err = conn.objectGet(ctx, PrmObjectGet{})
|
_, err = conn.objectGet(ctx, PrmObjectGet{})
|
||||||
|
|
|
@ -47,9 +47,6 @@ func TestHealthyReweight(t *testing.T) {
|
||||||
buffer = make([]float64, len(weights))
|
buffer = make([]float64, len(weights))
|
||||||
)
|
)
|
||||||
|
|
||||||
cache, err := newCache(0)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
client1 := newMockClient(names[0], *newPrivateKey(t))
|
client1 := newMockClient(names[0], *newPrivateKey(t))
|
||||||
client1.errOnDial()
|
client1.errOnDial()
|
||||||
|
|
||||||
|
@ -59,22 +56,20 @@ func TestHealthyReweight(t *testing.T) {
|
||||||
sampler: newSampler(weights, rand.NewSource(0)),
|
sampler: newSampler(weights, rand.NewSource(0)),
|
||||||
clients: []client{client1, client2},
|
clients: []client{client1, client2},
|
||||||
}
|
}
|
||||||
p := &Pool{
|
cm := &connectionManager{
|
||||||
innerPools: []*innerPool{inner},
|
innerPools: []*innerPool{inner},
|
||||||
cache: cache,
|
|
||||||
key: newPrivateKey(t),
|
|
||||||
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
|
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
|
||||||
}
|
}
|
||||||
|
|
||||||
// check getting first node connection before rebalance happened
|
// check getting first node connection before rebalance happened
|
||||||
connection0, err := p.connection()
|
connection0, err := cm.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
mock0 := connection0.(*mockClient)
|
mock0 := connection0.(*mockClient)
|
||||||
require.Equal(t, names[0], mock0.address())
|
require.Equal(t, names[0], mock0.address())
|
||||||
|
|
||||||
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
||||||
|
|
||||||
connection1, err := p.connection()
|
connection1, err := cm.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
mock1 := connection1.(*mockClient)
|
mock1 := connection1.(*mockClient)
|
||||||
require.Equal(t, names[1], mock1.address())
|
require.Equal(t, names[1], mock1.address())
|
||||||
|
@ -84,10 +79,10 @@ func TestHealthyReweight(t *testing.T) {
|
||||||
inner.clients[0] = newMockClient(names[0], *newPrivateKey(t))
|
inner.clients[0] = newMockClient(names[0], *newPrivateKey(t))
|
||||||
inner.lock.Unlock()
|
inner.lock.Unlock()
|
||||||
|
|
||||||
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
||||||
inner.sampler = newSampler(weights, rand.NewSource(0))
|
inner.sampler = newSampler(weights, rand.NewSource(0))
|
||||||
|
|
||||||
connection0, err = p.connection()
|
connection0, err = cm.connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
mock0 = connection0.(*mockClient)
|
mock0 = connection0.(*mockClient)
|
||||||
require.Equal(t, names[0], mock0.address())
|
require.Equal(t, names[0], mock0.address())
|
||||||
|
@ -108,12 +103,12 @@ func TestHealthyNoReweight(t *testing.T) {
|
||||||
newMockClient(names[1], *newPrivateKey(t)),
|
newMockClient(names[1], *newPrivateKey(t)),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
p := &Pool{
|
cm := &connectionManager{
|
||||||
innerPools: []*innerPool{inner},
|
innerPools: []*innerPool{inner},
|
||||||
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
|
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
|
||||||
}
|
}
|
||||||
|
|
||||||
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
||||||
|
|
||||||
inner.lock.RLock()
|
inner.lock.RLock()
|
||||||
defer inner.lock.RUnlock()
|
defer inner.lock.RUnlock()
|
||||||
|
|
Loading…
Add table
Reference in a new issue