[#339] pool/tree: Do not lock mutex on circuit break function

Circuit break function may take some time to execute so it should
not be executed when lock is enabled.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
Alexey Vanin 2025-03-04 14:08:08 +03:00
parent 2d08fa5240
commit 06ef257ddc
2 changed files with 69 additions and 19 deletions

View file

@ -11,7 +11,7 @@ type (
breakDuration time.Duration breakDuration time.Duration
threshold int threshold int
mu sync.Mutex mu sync.RWMutex
state map[uint64]state state map[uint64]state
} }
@ -31,33 +31,56 @@ func newCircuitBreaker(breakDuration time.Duration, threshold int) *circuitBreak
} }
} }
func (c *circuitBreaker) Do(id uint64, f func() error) error { func (cb *circuitBreaker) breakTime(id uint64) (time.Time, bool) {
c.mu.Lock() cb.mu.RLock()
defer c.mu.Unlock() defer cb.mu.RUnlock()
if _, ok := c.state[id]; !ok { if s, ok := cb.state[id]; ok {
c.state[id] = state{} return s.breakTimestamp, true
} }
s := c.state[id] return time.Time{}, false
defer func() { }
c.state[id] = s
}()
if time.Since(s.breakTimestamp) < c.breakDuration { func (cb *circuitBreaker) openBreak(id uint64) {
cb.mu.Lock()
defer cb.mu.Unlock()
delete(cb.state, id)
}
func (cb *circuitBreaker) incError(id uint64, doTime time.Time) {
cb.mu.Lock()
defer cb.mu.Unlock()
s := cb.state[id]
s.counter++
if s.counter >= cb.threshold {
s.counter = cb.threshold
if s.breakTimestamp.Before(doTime) {
s.breakTimestamp = doTime
}
}
cb.state[id] = s
}
func (c *circuitBreaker) Do(id uint64, f func() error) error {
breakTime, ok := c.breakTime(id)
if ok && time.Since(breakTime) < c.breakDuration {
return ErrCBClosed return ErrCBClosed
} }
// Use this timestamp to update circuit breaker in case of an error.
// f() may be blocked for unpredictable duration, so concurrent calls
// may update time in 'incError' endlessly and circuit will never be open
doTime := time.Now()
err := f() err := f()
if err == nil { if err == nil {
s.counter = 0 c.openBreak(id)
return nil } else {
} c.incError(id, doTime)
s.counter++
if s.counter >= c.threshold {
s.counter = c.threshold
s.breakTimestamp = time.Now()
} }
return err return err

View file

@ -2,6 +2,7 @@ package tree
import ( import (
"errors" "errors"
"runtime"
"testing" "testing"
"time" "time"
@ -39,3 +40,29 @@ func TestCircuitBreaker(t *testing.T) {
// Immediate request should return circuit breaker error // Immediate request should return circuit breaker error
require.ErrorIs(t, cb.Do(1, func() error { return nil }), ErrCBClosed) require.ErrorIs(t, cb.Do(1, func() error { return nil }), ErrCBClosed)
} }
func TestCircuitBreakerNoBlock(t *testing.T) {
remoteErr := errors.New("service is being synchronized")
funcDuration := 2 * time.Second
threshold := 100
cb := newCircuitBreaker(1*time.Minute, threshold)
slowFunc := func() error {
time.Sleep(funcDuration)
return remoteErr
}
for i := 0; i < threshold; i++ {
// run in multiple goroutines Do function and make sure it is not
go func() {
cb.Do(1, slowFunc)
}()
}
// wait for one slow func duration + some delta
time.Sleep(funcDuration + 100*time.Millisecond)
runtime.Gosched()
// expect that all goroutines were not blocked by mutex in circuit breaker
// therefore all functions are done and circuit is closed
require.ErrorIs(t, cb.Do(1, func() error { return nil }), ErrCBClosed)
}