From 06ef257ddc9ffc8f489de3ea73e9b53dfcbfa174 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 4 Mar 2025 14:08:08 +0300 Subject: [PATCH] [#339] pool/tree: Do not lock mutex on circuit break function The function passed to the circuit breaker may take some time to execute, so it should not be executed while the mutex is held. Signed-off-by: Alex Vanin --- pool/tree/circuitbreaker.go | 61 ++++++++++++++++++++++---------- pool/tree/circuitbreaker_test.go | 27 ++++++++++++++ 2 files changed, 69 insertions(+), 19 deletions(-) diff --git a/pool/tree/circuitbreaker.go b/pool/tree/circuitbreaker.go index db63fb8f..5eadcb4b 100644 --- a/pool/tree/circuitbreaker.go +++ b/pool/tree/circuitbreaker.go @@ -11,7 +11,7 @@ type ( breakDuration time.Duration threshold int - mu sync.Mutex + mu sync.RWMutex state map[uint64]state } @@ -31,33 +31,56 @@ func newCircuitBreaker(breakDuration time.Duration, threshold int) *circuitBreak } } -func (c *circuitBreaker) Do(id uint64, f func() error) error { - c.mu.Lock() - defer c.mu.Unlock() +func (cb *circuitBreaker) breakTime(id uint64) (time.Time, bool) { + cb.mu.RLock() + defer cb.mu.RUnlock() - if _, ok := c.state[id]; !ok { - c.state[id] = state{} + if s, ok := cb.state[id]; ok { + return s.breakTimestamp, true } - s := c.state[id] - defer func() { - c.state[id] = s - }() + return time.Time{}, false +} - if time.Since(s.breakTimestamp) < c.breakDuration { +func (cb *circuitBreaker) openBreak(id uint64) { + cb.mu.Lock() + defer cb.mu.Unlock() + delete(cb.state, id) +} + +func (cb *circuitBreaker) incError(id uint64, doTime time.Time) { + cb.mu.Lock() + defer cb.mu.Unlock() + + s := cb.state[id] + + s.counter++ + if s.counter >= cb.threshold { + s.counter = cb.threshold + if s.breakTimestamp.Before(doTime) { + s.breakTimestamp = doTime + } + } + + cb.state[id] = s +} + +func (c *circuitBreaker) Do(id uint64, f func() error) error { + breakTime, ok := c.breakTime(id) + if ok && time.Since(breakTime) < c.breakDuration { return ErrCBClosed } + // Use this 
timestamp to update circuit breaker in case of an error. + // f() may be blocked for an unpredictable duration, so otherwise concurrent calls + // may update the break time in 'incError' endlessly and the circuit will never be open again + doTime := time.Now() + err := f() if err == nil { - s.counter = 0 - return nil - } - - s.counter++ - if s.counter >= c.threshold { - s.counter = c.threshold - s.breakTimestamp = time.Now() + c.openBreak(id) + } else { + c.incError(id, doTime) } return err diff --git a/pool/tree/circuitbreaker_test.go b/pool/tree/circuitbreaker_test.go index d15e920e..aefa9d69 100644 --- a/pool/tree/circuitbreaker_test.go +++ b/pool/tree/circuitbreaker_test.go @@ -2,6 +2,7 @@ package tree import ( "errors" + "runtime" "testing" "time" @@ -39,3 +40,29 @@ func TestCircuitBreaker(t *testing.T) { // Immediate request should return circuit breaker error require.ErrorIs(t, cb.Do(1, func() error { return nil }), ErrCBClosed) } + +func TestCircuitBreakerNoBlock(t *testing.T) { + remoteErr := errors.New("service is being synchronized") + funcDuration := 2 * time.Second + threshold := 100 + cb := newCircuitBreaker(1*time.Minute, threshold) + + slowFunc := func() error { + time.Sleep(funcDuration) + return remoteErr + } + + for i := 0; i < threshold; i++ { + // run Do in multiple goroutines and make sure it is not blocked by the circuit breaker mutex + go func() { + cb.Do(1, slowFunc) + }() + } + + // wait for one slow func duration + some delta + time.Sleep(funcDuration + 100*time.Millisecond) + runtime.Gosched() + // expect that all goroutines were not blocked by mutex in circuit breaker + // therefore all functions are done and circuit is closed + require.ErrorIs(t, cb.Do(1, func() error { return nil }), ErrCBClosed) +}