[#244] pool/tree: Collect request duration statistic

After each request for tree pool statistic accumulated values are reset to zero.

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2024-07-31 10:59:00 +03:00
parent 9da46f566f
commit e83d6b7c6a
4 changed files with 190 additions and 45 deletions

View file

@ -106,7 +106,7 @@ type clientStatus interface {
// overallErrorRate returns the number of all happened errors. // overallErrorRate returns the number of all happened errors.
overallErrorRate() uint64 overallErrorRate() uint64
// methodsStatus returns statistic for all used methods. // methodsStatus returns statistic for all used methods.
methodsStatus() []statusSnapshot methodsStatus() []StatusSnapshot
} }
// errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy. // errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy.
@ -122,7 +122,7 @@ type clientStatusMonitor struct {
mu sync.RWMutex // protect counters mu sync.RWMutex // protect counters
currentErrorCount uint32 currentErrorCount uint32
overallErrorCount uint64 overallErrorCount uint64
methods []*methodStatus methods []*MethodStatus
} }
// values for healthy status of clientStatusMonitor. // values for healthy status of clientStatusMonitor.
@ -141,19 +141,6 @@ const (
statusHealthy statusHealthy
) )
// methodStatus provide statistic for specific method.
type methodStatus struct {
name string
mu sync.RWMutex // protect counters
statusSnapshot
}
// statusSnapshot is statistic for specific method.
type statusSnapshot struct {
allTime uint64
allRequests uint64
}
// MethodIndex index of method in list of statuses in clientStatusMonitor. // MethodIndex index of method in list of statuses in clientStatusMonitor.
type MethodIndex int type MethodIndex int
@ -229,9 +216,9 @@ func (m MethodIndex) String() string {
} }
func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor { func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
methods := make([]*methodStatus, methodLast) methods := make([]*MethodStatus, methodLast)
for i := methodBalanceGet; i < methodLast; i++ { for i := methodBalanceGet; i < methodLast; i++ {
methods[i] = &methodStatus{name: i.String()} methods[i] = &MethodStatus{name: i.String()}
} }
healthy := new(atomic.Uint32) healthy := new(atomic.Uint32)
@ -246,19 +233,6 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
} }
} }
func (m *methodStatus) snapshot() statusSnapshot {
m.mu.RLock()
defer m.mu.RUnlock()
return m.statusSnapshot
}
func (m *methodStatus) incRequests(elapsed time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
m.allTime += uint64(elapsed)
m.allRequests++
}
// clientWrapper is used by default, alternative implementations are intended for testing purposes only. // clientWrapper is used by default, alternative implementations are intended for testing purposes only.
type clientWrapper struct { type clientWrapper struct {
clientMutex sync.RWMutex clientMutex sync.RWMutex
@ -1177,10 +1151,10 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 {
return c.overallErrorCount return c.overallErrorCount
} }
func (c *clientStatusMonitor) methodsStatus() []statusSnapshot { func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot {
result := make([]statusSnapshot, len(c.methods)) result := make([]StatusSnapshot, len(c.methods))
for i, val := range c.methods { for i, val := range c.methods {
result[i] = val.snapshot() result[i] = val.Snapshot()
} }
return result return result
@ -1188,7 +1162,7 @@ func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) { func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
methodStat := c.methods[method] methodStat := c.methods[method]
methodStat.incRequests(elapsed) methodStat.IncRequests(elapsed)
if c.prm.poolRequestInfoCallback != nil { if c.prm.poolRequestInfoCallback != nil {
c.prm.poolRequestInfoCallback(RequestInfo{ c.prm.poolRequestInfoCallback(RequestInfo{
Address: c.prm.address, Address: c.prm.address,

View file

@ -2,6 +2,7 @@ package pool
import ( import (
"errors" "errors"
"sync"
"time" "time"
) )
@ -46,7 +47,7 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) {
// NodeStatistic is metrics of certain connections. // NodeStatistic is metrics of certain connections.
type NodeStatistic struct { type NodeStatistic struct {
address string address string
methods []statusSnapshot methods []StatusSnapshot
overallErrors uint64 overallErrors uint64
currentErrors uint32 currentErrors uint32
} }
@ -158,3 +159,48 @@ func (n NodeStatistic) averageTime(method MethodIndex) time.Duration {
} }
return time.Duration(stat.allTime / stat.allRequests) return time.Duration(stat.allTime / stat.allRequests)
} }
// MethodStatus provide statistic for specific method.
type MethodStatus struct {
name string
mu sync.RWMutex // protect counters
snapshot StatusSnapshot
}
func NewMethodStatus(name string) *MethodStatus {
return &MethodStatus{name: name}
}
func (m *MethodStatus) Snapshot() StatusSnapshot {
m.mu.RLock()
defer m.mu.RUnlock()
return m.snapshot
}
func (m *MethodStatus) IncRequests(elapsed time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
m.snapshot.allTime += uint64(elapsed)
m.snapshot.allRequests++
}
func (m *MethodStatus) Reset() {
m.mu.Lock()
defer m.mu.Unlock()
m.snapshot.allTime = 0
m.snapshot.allRequests = 0
}
// StatusSnapshot is statistic for specific method.
type StatusSnapshot struct {
allTime uint64
allRequests uint64
}
func (s StatusSnapshot) AllRequests() uint64 {
return s.allRequests
}
func (s StatusSnapshot) AllTime() uint64 {
return s.allTime
}

View file

@ -88,6 +88,7 @@ type Pool struct {
rebalanceParams rebalanceParameters rebalanceParams rebalanceParameters
dialOptions []grpc.DialOption dialOptions []grpc.DialOption
logger *zap.Logger logger *zap.Logger
methods []*pool.MethodStatus
maxRequestAttempts int maxRequestAttempts int
@ -169,6 +170,39 @@ type RemoveNodeParams struct {
BearerToken []byte BearerToken []byte
} }
// MethodIndex index of method in list of statuses in Pool.
type MethodIndex int
const (
methodGetNodes MethodIndex = iota
methodGetSubTree
methodAddNode
methodAddNodeByPath
methodMoveNode
methodRemoveNode
methodLast
)
// String implements fmt.Stringer.
func (m MethodIndex) String() string {
switch m {
case methodGetNodes:
return "getNodes"
case methodAddNode:
return "addNode"
case methodGetSubTree:
return "getSubTree"
case methodAddNodeByPath:
return "addNodeByPath"
case methodMoveNode:
return "moveNode"
case methodRemoveNode:
return "removeNode"
default:
return "unknown"
}
}
// NewPool creates connection pool using parameters. // NewPool creates connection pool using parameters.
func NewPool(options InitParameters) (*Pool, error) { func NewPool(options InitParameters) (*Pool, error) {
if options.key == nil { if options.key == nil {
@ -182,6 +216,11 @@ func NewPool(options InitParameters) (*Pool, error) {
fillDefaultInitParams(&options) fillDefaultInitParams(&options)
methods := make([]*pool.MethodStatus, methodLast)
for i := methodGetNodes; i < methodLast; i++ {
methods[i] = pool.NewMethodStatus(i.String())
}
p := &Pool{ p := &Pool{
key: options.key, key: options.key,
logger: options.logger, logger: options.logger,
@ -192,6 +231,7 @@ func NewPool(options InitParameters) (*Pool, error) {
clientRebalanceInterval: options.clientRebalanceInterval, clientRebalanceInterval: options.clientRebalanceInterval,
}, },
maxRequestAttempts: options.maxRequestAttempts, maxRequestAttempts: options.maxRequestAttempts,
methods: methods,
} }
return p, nil return p, nil
@ -308,12 +348,13 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
}, },
} }
start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return nil, err return nil, err
} }
var resp *grpcService.GetNodeByPathResponse var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request) resp, inErr = client.GetNodeByPath(ctx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty. // Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync. // Empty result is expected due to delayed tree service sync.
@ -323,7 +364,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
return errNodeEmptyResult return errNodeEmptyResult
} }
return handleError("failed to get node by path", inErr) return handleError("failed to get node by path", inErr)
}); err != nil && !errors.Is(err, errNodeEmptyResult) { })
p.methods[methodGetNodes].IncRequests(time.Since(start))
if err != nil && !errors.Is(err, errNodeEmptyResult) {
return nil, err return nil, err
} }
@ -405,15 +448,18 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None
} }
start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return nil, err return nil, err
} }
var cli grpcService.TreeService_GetSubTreeClient var cli grpcService.TreeService_GetSubTreeClient
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request) cli, inErr = client.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr) return handleError("failed to get sub tree client", inErr)
}); err != nil { })
p.methods[methodGetSubTree].IncRequests(time.Since(start))
if err != nil {
return nil, err return nil, err
} }
@ -435,15 +481,19 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
BearerToken: prm.BearerToken, BearerToken: prm.BearerToken,
}, },
} }
start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return 0, err return 0, err
} }
var resp *grpcService.AddResponse var resp *grpcService.AddResponse
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request) resp, inErr = client.Add(ctx, request)
return handleError("failed to add node", inErr) return handleError("failed to add node", inErr)
}); err != nil { })
p.methods[methodAddNode].IncRequests(time.Since(start))
if err != nil {
return 0, err return 0, err
} }
@ -467,15 +517,18 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
}, },
} }
start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return 0, err return 0, err
} }
var resp *grpcService.AddByPathResponse var resp *grpcService.AddByPathResponse
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request) resp, inErr = client.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr) return handleError("failed to add node by path", inErr)
}); err != nil { })
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
if err != nil {
return 0, err return 0, err
} }
@ -507,16 +560,20 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
}, },
} }
start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return err return err
} }
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil { if _, err := client.Move(ctx, request); err != nil {
return handleError("failed to move node", err) return handleError("failed to move node", err)
} }
return nil return nil
}) })
p.methods[methodMoveNode].IncRequests(time.Since(start))
return err
} }
// RemoveNode invokes eponymous method from TreeServiceClient. // RemoveNode invokes eponymous method from TreeServiceClient.
@ -533,16 +590,21 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
BearerToken: prm.BearerToken, BearerToken: prm.BearerToken,
}, },
} }
start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return err return err
} }
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil { if _, err := client.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err) return handleError("failed to remove node", err)
} }
return nil return nil
}) })
p.methods[methodRemoveNode].IncRequests(time.Since(start))
return err
} }
// Close closes the Pool and releases all the associated resources. // Close closes the Pool and releases all the associated resources.
@ -563,6 +625,18 @@ func (p *Pool) Close() error {
return err return err
} }
// Statistic returns tree pool statistics.
func (p *Pool) Statistic() Statistic {
stat := Statistic{make([]pool.StatusSnapshot, len(p.methods))}
for i, method := range p.methods {
stat.methods[i] = method.Snapshot()
method.Reset()
}
return stat
}
func handleError(msg string, err error) error { func handleError(msg string, err error) error {
if err == nil { if err == nil {
return nil return nil

51
pool/tree/statistic.go Normal file
View file

@ -0,0 +1,51 @@
package tree
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
)
// Statistic is metrics of the tree pool.
type Statistic struct {
methods []pool.StatusSnapshot
}
func (s *Statistic) Requests() (requests uint64) {
for _, val := range s.methods {
requests += val.AllRequests()
}
return requests
}
func (s *Statistic) AverageGetNodes() time.Duration {
return s.averageTime(methodGetNodes)
}
func (s *Statistic) AverageGetSubTree() time.Duration {
return s.averageTime(methodGetSubTree)
}
func (s *Statistic) AverageAddNode() time.Duration {
return s.averageTime(methodAddNode)
}
func (s *Statistic) AverageAddNodeByPath() time.Duration {
return s.averageTime(methodAddNodeByPath)
}
func (s *Statistic) AverageMoveNode() time.Duration {
return s.averageTime(methodMoveNode)
}
func (s *Statistic) AverageRemoveNode() time.Duration {
return s.averageTime(methodRemoveNode)
}
func (s *Statistic) averageTime(method MethodIndex) time.Duration {
stat := s.methods[method]
if stat.AllRequests() == 0 {
return 0
}
return time.Duration(stat.AllTime() / stat.AllRequests())
}