From 1b30d228da0da00eda99756281dd62fcb8e9c92a Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Wed, 13 Jul 2022 12:55:38 +0300 Subject: [PATCH] [#283] pool: Expose statistic Signed-off-by: Denis Kirillov --- pool/pool.go | 21 +++++++++++++++ pool/pool_test.go | 30 ++++++++++++++++----- pool/statistic.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 6 deletions(-) create mode 100644 pool/statistic.go diff --git a/pool/pool.go b/pool/pool.go index 080bc95..fa5d368 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -1900,6 +1900,27 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Deci return cp.balanceGet(ctx, prm) } +// Statistic returns connection statistics. +func (p Pool) Statistic() Statistic { + stat := Statistic{} + for _, inner := range p.innerPools { + inner.lock.RLock() + for _, cl := range inner.clients { + node := &NodeStatistic{ + address: cl.address(), + latency: cl.latency(), + overallErrors: cl.overallErrorRate(), + currentErrors: cl.currentErrorRate(), + } + stat.nodes = append(stat.nodes, node) + stat.overallErrors += node.overallErrors + } + inner.lock.RUnlock() + } + + return stat +} + // waitForContainerPresence waits until the container is found on the NeoFS network. func waitForContainerPresence(ctx context.Context, cli client, cnrID *cid.ID, waitParams *WaitParams) error { var prm PrmContainerGet diff --git a/pool/pool_test.go b/pool/pool_test.go index 3b5a507..ec1053b 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -510,13 +510,31 @@ func TestWaitPresence(t *testing.T) { func newTestStatusMonitor(addr string) *clientStatusMonitor { return &clientStatusMonitor{ - addr: addr, - healthy: atomic.NewBool(true), - errorCount: atomic.NewUint32(0), - errorThreshold: 10, + addr: addr, + healthy: atomic.NewBool(true), + currentErrorCount: atomic.NewUint32(0), + overallErrorCount: atomic.NewUint64(0), + errorThreshold: 10, + allTime: atomic.NewUint64(0), + allRequests: atomic.NewUint64(0), } } +func TestStatusMonitor(t *testing.T) { + monitor := newTestStatusMonitor("") + + count := 10 + for i := 0; i < 10; i++ { + monitor.incErrorRate() + if i%3 == 0 { + monitor.resetErrorCounter() + } + } + + require.Equal(t, uint64(count), monitor.overallErrorRate()) + require.Equal(t, uint32(0), monitor.currentErrorRate()) +} + func TestHandleError(t *testing.T) { monitor := newTestStatusMonitor("") @@ -582,7 +600,7 @@ func TestHandleError(t *testing.T) { }, } { t.Run(strconv.Itoa(i), func(t *testing.T) { - errCount := monitor.errorRate() + errCount := monitor.currentErrorRate() err := monitor.handleError(tc.status, tc.err) if tc.expectedError { require.Error(t, err) @@ -592,7 +610,7 @@ func TestHandleError(t *testing.T) { if tc.countError { errCount++ } - require.Equal(t, errCount, monitor.errorRate()) + require.Equal(t, errCount, monitor.currentErrorRate()) }) } } diff --git a/pool/statistic.go b/pool/statistic.go new file mode 100644 index 0000000..f7256a8 --- /dev/null +++ b/pool/statistic.go @@ -0,0 +1,67 @@ +package pool + +import ( + "errors" + "time" +) + +// Statistic is metrics of the pool. +type Statistic struct { + overallErrors uint64 + nodes []*NodeStatistic +} + +// OverallErrors returns sum of errors on all connections. It doesn't decrease. +func (s Statistic) OverallErrors() uint64 { + return s.overallErrors +} + +// Nodes returns list of nodes statistic. +func (s Statistic) Nodes() []*NodeStatistic { + return s.nodes +} + +// ErrUnknownNode indicate that node with current address is not found in list. +var ErrUnknownNode = errors.New("unknown node") + +// Node returns NodeStatistic by node address. +// If such node doesn't exist ErrUnknownNode error is returned. +func (s Statistic) Node(address string) (*NodeStatistic, error) { + for i := range s.nodes { + if s.nodes[i].address == address { + return s.nodes[i], nil + } + } + + return nil, ErrUnknownNode +} + +// NodeStatistic is metrics of certain connections. +type NodeStatistic struct { + address string + latency time.Duration + overallErrors uint64 + currentErrors uint32 +} + +// OverallErrors returns all errors on current node. +// This value never decreases. +func (n NodeStatistic) OverallErrors() uint64 { + return n.overallErrors +} + +// CurrentErrors returns errors on current node. +// This value is always less than 'errorThreshold' from InitParameters. +func (n NodeStatistic) CurrentErrors() uint32 { + return n.currentErrors +} + +// Latency returns average latency for node request. +func (n NodeStatistic) Latency() time.Duration { + return n.latency +} + +// Address returns node endpoint address. +func (n NodeStatistic) Address() string { + return n.address +}