From d77a8742bc4e6b7664f21f944a4da1a738ad8b35 Mon Sep 17 00:00:00 2001 From: Alexander Chuprov Date: Tue, 4 Mar 2025 18:08:35 +0300 Subject: [PATCH] [#300] pool: Move 'healthCheck' to separate file Signed-off-by: Alexander Chuprov --- pool/healthcheck.go | 47 +++++++++++++++++++++++++++++++++++++++++++++ pool/pool.go | 41 --------------------------------------- 2 files changed, 47 insertions(+), 41 deletions(-) create mode 100644 pool/healthcheck.go diff --git a/pool/healthcheck.go b/pool/healthcheck.go new file mode 100644 index 00000000..2f5dec9e --- /dev/null +++ b/pool/healthcheck.go @@ -0,0 +1,47 @@ +package pool + +import ( + "context" + "time" +) + +type healthCheck struct { + cancel context.CancelFunc + closedCh chan struct{} + + clientRebalanceInterval time.Duration +} + +func newHealthCheck(clientRebalanceInterval time.Duration) *healthCheck { + var h healthCheck + h.clientRebalanceInterval = clientRebalanceInterval + h.closedCh = make(chan struct{}) + return &h +} + +// startRebalance runs loop to monitor connection healthy status. +func (h *healthCheck) startRebalance(ctx context.Context, callback func(ctx context.Context)) { + ctx, cancel := context.WithCancel(ctx) + h.cancel = cancel + + go func() { + ticker := time.NewTicker(h.clientRebalanceInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + close(h.closedCh) + return + case <-ticker.C: + callback(ctx) + ticker.Reset(h.clientRebalanceInterval) + } + } + }() +} + +func (h *healthCheck) stopRebalance() { + h.cancel() + <-h.closedCh +} diff --git a/pool/pool.go b/pool/pool.go index 4cf156da..8810c159 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -2026,20 +2026,6 @@ type connectionManager struct { healthChecker *healthCheck } -type healthCheck struct { - cancel context.CancelFunc - closedCh chan struct{} - - clientRebalanceInterval time.Duration -} - -func newHealthCheck(clientRebalanceInterval time.Duration) *healthCheck { - var h healthCheck - h.clientRebalanceInterval = clientRebalanceInterval - h.closedCh = make(chan struct{}) - return &h -} - type innerPool struct { lock sync.RWMutex sampler *sampler @@ -2289,33 +2275,6 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) { return nodesParams, nil } -// startRebalance runs loop to monitor connection healthy status. -func (h *healthCheck) startRebalance(ctx context.Context, callback func(ctx context.Context)) { - ctx, cancel := context.WithCancel(ctx) - h.cancel = cancel - - go func() { - ticker := time.NewTicker(h.clientRebalanceInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - close(h.closedCh) - return - case <-ticker.C: - callback(ctx) - ticker.Reset(h.clientRebalanceInterval) - } - } - }() -} - -func (h *healthCheck) stopRebalance() { - h.cancel() - <-h.closedCh -} - func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) { wg := sync.WaitGroup{} for i, inner := range cm.innerPools {