[#244] pool/tree: Collect request duration statistic
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
parent
9da46f566f
commit
58ef4a1014
4 changed files with 187 additions and 45 deletions
42
pool/pool.go
42
pool/pool.go
|
@ -106,7 +106,7 @@ type clientStatus interface {
|
|||
// overallErrorRate returns the number of all happened errors.
|
||||
overallErrorRate() uint64
|
||||
// methodsStatus returns statistic for all used methods.
|
||||
methodsStatus() []statusSnapshot
|
||||
methodsStatus() []StatusSnapshot
|
||||
}
|
||||
|
||||
// errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy.
|
||||
|
@ -122,7 +122,7 @@ type clientStatusMonitor struct {
|
|||
mu sync.RWMutex // protect counters
|
||||
currentErrorCount uint32
|
||||
overallErrorCount uint64
|
||||
methods []*methodStatus
|
||||
methods []*MethodStatus
|
||||
}
|
||||
|
||||
// values for healthy status of clientStatusMonitor.
|
||||
|
@ -141,19 +141,6 @@ const (
|
|||
statusHealthy
|
||||
)
|
||||
|
||||
// methodStatus provide statistic for specific method.
|
||||
type methodStatus struct {
|
||||
name string
|
||||
mu sync.RWMutex // protect counters
|
||||
statusSnapshot
|
||||
}
|
||||
|
||||
// statusSnapshot is statistic for specific method.
|
||||
type statusSnapshot struct {
|
||||
allTime uint64
|
||||
allRequests uint64
|
||||
}
|
||||
|
||||
// MethodIndex index of method in list of statuses in clientStatusMonitor.
|
||||
type MethodIndex int
|
||||
|
||||
|
@ -229,9 +216,9 @@ func (m MethodIndex) String() string {
|
|||
}
|
||||
|
||||
func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
|
||||
methods := make([]*methodStatus, methodLast)
|
||||
methods := make([]*MethodStatus, methodLast)
|
||||
for i := methodBalanceGet; i < methodLast; i++ {
|
||||
methods[i] = &methodStatus{name: i.String()}
|
||||
methods[i] = &MethodStatus{name: i.String()}
|
||||
}
|
||||
|
||||
healthy := new(atomic.Uint32)
|
||||
|
@ -246,19 +233,6 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
|
|||
}
|
||||
}
|
||||
|
||||
func (m *methodStatus) snapshot() statusSnapshot {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.statusSnapshot
|
||||
}
|
||||
|
||||
func (m *methodStatus) incRequests(elapsed time.Duration) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.allTime += uint64(elapsed)
|
||||
m.allRequests++
|
||||
}
|
||||
|
||||
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
|
||||
type clientWrapper struct {
|
||||
clientMutex sync.RWMutex
|
||||
|
@ -1177,10 +1151,10 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 {
|
|||
return c.overallErrorCount
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
|
||||
result := make([]statusSnapshot, len(c.methods))
|
||||
func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot {
|
||||
result := make([]StatusSnapshot, len(c.methods))
|
||||
for i, val := range c.methods {
|
||||
result[i] = val.snapshot()
|
||||
result[i] = val.Snapshot()
|
||||
}
|
||||
|
||||
return result
|
||||
|
@ -1188,7 +1162,7 @@ func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
|
|||
|
||||
func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
|
||||
methodStat := c.methods[method]
|
||||
methodStat.incRequests(elapsed)
|
||||
methodStat.IncRequests(elapsed)
|
||||
if c.prm.poolRequestInfoCallback != nil {
|
||||
c.prm.poolRequestInfoCallback(RequestInfo{
|
||||
Address: c.prm.address,
|
||||
|
|
|
@ -2,6 +2,7 @@ package pool
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -46,7 +47,7 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) {
|
|||
// NodeStatistic is metrics of certain connections.
|
||||
type NodeStatistic struct {
|
||||
address string
|
||||
methods []statusSnapshot
|
||||
methods []StatusSnapshot
|
||||
overallErrors uint64
|
||||
currentErrors uint32
|
||||
}
|
||||
|
@ -158,3 +159,41 @@ func (n NodeStatistic) averageTime(method MethodIndex) time.Duration {
|
|||
}
|
||||
return time.Duration(stat.allTime / stat.allRequests)
|
||||
}
|
||||
|
||||
// MethodStatus provide statistic for specific method.
|
||||
type MethodStatus struct {
|
||||
name string
|
||||
mu sync.RWMutex // protect counters
|
||||
StatusSnapshot
|
||||
}
|
||||
|
||||
func NewMethodStatus(name string) *MethodStatus {
|
||||
return &MethodStatus{name: name}
|
||||
}
|
||||
|
||||
func (m *MethodStatus) Snapshot() StatusSnapshot {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.StatusSnapshot
|
||||
}
|
||||
|
||||
func (m *MethodStatus) IncRequests(elapsed time.Duration) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.allTime += uint64(elapsed)
|
||||
m.allRequests++
|
||||
}
|
||||
|
||||
// StatusSnapshot is statistic for specific method.
|
||||
type StatusSnapshot struct {
|
||||
allTime uint64
|
||||
allRequests uint64
|
||||
}
|
||||
|
||||
func (s StatusSnapshot) AllRequests() uint64 {
|
||||
return s.allRequests
|
||||
}
|
||||
|
||||
func (s StatusSnapshot) AllTime() uint64 {
|
||||
return s.allTime
|
||||
}
|
||||
|
|
|
@ -88,6 +88,7 @@ type Pool struct {
|
|||
rebalanceParams rebalanceParameters
|
||||
dialOptions []grpc.DialOption
|
||||
logger *zap.Logger
|
||||
methods []*pool.MethodStatus
|
||||
|
||||
maxRequestAttempts int
|
||||
|
||||
|
@ -169,6 +170,41 @@ type RemoveNodeParams struct {
|
|||
BearerToken []byte
|
||||
}
|
||||
|
||||
// MethodIndex index of method in list of statuses in Pool.
|
||||
type MethodIndex int
|
||||
|
||||
const (
|
||||
methodGetNodes MethodIndex = iota
|
||||
methodGetSubTree
|
||||
methodAddNode
|
||||
methodAddNodeByPath
|
||||
methodMoveNode
|
||||
methodRemoveNode
|
||||
methodLast
|
||||
)
|
||||
|
||||
// String implements fmt.Stringer.
|
||||
func (m MethodIndex) String() string {
|
||||
switch m {
|
||||
case methodGetNodes:
|
||||
return "getNodes"
|
||||
case methodAddNode:
|
||||
return "addNode"
|
||||
case methodGetSubTree:
|
||||
return "getSubTree"
|
||||
case methodAddNodeByPath:
|
||||
return "addNodeByPath"
|
||||
case methodMoveNode:
|
||||
return "moveNode"
|
||||
case methodRemoveNode:
|
||||
return "removeNode"
|
||||
case methodLast:
|
||||
return "it's a system name rather than a method"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
// NewPool creates connection pool using parameters.
|
||||
func NewPool(options InitParameters) (*Pool, error) {
|
||||
if options.key == nil {
|
||||
|
@ -182,6 +218,11 @@ func NewPool(options InitParameters) (*Pool, error) {
|
|||
|
||||
fillDefaultInitParams(&options)
|
||||
|
||||
methods := make([]*pool.MethodStatus, methodLast)
|
||||
for i := methodGetNodes; i < methodLast; i++ {
|
||||
methods[i] = pool.NewMethodStatus(i.String())
|
||||
}
|
||||
|
||||
p := &Pool{
|
||||
key: options.key,
|
||||
logger: options.logger,
|
||||
|
@ -192,6 +233,7 @@ func NewPool(options InitParameters) (*Pool, error) {
|
|||
clientRebalanceInterval: options.clientRebalanceInterval,
|
||||
},
|
||||
maxRequestAttempts: options.maxRequestAttempts,
|
||||
methods: methods,
|
||||
}
|
||||
|
||||
return p, nil
|
||||
|
@ -313,7 +355,8 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
|||
}
|
||||
|
||||
var resp *grpcService.GetNodeByPathResponse
|
||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
start := time.Now()
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
resp, inErr = client.GetNodeByPath(ctx, request)
|
||||
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
||||
// Empty result is expected due to delayed tree service sync.
|
||||
|
@ -323,7 +366,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
|||
return errNodeEmptyResult
|
||||
}
|
||||
return handleError("failed to get node by path", inErr)
|
||||
}); err != nil && !errors.Is(err, errNodeEmptyResult) {
|
||||
})
|
||||
p.incRequests(time.Since(start), methodGetNodes)
|
||||
if err != nil && !errors.Is(err, errNodeEmptyResult) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -410,10 +455,13 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
|
|||
}
|
||||
|
||||
var cli grpcService.TreeService_GetSubTreeClient
|
||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
start := time.Now()
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
cli, inErr = client.GetSubTree(ctx, request)
|
||||
return handleError("failed to get sub tree client", inErr)
|
||||
}); err != nil {
|
||||
})
|
||||
p.incRequests(time.Since(start), methodGetSubTree)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -440,10 +488,13 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
|||
}
|
||||
|
||||
var resp *grpcService.AddResponse
|
||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
start := time.Now()
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
resp, inErr = client.Add(ctx, request)
|
||||
return handleError("failed to add node", inErr)
|
||||
}); err != nil {
|
||||
})
|
||||
p.incRequests(time.Since(start), methodAddNode)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
|
@ -472,10 +523,13 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
|||
}
|
||||
|
||||
var resp *grpcService.AddByPathResponse
|
||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
start := time.Now()
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
resp, inErr = client.AddByPath(ctx, request)
|
||||
return handleError("failed to add node by path", inErr)
|
||||
}); err != nil {
|
||||
})
|
||||
p.incRequests(time.Since(start), methodAddNodeByPath)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
|
@ -511,12 +565,16 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
start := time.Now()
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
if _, err := client.Move(ctx, request); err != nil {
|
||||
return handleError("failed to move node", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
p.incRequests(time.Since(start), methodMoveNode)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// RemoveNode invokes eponymous method from TreeServiceClient.
|
||||
|
@ -537,12 +595,16 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
start := time.Now()
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
if _, err := client.Remove(ctx, request); err != nil {
|
||||
return handleError("failed to remove node", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
p.incRequests(time.Since(start), methodRemoveNode)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Close closes the Pool and releases all the associated resources.
|
||||
|
@ -563,6 +625,17 @@ func (p *Pool) Close() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Statistic returns tree pool statistics.
|
||||
func (p *Pool) Statistic() Statistic {
|
||||
stat := Statistic{make([]pool.StatusSnapshot, len(p.methods))}
|
||||
|
||||
for i, method := range p.methods {
|
||||
stat.methods[i] = method.Snapshot()
|
||||
}
|
||||
|
||||
return stat
|
||||
}
|
||||
|
||||
func handleError(msg string, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
|
@ -775,6 +848,11 @@ LOOP:
|
|||
return finErr
|
||||
}
|
||||
|
||||
func (p *Pool) incRequests(elapsed time.Duration, method MethodIndex) {
|
||||
methodStat := p.methods[method]
|
||||
methodStat.IncRequests(elapsed)
|
||||
}
|
||||
|
||||
func shouldTryAgain(err error) bool {
|
||||
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
|
||||
}
|
||||
|
|
51
pool/tree/statistic.go
Normal file
51
pool/tree/statistic.go
Normal file
|
@ -0,0 +1,51 @@
|
|||
package tree
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
)
|
||||
|
||||
// Statistic is metrics of the tree pool.
|
||||
type Statistic struct {
|
||||
methods []pool.StatusSnapshot
|
||||
}
|
||||
|
||||
func (s *Statistic) Requests() (requests uint64) {
|
||||
for _, val := range s.methods {
|
||||
requests += val.AllRequests()
|
||||
}
|
||||
return requests
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageGetNodes() time.Duration {
|
||||
return s.averageTime(methodGetNodes)
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageGetSubTree() time.Duration {
|
||||
return s.averageTime(methodGetSubTree)
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageAddNode() time.Duration {
|
||||
return s.averageTime(methodAddNode)
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageAddNodeByPath() time.Duration {
|
||||
return s.averageTime(methodAddNodeByPath)
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageMoveNode() time.Duration {
|
||||
return s.averageTime(methodMoveNode)
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageRemoveNode() time.Duration {
|
||||
return s.averageTime(methodRemoveNode)
|
||||
}
|
||||
|
||||
func (s *Statistic) averageTime(method MethodIndex) time.Duration {
|
||||
stat := s.methods[method]
|
||||
if stat.AllRequests() == 0 {
|
||||
return 0
|
||||
}
|
||||
return time.Duration(stat.AllTime() / stat.AllRequests())
|
||||
}
|
Loading…
Reference in a new issue