[#244] pool/tree: Collect request duration statistic #246

Merged
dkirillov merged 1 commit from mbiryukova/frostfs-sdk-go:feature/tree_pool_stat into master 2024-08-05 06:22:53 +00:00
4 changed files with 190 additions and 45 deletions

View file

@ -106,7 +106,7 @@ type clientStatus interface {
// overallErrorRate returns the number of all happened errors.
overallErrorRate() uint64
// methodsStatus returns statistic for all used methods.
methodsStatus() []statusSnapshot
methodsStatus() []StatusSnapshot
}
// errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy.
@ -122,7 +122,7 @@ type clientStatusMonitor struct {
mu sync.RWMutex // protect counters
currentErrorCount uint32
overallErrorCount uint64
methods []*methodStatus
methods []*MethodStatus
}
// values for healthy status of clientStatusMonitor.
@ -141,19 +141,6 @@ const (
statusHealthy
)
// methodStatus provide statistic for specific method.
type methodStatus struct {
name string
mu sync.RWMutex // protect counters
statusSnapshot
}
// statusSnapshot is statistic for specific method.
type statusSnapshot struct {
allTime uint64
allRequests uint64
}
// MethodIndex index of method in list of statuses in clientStatusMonitor.
type MethodIndex int
@ -229,9 +216,9 @@ func (m MethodIndex) String() string {
}
func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
methods := make([]*methodStatus, methodLast)
methods := make([]*MethodStatus, methodLast)
for i := methodBalanceGet; i < methodLast; i++ {
methods[i] = &methodStatus{name: i.String()}
methods[i] = &MethodStatus{name: i.String()}
}
healthy := new(atomic.Uint32)
@ -246,19 +233,6 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
}
}
func (m *methodStatus) snapshot() statusSnapshot {
m.mu.RLock()
defer m.mu.RUnlock()
return m.statusSnapshot
}
func (m *methodStatus) incRequests(elapsed time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
m.allTime += uint64(elapsed)
m.allRequests++
}
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
type clientWrapper struct {
clientMutex sync.RWMutex
@ -1177,10 +1151,10 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 {
return c.overallErrorCount
}
func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
result := make([]statusSnapshot, len(c.methods))
func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot {
result := make([]StatusSnapshot, len(c.methods))
for i, val := range c.methods {
result[i] = val.snapshot()
result[i] = val.Snapshot()
}
return result
@ -1188,7 +1162,7 @@ func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
methodStat := c.methods[method]
methodStat.incRequests(elapsed)
methodStat.IncRequests(elapsed)
if c.prm.poolRequestInfoCallback != nil {
c.prm.poolRequestInfoCallback(RequestInfo{
Address: c.prm.address,

View file

@ -2,6 +2,7 @@ package pool
import (
"errors"
"sync"
"time"
)
@ -46,7 +47,7 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) {
// NodeStatistic is metrics of certain connections.
type NodeStatistic struct {
address string
methods []statusSnapshot
methods []StatusSnapshot
overallErrors uint64
currentErrors uint32
}
@ -158,3 +159,48 @@ func (n NodeStatistic) averageTime(method MethodIndex) time.Duration {
}
return time.Duration(stat.allTime / stat.allRequests)
}
// MethodStatus provide statistic for specific method.
type MethodStatus struct {
name string
mu sync.RWMutex // protect counters
snapshot StatusSnapshot
}
func NewMethodStatus(name string) *MethodStatus {
return &MethodStatus{name: name}
}
func (m *MethodStatus) Snapshot() StatusSnapshot {
m.mu.RLock()
defer m.mu.RUnlock()
return m.snapshot
}
func (m *MethodStatus) IncRequests(elapsed time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
m.snapshot.allTime += uint64(elapsed)
m.snapshot.allRequests++
}
func (m *MethodStatus) Reset() {
m.mu.Lock()
defer m.mu.Unlock()
m.snapshot.allTime = 0
m.snapshot.allRequests = 0
}
// StatusSnapshot is statistic for specific method.
type StatusSnapshot struct {
allTime uint64
allRequests uint64
}
func (s StatusSnapshot) AllRequests() uint64 {
return s.allRequests
}
func (s StatusSnapshot) AllTime() uint64 {
return s.allTime
}

View file

@ -88,6 +88,7 @@ type Pool struct {
rebalanceParams rebalanceParameters
dialOptions []grpc.DialOption
logger *zap.Logger
methods []*pool.MethodStatus
maxRequestAttempts int
@ -169,6 +170,39 @@ type RemoveNodeParams struct {
BearerToken []byte
}
// MethodIndex index of method in list of statuses in Pool.
type MethodIndex int
const (
methodGetNodes MethodIndex = iota
methodGetSubTree
methodAddNode
methodAddNodeByPath
methodMoveNode
methodRemoveNode
methodLast
)
// String implements fmt.Stringer.
func (m MethodIndex) String() string {
switch m {
case methodGetNodes:
return "getNodes"
case methodAddNode:
return "addNode"
case methodGetSubTree:
return "getSubTree"
case methodAddNodeByPath:
return "addNodeByPath"
case methodMoveNode:
return "moveNode"
case methodRemoveNode:
return "removeNode"
default:
return "unknown"
}
}
// NewPool creates connection pool using parameters.
func NewPool(options InitParameters) (*Pool, error) {
if options.key == nil {
@ -182,6 +216,11 @@ func NewPool(options InitParameters) (*Pool, error) {
fillDefaultInitParams(&options)
methods := make([]*pool.MethodStatus, methodLast)
for i := methodGetNodes; i < methodLast; i++ {
methods[i] = pool.NewMethodStatus(i.String())
}
p := &Pool{
key: options.key,
logger: options.logger,
@ -192,6 +231,7 @@ func NewPool(options InitParameters) (*Pool, error) {
clientRebalanceInterval: options.clientRebalanceInterval,
},
maxRequestAttempts: options.maxRequestAttempts,
methods: methods,
}
return p, nil
@ -308,12 +348,13 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
},
}
start := time.Now()
if err := p.signRequest(request); err != nil {
return nil, err
}
var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync.
@ -323,7 +364,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
return errNodeEmptyResult
}
return handleError("failed to get node by path", inErr)
}); err != nil && !errors.Is(err, errNodeEmptyResult) {
})
p.methods[methodGetNodes].IncRequests(time.Since(start))
if err != nil && !errors.Is(err, errNodeEmptyResult) {
return nil, err
}
@ -405,15 +448,18 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None
}
start := time.Now()
if err := p.signRequest(request); err != nil {
return nil, err
}
var cli grpcService.TreeService_GetSubTreeClient
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr)
}); err != nil {
})
p.methods[methodGetSubTree].IncRequests(time.Since(start))
if err != nil {
return nil, err
}
@ -435,15 +481,19 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
BearerToken: prm.BearerToken,
},
}
start := time.Now()
if err := p.signRequest(request); err != nil {
return 0, err
}
var resp *grpcService.AddResponse
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request)
return handleError("failed to add node", inErr)
}); err != nil {
})
p.methods[methodAddNode].IncRequests(time.Since(start))
if err != nil {
return 0, err
}
@ -467,15 +517,18 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
},
}
start := time.Now()
if err := p.signRequest(request); err != nil {
return 0, err
}
var resp *grpcService.AddByPathResponse
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr)
}); err != nil {
})
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
if err != nil {
return 0, err
}
@ -507,16 +560,20 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
},
}
start := time.Now()
if err := p.signRequest(request); err != nil {
return err
}
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil {
return handleError("failed to move node", err)
}
return nil
})
p.methods[methodMoveNode].IncRequests(time.Since(start))
return err
}
// RemoveNode invokes eponymous method from TreeServiceClient.
@ -533,16 +590,21 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
BearerToken: prm.BearerToken,
},
}
start := time.Now()
if err := p.signRequest(request); err != nil {
return err
}
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err)
}
return nil
})
p.methods[methodRemoveNode].IncRequests(time.Since(start))
return err
}
// Close closes the Pool and releases all the associated resources.
@ -563,6 +625,18 @@ func (p *Pool) Close() error {
return err
}
// Statistic returns tree pool statistics.
func (p *Pool) Statistic() Statistic {
stat := Statistic{make([]pool.StatusSnapshot, len(p.methods))}
for i, method := range p.methods {
stat.methods[i] = method.Snapshot()
method.Reset()
}
return stat
}
func handleError(msg string, err error) error {
if err == nil {
return nil

51
pool/tree/statistic.go Normal file
View file

@ -0,0 +1,51 @@
package tree
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
)
// Statistic is metrics of the tree pool.
type Statistic struct {
methods []pool.StatusSnapshot
}
func (s *Statistic) Requests() (requests uint64) {
for _, val := range s.methods {
requests += val.AllRequests()
}
return requests
}
func (s *Statistic) AverageGetNodes() time.Duration {
return s.averageTime(methodGetNodes)
}
func (s *Statistic) AverageGetSubTree() time.Duration {
return s.averageTime(methodGetSubTree)
}
func (s *Statistic) AverageAddNode() time.Duration {
return s.averageTime(methodAddNode)
}
func (s *Statistic) AverageAddNodeByPath() time.Duration {
return s.averageTime(methodAddNodeByPath)
}
func (s *Statistic) AverageMoveNode() time.Duration {
return s.averageTime(methodMoveNode)
}
func (s *Statistic) AverageRemoveNode() time.Duration {
return s.averageTime(methodRemoveNode)
}
func (s *Statistic) averageTime(method MethodIndex) time.Duration {
stat := s.methods[method]
if stat.AllRequests() == 0 {
return 0
}
return time.Duration(stat.AllTime() / stat.AllRequests())
}