[#283] pool: Change batch of atomics to mutex

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-07-18 11:42:29 +03:00 committed by fyrchik
parent 0d54757545
commit 54145916a9
4 changed files with 50 additions and 53 deletions

View file

@ -21,7 +21,7 @@ import (
type mockClient struct { type mockClient struct {
key ecdsa.PrivateKey key ecdsa.PrivateKey
*clientStatusMonitor clientStatusMonitor
errorOnCreateSession bool errorOnCreateSession bool
errorOnEndpointInfo bool errorOnEndpointInfo bool

View file

@ -60,7 +60,6 @@ type clientStatus interface {
address() string address() string
currentErrorRate() uint32 currentErrorRate() uint32
overallErrorRate() uint64 overallErrorRate() uint64
resetErrorCounter()
latency() time.Duration latency() time.Duration
requests() uint64 requests() uint64
} }
@ -68,18 +67,20 @@ type clientStatus interface {
type clientStatusMonitor struct { type clientStatusMonitor struct {
addr string addr string
healthy *atomic.Bool healthy *atomic.Bool
currentErrorCount *atomic.Uint32
overallErrorCount *atomic.Uint64
errorThreshold uint32 errorThreshold uint32
allTime *atomic.Uint64
allRequests *atomic.Uint64 mu sync.RWMutex // protect counters
currentErrorCount uint32
overallErrorCount uint64
allTime uint64
allRequests uint64
} }
// clientWrapper is used by default, alternative implementations are intended for testing purposes only. // clientWrapper is used by default, alternative implementations are intended for testing purposes only.
type clientWrapper struct { type clientWrapper struct {
client sdkClient.Client client sdkClient.Client
key ecdsa.PrivateKey key ecdsa.PrivateKey
*clientStatusMonitor clientStatusMonitor
} }
type wrapperPrm struct { type wrapperPrm struct {
@ -112,20 +113,15 @@ func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo)
func newWrapper(prm wrapperPrm) (*clientWrapper, error) { func newWrapper(prm wrapperPrm) (*clientWrapper, error) {
var prmInit sdkClient.PrmInit var prmInit sdkClient.PrmInit
//prmInit.ResolveNeoFSFailures()
prmInit.SetDefaultPrivateKey(prm.key) prmInit.SetDefaultPrivateKey(prm.key)
prmInit.SetResponseInfoCallback(prm.responseInfoCallback) prmInit.SetResponseInfoCallback(prm.responseInfoCallback)
res := &clientWrapper{ res := &clientWrapper{
key: prm.key, key: prm.key,
clientStatusMonitor: &clientStatusMonitor{ clientStatusMonitor: clientStatusMonitor{
addr: prm.address, addr: prm.address,
healthy: atomic.NewBool(true), healthy: atomic.NewBool(true),
currentErrorCount: atomic.NewUint32(0),
overallErrorCount: atomic.NewUint64(0),
errorThreshold: prm.errorThreshold, errorThreshold: prm.errorThreshold,
allTime: atomic.NewUint64(0),
allRequests: atomic.NewUint64(0),
}, },
} }
@ -612,41 +608,48 @@ func (c *clientStatusMonitor) address() string {
} }
func (c *clientStatusMonitor) incErrorRate() { func (c *clientStatusMonitor) incErrorRate() {
c.currentErrorCount.Inc() c.mu.Lock()
c.overallErrorCount.Inc() defer c.mu.Unlock()
if c.currentErrorCount.Load() >= c.errorThreshold { c.currentErrorCount++
c.overallErrorCount++
if c.currentErrorCount >= c.errorThreshold {
c.setHealthy(false) c.setHealthy(false)
c.resetErrorCounter() c.currentErrorCount = 0
} }
} }
func (c *clientStatusMonitor) currentErrorRate() uint32 { func (c *clientStatusMonitor) currentErrorRate() uint32 {
return c.currentErrorCount.Load() c.mu.RLock()
defer c.mu.RUnlock()
return c.currentErrorCount
} }
func (c *clientStatusMonitor) overallErrorRate() uint64 { func (c *clientStatusMonitor) overallErrorRate() uint64 {
return c.overallErrorCount.Load() c.mu.RLock()
} defer c.mu.RUnlock()
return c.overallErrorCount
func (c *clientStatusMonitor) resetErrorCounter() {
c.currentErrorCount.Store(0)
} }
func (c *clientStatusMonitor) latency() time.Duration { func (c *clientStatusMonitor) latency() time.Duration {
allRequests := c.requests() c.mu.RLock()
if allRequests == 0 { defer c.mu.RUnlock()
if c.allRequests == 0 {
return 0 return 0
} }
return time.Duration(c.allTime.Load() / allRequests) return time.Duration(c.allTime / c.allRequests)
} }
func (c *clientStatusMonitor) requests() uint64 { func (c *clientStatusMonitor) requests() uint64 {
return c.allRequests.Load() c.mu.RLock()
defer c.mu.RUnlock()
return c.allRequests
} }
func (c *clientStatusMonitor) incRequests(elapsed time.Duration) { func (c *clientStatusMonitor) incRequests(elapsed time.Duration) {
c.allTime.Add(uint64(elapsed)) c.mu.Lock()
c.allRequests.Inc() defer c.mu.Unlock()
c.allTime += uint64(elapsed)
c.allRequests++
} }
func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error { func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error {
@ -1971,7 +1974,7 @@ func (p Pool) Statistic() Statistic {
for _, inner := range p.innerPools { for _, inner := range p.innerPools {
inner.lock.RLock() inner.lock.RLock()
for _, cl := range inner.clients { for _, cl := range inner.clients {
node := &NodeStatistic{ node := NodeStatistic{
address: cl.address(), address: cl.address(),
latency: cl.latency(), latency: cl.latency(),
requests: cl.requests(), requests: cl.requests(),

View file

@ -508,31 +508,25 @@ func TestWaitPresence(t *testing.T) {
}) })
} }
func newTestStatusMonitor(addr string) *clientStatusMonitor { func newTestStatusMonitor(addr string) clientStatusMonitor {
return &clientStatusMonitor{ return clientStatusMonitor{
addr: addr, addr: addr,
healthy: atomic.NewBool(true), healthy: atomic.NewBool(true),
currentErrorCount: atomic.NewUint32(0),
overallErrorCount: atomic.NewUint64(0),
errorThreshold: 10, errorThreshold: 10,
allTime: atomic.NewUint64(0),
allRequests: atomic.NewUint64(0),
} }
} }
func TestStatusMonitor(t *testing.T) { func TestStatusMonitor(t *testing.T) {
monitor := newTestStatusMonitor("") monitor := newTestStatusMonitor("")
monitor.errorThreshold = 3
count := 10 count := 10
for i := 0; i < 10; i++ { for i := 0; i < count; i++ {
monitor.incErrorRate() monitor.incErrorRate()
if i%3 == 0 {
monitor.resetErrorCounter()
}
} }
require.Equal(t, uint64(count), monitor.overallErrorRate()) require.Equal(t, uint64(count), monitor.overallErrorRate())
require.Equal(t, uint32(0), monitor.currentErrorRate()) require.Equal(t, uint32(1), monitor.currentErrorRate())
} }
func TestHandleError(t *testing.T) { func TestHandleError(t *testing.T) {

View file

@ -8,7 +8,7 @@ import (
// Statistic is metrics of the pool. // Statistic is metrics of the pool.
type Statistic struct { type Statistic struct {
overallErrors uint64 overallErrors uint64
nodes []*NodeStatistic nodes []NodeStatistic
} }
// OverallErrors returns sum of errors on all connections. It doesn't decrease. // OverallErrors returns sum of errors on all connections. It doesn't decrease.
@ -17,7 +17,7 @@ func (s Statistic) OverallErrors() uint64 {
} }
// Nodes returns list of nodes statistic. // Nodes returns list of nodes statistic.
func (s Statistic) Nodes() []*NodeStatistic { func (s Statistic) Nodes() []NodeStatistic {
return s.nodes return s.nodes
} }
@ -29,7 +29,7 @@ var ErrUnknownNode = errors.New("unknown node")
func (s Statistic) Node(address string) (*NodeStatistic, error) { func (s Statistic) Node(address string) (*NodeStatistic, error) {
for i := range s.nodes { for i := range s.nodes {
if s.nodes[i].address == address { if s.nodes[i].address == address {
return s.nodes[i], nil return &s.nodes[i], nil
} }
} }