[#283] pool: Expose statistic

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-07-13 12:55:38 +03:00 committed by fyrchik
parent 423804de84
commit 1b30d228da
3 changed files with 112 additions and 6 deletions

View file

@ -1900,6 +1900,27 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Deci
return cp.balanceGet(ctx, prm)
}
// Statistic returns connection statistics.
func (p Pool) Statistic() Statistic {
stat := Statistic{}
for _, inner := range p.innerPools {
inner.lock.RLock()
for _, cl := range inner.clients {
node := &NodeStatistic{
address: cl.address(),
latency: cl.latency(),
overallErrors: cl.overallErrorRate(),
currentErrors: cl.currentErrorRate(),
}
stat.nodes = append(stat.nodes, node)
stat.overallErrors += node.overallErrors
}
inner.lock.RUnlock()
}
return stat
}
// waitForContainerPresence waits until the container is found on the NeoFS network.
func waitForContainerPresence(ctx context.Context, cli client, cnrID *cid.ID, waitParams *WaitParams) error {
var prm PrmContainerGet

View file

@ -512,11 +512,29 @@ func newTestStatusMonitor(addr string) *clientStatusMonitor {
return &clientStatusMonitor{
addr: addr,
healthy: atomic.NewBool(true),
errorCount: atomic.NewUint32(0),
currentErrorCount: atomic.NewUint32(0),
overallErrorCount: atomic.NewUint64(0),
errorThreshold: 10,
allTime: atomic.NewUint64(0),
allRequests: atomic.NewUint64(0),
}
}
func TestStatusMonitor(t *testing.T) {
monitor := newTestStatusMonitor("")
count := 10
for i := 0; i < 10; i++ {
monitor.incErrorRate()
if i%3 == 0 {
monitor.resetErrorCounter()
}
}
require.Equal(t, uint64(count), monitor.overallErrorRate())
require.Equal(t, uint32(0), monitor.currentErrorRate())
}
func TestHandleError(t *testing.T) {
monitor := newTestStatusMonitor("")
@ -582,7 +600,7 @@ func TestHandleError(t *testing.T) {
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
errCount := monitor.errorRate()
errCount := monitor.currentErrorRate()
err := monitor.handleError(tc.status, tc.err)
if tc.expectedError {
require.Error(t, err)
@ -592,7 +610,7 @@ func TestHandleError(t *testing.T) {
if tc.countError {
errCount++
}
require.Equal(t, errCount, monitor.errorRate())
require.Equal(t, errCount, monitor.currentErrorRate())
})
}
}

67
pool/statistic.go Normal file
View file

@ -0,0 +1,67 @@
package pool
import (
"errors"
"time"
)
// Statistic is metrics of the pool.
type Statistic struct {
overallErrors uint64
nodes []*NodeStatistic
}
// OverallErrors returns sum of errors on all connections. It doesn't decrease.
func (s Statistic) OverallErrors() uint64 {
return s.overallErrors
}
// Nodes returns list of nodes statistic.
func (s Statistic) Nodes() []*NodeStatistic {
return s.nodes
}
// ErrUnknownNode indicate that node with current address is not found in list.
var ErrUnknownNode = errors.New("unknown node")
// Node returns NodeStatistic by node address.
// If such node doesn't exist ErrUnknownNode error is returned.
func (s Statistic) Node(address string) (*NodeStatistic, error) {
for i := range s.nodes {
if s.nodes[i].address == address {
return s.nodes[i], nil
}
}
return nil, ErrUnknownNode
}
// NodeStatistic is metrics of certain connections.
type NodeStatistic struct {
address string
latency time.Duration
overallErrors uint64
currentErrors uint32
}
// OverallErrors returns all errors on current node.
// This value never decreases.
func (n NodeStatistic) OverallErrors() uint64 {
return n.overallErrors
}
// CurrentErrors returns errors on current node.
// This value is always less than 'errorThreshold' from InitParameters.
func (n NodeStatistic) CurrentErrors() uint32 {
return n.currentErrors
}
// Latency returns average latency for node request.
func (n NodeStatistic) Latency() time.Duration {
return n.latency
}
// Address returns node endpoint address.
func (n NodeStatistic) Address() string {
return n.address
}