[#244] pool/tree: Collect request duration statistic
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
parent
9da46f566f
commit
751f906dbf
4 changed files with 187 additions and 45 deletions
42
pool/pool.go
42
pool/pool.go
|
@ -106,7 +106,7 @@ type clientStatus interface {
|
||||||
// overallErrorRate returns the number of all happened errors.
|
// overallErrorRate returns the number of all happened errors.
|
||||||
overallErrorRate() uint64
|
overallErrorRate() uint64
|
||||||
// methodsStatus returns statistic for all used methods.
|
// methodsStatus returns statistic for all used methods.
|
||||||
methodsStatus() []statusSnapshot
|
methodsStatus() []StatusSnapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
// errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy.
|
// errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy.
|
||||||
|
@ -122,7 +122,7 @@ type clientStatusMonitor struct {
|
||||||
mu sync.RWMutex // protect counters
|
mu sync.RWMutex // protect counters
|
||||||
currentErrorCount uint32
|
currentErrorCount uint32
|
||||||
overallErrorCount uint64
|
overallErrorCount uint64
|
||||||
methods []*methodStatus
|
methods []*MethodStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
// values for healthy status of clientStatusMonitor.
|
// values for healthy status of clientStatusMonitor.
|
||||||
|
@ -141,19 +141,6 @@ const (
|
||||||
statusHealthy
|
statusHealthy
|
||||||
)
|
)
|
||||||
|
|
||||||
// methodStatus provide statistic for specific method.
|
|
||||||
type methodStatus struct {
|
|
||||||
name string
|
|
||||||
mu sync.RWMutex // protect counters
|
|
||||||
statusSnapshot
|
|
||||||
}
|
|
||||||
|
|
||||||
// statusSnapshot is statistic for specific method.
|
|
||||||
type statusSnapshot struct {
|
|
||||||
allTime uint64
|
|
||||||
allRequests uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
// MethodIndex index of method in list of statuses in clientStatusMonitor.
|
// MethodIndex index of method in list of statuses in clientStatusMonitor.
|
||||||
type MethodIndex int
|
type MethodIndex int
|
||||||
|
|
||||||
|
@ -229,9 +216,9 @@ func (m MethodIndex) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
|
func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
|
||||||
methods := make([]*methodStatus, methodLast)
|
methods := make([]*MethodStatus, methodLast)
|
||||||
for i := methodBalanceGet; i < methodLast; i++ {
|
for i := methodBalanceGet; i < methodLast; i++ {
|
||||||
methods[i] = &methodStatus{name: i.String()}
|
methods[i] = &MethodStatus{name: i.String()}
|
||||||
}
|
}
|
||||||
|
|
||||||
healthy := new(atomic.Uint32)
|
healthy := new(atomic.Uint32)
|
||||||
|
@ -246,19 +233,6 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *methodStatus) snapshot() statusSnapshot {
|
|
||||||
m.mu.RLock()
|
|
||||||
defer m.mu.RUnlock()
|
|
||||||
return m.statusSnapshot
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *methodStatus) incRequests(elapsed time.Duration) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
m.allTime += uint64(elapsed)
|
|
||||||
m.allRequests++
|
|
||||||
}
|
|
||||||
|
|
||||||
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
|
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
|
||||||
type clientWrapper struct {
|
type clientWrapper struct {
|
||||||
clientMutex sync.RWMutex
|
clientMutex sync.RWMutex
|
||||||
|
@ -1177,10 +1151,10 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 {
|
||||||
return c.overallErrorCount
|
return c.overallErrorCount
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
|
func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot {
|
||||||
result := make([]statusSnapshot, len(c.methods))
|
result := make([]StatusSnapshot, len(c.methods))
|
||||||
for i, val := range c.methods {
|
for i, val := range c.methods {
|
||||||
result[i] = val.snapshot()
|
result[i] = val.Snapshot()
|
||||||
}
|
}
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
@ -1188,7 +1162,7 @@ func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
|
||||||
|
|
||||||
func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
|
func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
|
||||||
methodStat := c.methods[method]
|
methodStat := c.methods[method]
|
||||||
methodStat.incRequests(elapsed)
|
methodStat.IncRequests(elapsed)
|
||||||
if c.prm.poolRequestInfoCallback != nil {
|
if c.prm.poolRequestInfoCallback != nil {
|
||||||
c.prm.poolRequestInfoCallback(RequestInfo{
|
c.prm.poolRequestInfoCallback(RequestInfo{
|
||||||
Address: c.prm.address,
|
Address: c.prm.address,
|
||||||
|
|
|
@ -2,6 +2,7 @@ package pool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -46,7 +47,7 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) {
|
||||||
// NodeStatistic is metrics of certain connections.
|
// NodeStatistic is metrics of certain connections.
|
||||||
type NodeStatistic struct {
|
type NodeStatistic struct {
|
||||||
address string
|
address string
|
||||||
methods []statusSnapshot
|
methods []StatusSnapshot
|
||||||
overallErrors uint64
|
overallErrors uint64
|
||||||
currentErrors uint32
|
currentErrors uint32
|
||||||
}
|
}
|
||||||
|
@ -158,3 +159,41 @@ func (n NodeStatistic) averageTime(method MethodIndex) time.Duration {
|
||||||
}
|
}
|
||||||
return time.Duration(stat.allTime / stat.allRequests)
|
return time.Duration(stat.allTime / stat.allRequests)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MethodStatus provide statistic for specific method.
|
||||||
|
type MethodStatus struct {
|
||||||
|
name string
|
||||||
|
mu sync.RWMutex // protect counters
|
||||||
|
snapshot StatusSnapshot
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMethodStatus(name string) *MethodStatus {
|
||||||
|
return &MethodStatus{name: name}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MethodStatus) Snapshot() StatusSnapshot {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
return m.snapshot
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MethodStatus) IncRequests(elapsed time.Duration) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.snapshot.allTime += uint64(elapsed)
|
||||||
|
m.snapshot.allRequests++
|
||||||
|
}
|
||||||
|
|
||||||
|
// StatusSnapshot is statistic for specific method.
|
||||||
|
type StatusSnapshot struct {
|
||||||
|
allTime uint64
|
||||||
|
allRequests uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s StatusSnapshot) AllRequests() uint64 {
|
||||||
|
return s.allRequests
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s StatusSnapshot) AllTime() uint64 {
|
||||||
|
return s.allTime
|
||||||
|
}
|
||||||
|
|
|
@ -88,6 +88,7 @@ type Pool struct {
|
||||||
rebalanceParams rebalanceParameters
|
rebalanceParams rebalanceParameters
|
||||||
dialOptions []grpc.DialOption
|
dialOptions []grpc.DialOption
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
|
methods []*pool.MethodStatus
|
||||||
|
|
||||||
maxRequestAttempts int
|
maxRequestAttempts int
|
||||||
|
|
||||||
|
@ -169,6 +170,41 @@ type RemoveNodeParams struct {
|
||||||
BearerToken []byte
|
BearerToken []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MethodIndex index of method in list of statuses in Pool.
|
||||||
|
type MethodIndex int
|
||||||
|
|
||||||
|
const (
|
||||||
|
methodGetNodes MethodIndex = iota
|
||||||
|
methodGetSubTree
|
||||||
|
methodAddNode
|
||||||
|
methodAddNodeByPath
|
||||||
|
methodMoveNode
|
||||||
|
methodRemoveNode
|
||||||
|
methodLast
|
||||||
|
)
|
||||||
|
|
||||||
|
// String implements fmt.Stringer.
|
||||||
|
func (m MethodIndex) String() string {
|
||||||
|
switch m {
|
||||||
|
case methodGetNodes:
|
||||||
|
return "getNodes"
|
||||||
|
case methodAddNode:
|
||||||
|
return "addNode"
|
||||||
|
case methodGetSubTree:
|
||||||
|
return "getSubTree"
|
||||||
|
case methodAddNodeByPath:
|
||||||
|
return "addNodeByPath"
|
||||||
|
case methodMoveNode:
|
||||||
|
return "moveNode"
|
||||||
|
case methodRemoveNode:
|
||||||
|
return "removeNode"
|
||||||
|
case methodLast:
|
||||||
|
return "it's a system name rather than a method"
|
||||||
|
default:
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewPool creates connection pool using parameters.
|
// NewPool creates connection pool using parameters.
|
||||||
func NewPool(options InitParameters) (*Pool, error) {
|
func NewPool(options InitParameters) (*Pool, error) {
|
||||||
if options.key == nil {
|
if options.key == nil {
|
||||||
|
@ -182,6 +218,11 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
|
|
||||||
fillDefaultInitParams(&options)
|
fillDefaultInitParams(&options)
|
||||||
|
|
||||||
|
methods := make([]*pool.MethodStatus, methodLast)
|
||||||
|
for i := methodGetNodes; i < methodLast; i++ {
|
||||||
|
methods[i] = pool.NewMethodStatus(i.String())
|
||||||
|
}
|
||||||
|
|
||||||
p := &Pool{
|
p := &Pool{
|
||||||
key: options.key,
|
key: options.key,
|
||||||
logger: options.logger,
|
logger: options.logger,
|
||||||
|
@ -192,6 +233,7 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
clientRebalanceInterval: options.clientRebalanceInterval,
|
clientRebalanceInterval: options.clientRebalanceInterval,
|
||||||
},
|
},
|
||||||
maxRequestAttempts: options.maxRequestAttempts,
|
maxRequestAttempts: options.maxRequestAttempts,
|
||||||
|
methods: methods,
|
||||||
}
|
}
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
|
@ -313,7 +355,8 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.GetNodeByPathResponse
|
var resp *grpcService.GetNodeByPathResponse
|
||||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
start := time.Now()
|
||||||
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.GetNodeByPath(ctx, request)
|
resp, inErr = client.GetNodeByPath(ctx, request)
|
||||||
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
||||||
// Empty result is expected due to delayed tree service sync.
|
// Empty result is expected due to delayed tree service sync.
|
||||||
|
@ -323,7 +366,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
||||||
return errNodeEmptyResult
|
return errNodeEmptyResult
|
||||||
}
|
}
|
||||||
return handleError("failed to get node by path", inErr)
|
return handleError("failed to get node by path", inErr)
|
||||||
}); err != nil && !errors.Is(err, errNodeEmptyResult) {
|
})
|
||||||
|
p.incRequests(time.Since(start), methodGetNodes)
|
||||||
|
if err != nil && !errors.Is(err, errNodeEmptyResult) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,10 +455,13 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
|
||||||
}
|
}
|
||||||
|
|
||||||
var cli grpcService.TreeService_GetSubTreeClient
|
var cli grpcService.TreeService_GetSubTreeClient
|
||||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
start := time.Now()
|
||||||
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
cli, inErr = client.GetSubTree(ctx, request)
|
cli, inErr = client.GetSubTree(ctx, request)
|
||||||
return handleError("failed to get sub tree client", inErr)
|
return handleError("failed to get sub tree client", inErr)
|
||||||
}); err != nil {
|
})
|
||||||
|
p.incRequests(time.Since(start), methodGetSubTree)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -440,10 +488,13 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.AddResponse
|
var resp *grpcService.AddResponse
|
||||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
start := time.Now()
|
||||||
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.Add(ctx, request)
|
resp, inErr = client.Add(ctx, request)
|
||||||
return handleError("failed to add node", inErr)
|
return handleError("failed to add node", inErr)
|
||||||
}); err != nil {
|
})
|
||||||
|
p.incRequests(time.Since(start), methodAddNode)
|
||||||
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -472,10 +523,13 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.AddByPathResponse
|
var resp *grpcService.AddByPathResponse
|
||||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
start := time.Now()
|
||||||
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
resp, inErr = client.AddByPath(ctx, request)
|
resp, inErr = client.AddByPath(ctx, request)
|
||||||
return handleError("failed to add node by path", inErr)
|
return handleError("failed to add node by path", inErr)
|
||||||
}); err != nil {
|
})
|
||||||
|
p.incRequests(time.Since(start), methodAddNodeByPath)
|
||||||
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -511,12 +565,16 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
start := time.Now()
|
||||||
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||||
if _, err := client.Move(ctx, request); err != nil {
|
if _, err := client.Move(ctx, request); err != nil {
|
||||||
return handleError("failed to move node", err)
|
return handleError("failed to move node", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
p.incRequests(time.Since(start), methodMoveNode)
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveNode invokes eponymous method from TreeServiceClient.
|
// RemoveNode invokes eponymous method from TreeServiceClient.
|
||||||
|
@ -537,12 +595,16 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
start := time.Now()
|
||||||
|
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||||
if _, err := client.Remove(ctx, request); err != nil {
|
if _, err := client.Remove(ctx, request); err != nil {
|
||||||
return handleError("failed to remove node", err)
|
return handleError("failed to remove node", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
p.incRequests(time.Since(start), methodRemoveNode)
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the Pool and releases all the associated resources.
|
// Close closes the Pool and releases all the associated resources.
|
||||||
|
@ -563,6 +625,17 @@ func (p *Pool) Close() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Statistic returns tree pool statistics.
|
||||||
|
func (p *Pool) Statistic() Statistic {
|
||||||
|
stat := Statistic{make([]pool.StatusSnapshot, len(p.methods))}
|
||||||
|
|
||||||
|
for i, method := range p.methods {
|
||||||
|
stat.methods[i] = method.Snapshot()
|
||||||
|
}
|
||||||
|
|
||||||
|
return stat
|
||||||
|
}
|
||||||
|
|
||||||
func handleError(msg string, err error) error {
|
func handleError(msg string, err error) error {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -775,6 +848,11 @@ LOOP:
|
||||||
return finErr
|
return finErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Pool) incRequests(elapsed time.Duration, method MethodIndex) {
|
||||||
|
methodStat := p.methods[method]
|
||||||
|
methodStat.IncRequests(elapsed)
|
||||||
|
}
|
||||||
|
|
||||||
func shouldTryAgain(err error) bool {
|
func shouldTryAgain(err error) bool {
|
||||||
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
|
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
|
||||||
}
|
}
|
||||||
|
|
51
pool/tree/statistic.go
Normal file
51
pool/tree/statistic.go
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
package tree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Statistic is metrics of the tree pool.
|
||||||
|
type Statistic struct {
|
||||||
|
methods []pool.StatusSnapshot
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Statistic) Requests() (requests uint64) {
|
||||||
|
for _, val := range s.methods {
|
||||||
|
requests += val.AllRequests()
|
||||||
|
}
|
||||||
|
return requests
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Statistic) AverageGetNodes() time.Duration {
|
||||||
|
return s.averageTime(methodGetNodes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Statistic) AverageGetSubTree() time.Duration {
|
||||||
|
return s.averageTime(methodGetSubTree)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Statistic) AverageAddNode() time.Duration {
|
||||||
|
return s.averageTime(methodAddNode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Statistic) AverageAddNodeByPath() time.Duration {
|
||||||
|
return s.averageTime(methodAddNodeByPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Statistic) AverageMoveNode() time.Duration {
|
||||||
|
return s.averageTime(methodMoveNode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Statistic) AverageRemoveNode() time.Duration {
|
||||||
|
return s.averageTime(methodRemoveNode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Statistic) averageTime(method MethodIndex) time.Duration {
|
||||||
|
stat := s.methods[method]
|
||||||
|
if stat.AllRequests() == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return time.Duration(stat.AllTime() / stat.AllRequests())
|
||||||
|
}
|
Loading…
Reference in a new issue