[#244] pool/tree: Collect request duration statistic #246
42
pool/pool.go
|
@ -106,7 +106,7 @@ type clientStatus interface {
|
|||
// overallErrorRate returns the number of all happened errors.
|
||||
overallErrorRate() uint64
|
||||
// methodsStatus returns statistic for all used methods.
|
||||
methodsStatus() []statusSnapshot
|
||||
methodsStatus() []StatusSnapshot
|
||||
}
|
||||
|
||||
// errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy.
|
||||
|
@ -122,7 +122,7 @@ type clientStatusMonitor struct {
|
|||
mu sync.RWMutex // protect counters
|
||||
currentErrorCount uint32
|
||||
overallErrorCount uint64
|
||||
methods []*methodStatus
|
||||
methods []*MethodStatus
|
||||
}
|
||||
|
||||
// values for healthy status of clientStatusMonitor.
|
||||
|
@ -141,19 +141,6 @@ const (
|
|||
statusHealthy
|
||||
)
|
||||
|
||||
// methodStatus provide statistic for specific method.
|
||||
type methodStatus struct {
|
||||
name string
|
||||
mu sync.RWMutex // protect counters
|
||||
statusSnapshot
|
||||
}
|
||||
|
||||
// statusSnapshot is statistic for specific method.
|
||||
type statusSnapshot struct {
|
||||
allTime uint64
|
||||
allRequests uint64
|
||||
}
|
||||
|
||||
// MethodIndex index of method in list of statuses in clientStatusMonitor.
|
||||
type MethodIndex int
|
||||
|
||||
|
@ -229,9 +216,9 @@ func (m MethodIndex) String() string {
|
|||
}
|
||||
|
||||
func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
|
||||
methods := make([]*methodStatus, methodLast)
|
||||
methods := make([]*MethodStatus, methodLast)
|
||||
for i := methodBalanceGet; i < methodLast; i++ {
|
||||
methods[i] = &methodStatus{name: i.String()}
|
||||
methods[i] = &MethodStatus{name: i.String()}
|
||||
}
|
||||
|
||||
healthy := new(atomic.Uint32)
|
||||
|
@ -246,19 +233,6 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
|
|||
}
|
||||
}
|
||||
|
||||
func (m *methodStatus) snapshot() statusSnapshot {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.statusSnapshot
|
||||
}
|
||||
|
||||
func (m *methodStatus) incRequests(elapsed time.Duration) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.allTime += uint64(elapsed)
|
||||
m.allRequests++
|
||||
}
|
||||
|
||||
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
|
||||
type clientWrapper struct {
|
||||
clientMutex sync.RWMutex
|
||||
|
@ -1177,10 +1151,10 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 {
|
|||
return c.overallErrorCount
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
|
||||
result := make([]statusSnapshot, len(c.methods))
|
||||
func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot {
|
||||
result := make([]StatusSnapshot, len(c.methods))
|
||||
for i, val := range c.methods {
|
||||
result[i] = val.snapshot()
|
||||
result[i] = val.Snapshot()
|
||||
}
|
||||
|
||||
return result
|
||||
|
@ -1188,7 +1162,7 @@ func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
|
|||
|
||||
func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
|
||||
methodStat := c.methods[method]
|
||||
methodStat.incRequests(elapsed)
|
||||
methodStat.IncRequests(elapsed)
|
||||
if c.prm.poolRequestInfoCallback != nil {
|
||||
c.prm.poolRequestInfoCallback(RequestInfo{
|
||||
Address: c.prm.address,
|
||||
|
|
|
@ -2,6 +2,7 @@ package pool
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -46,7 +47,7 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) {
|
|||
// NodeStatistic is metrics of certain connections.
|
||||
type NodeStatistic struct {
|
||||
address string
|
||||
methods []statusSnapshot
|
||||
methods []StatusSnapshot
|
||||
overallErrors uint64
|
||||
currentErrors uint32
|
||||
}
|
||||
|
@ -158,3 +159,48 @@ func (n NodeStatistic) averageTime(method MethodIndex) time.Duration {
|
|||
}
|
||||
return time.Duration(stat.allTime / stat.allRequests)
|
||||
}
|
||||
|
||||
// MethodStatus provide statistic for specific method.
|
||||
type MethodStatus struct {
|
||||
name string
|
||||
mu sync.RWMutex // protect counters
|
||||
snapshot StatusSnapshot
|
||||
}
|
||||
|
||||
func NewMethodStatus(name string) *MethodStatus {
|
||||
return &MethodStatus{name: name}
|
||||
}
|
||||
|
||||
func (m *MethodStatus) Snapshot() StatusSnapshot {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.snapshot
|
||||
}
|
||||
|
||||
func (m *MethodStatus) IncRequests(elapsed time.Duration) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.snapshot.allTime += uint64(elapsed)
|
||||
m.snapshot.allRequests++
|
||||
}
|
||||
|
||||
func (m *MethodStatus) Reset() {
|
||||
dkirillov marked this conversation as resolved
Outdated
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.snapshot.allTime = 0
|
||||
m.snapshot.allRequests = 0
|
||||
}
|
||||
|
||||
// StatusSnapshot is statistic for specific method.
|
||||
type StatusSnapshot struct {
|
||||
allTime uint64
|
||||
allRequests uint64
|
||||
}
|
||||
|
||||
func (s StatusSnapshot) AllRequests() uint64 {
|
||||
return s.allRequests
|
||||
}
|
||||
|
||||
func (s StatusSnapshot) AllTime() uint64 {
|
||||
return s.allTime
|
||||
}
|
||||
|
|
|
@ -88,6 +88,7 @@ type Pool struct {
|
|||
rebalanceParams rebalanceParameters
|
||||
dialOptions []grpc.DialOption
|
||||
logger *zap.Logger
|
||||
methods []*pool.MethodStatus
|
||||
|
||||
maxRequestAttempts int
|
||||
|
||||
|
@ -169,6 +170,39 @@ type RemoveNodeParams struct {
|
|||
BearerToken []byte
|
||||
}
|
||||
|
||||
// MethodIndex index of method in list of statuses in Pool.
|
||||
type MethodIndex int
|
||||
|
||||
const (
|
||||
methodGetNodes MethodIndex = iota
|
||||
methodGetSubTree
|
||||
methodAddNode
|
||||
methodAddNodeByPath
|
||||
methodMoveNode
|
||||
methodRemoveNode
|
||||
methodLast
|
||||
)
|
||||
|
||||
// String implements fmt.Stringer.
|
||||
func (m MethodIndex) String() string {
|
||||
switch m {
|
||||
case methodGetNodes:
|
||||
return "getNodes"
|
||||
case methodAddNode:
|
||||
return "addNode"
|
||||
case methodGetSubTree:
|
||||
return "getSubTree"
|
||||
case methodAddNodeByPath:
|
||||
return "addNodeByPath"
|
||||
case methodMoveNode:
|
||||
return "moveNode"
|
||||
case methodRemoveNode:
|
||||
return "removeNode"
|
||||
default:
|
||||
return "unknown"
|
||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
It will be a surprise if this accidentally is showed in a metric dashboard a a label :) we can just ignore this case and it'll be It will be a surprise if this accidentally is showed in a metric dashboard a a label :) we can just ignore this case and it'll be `unknown`
|
||||
}
|
||||
}
|
||||
|
||||
// NewPool creates connection pool using parameters.
|
||||
func NewPool(options InitParameters) (*Pool, error) {
|
||||
if options.key == nil {
|
||||
|
@ -182,6 +216,11 @@ func NewPool(options InitParameters) (*Pool, error) {
|
|||
|
||||
fillDefaultInitParams(&options)
|
||||
|
||||
methods := make([]*pool.MethodStatus, methodLast)
|
||||
for i := methodGetNodes; i < methodLast; i++ {
|
||||
methods[i] = pool.NewMethodStatus(i.String())
|
||||
}
|
||||
|
||||
p := &Pool{
|
||||
key: options.key,
|
||||
logger: options.logger,
|
||||
|
@ -192,6 +231,7 @@ func NewPool(options InitParameters) (*Pool, error) {
|
|||
clientRebalanceInterval: options.clientRebalanceInterval,
|
||||
},
|
||||
maxRequestAttempts: options.maxRequestAttempts,
|
||||
methods: methods,
|
||||
}
|
||||
|
||||
return p, nil
|
||||
|
@ -308,12 +348,13 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
|||
},
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var resp *grpcService.GetNodeByPathResponse
|
||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
resp, inErr = client.GetNodeByPath(ctx, request)
|
||||
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
I'm not sure if we want to measure request with retry I'm not sure if we want to measure request with retry
alexvanin
commented
Those are different metrics, basically. In this PR we are interested in a combined time spent on request processing, because retries are expected and we would like to measure whole time pool takes to process request. As for performance issue investigation, combined metric may seen a bit more useful, but we can add 'per-request' metric as well if it will be needed too. Those are different metrics, basically. In this PR we are interested in a combined time spent on request processing, because retries are expected and we would like to measure whole time pool takes to process request.
As for performance issue investigation, combined metric may seen a bit more useful, but we can add 'per-request' metric as well if it will be needed too.
|
||||
// Empty result is expected due to delayed tree service sync.
|
||||
|
@ -323,7 +364,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
|||
return errNodeEmptyResult
|
||||
}
|
||||
return handleError("failed to get node by path", inErr)
|
||||
}); err != nil && !errors.Is(err, errNodeEmptyResult) {
|
||||
})
|
||||
p.methods[methodGetNodes].IncRequests(time.Since(start))
|
||||
if err != nil && !errors.Is(err, errNodeEmptyResult) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -405,15 +448,18 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
|
|||
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var cli grpcService.TreeService_GetSubTreeClient
|
||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
cli, inErr = client.GetSubTree(ctx, request)
|
||||
return handleError("failed to get sub tree client", inErr)
|
||||
}); err != nil {
|
||||
})
|
||||
p.methods[methodGetSubTree].IncRequests(time.Since(start))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -435,15 +481,19 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
|||
BearerToken: prm.BearerToken,
|
||||
},
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var resp *grpcService.AddResponse
|
||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
dstepanov-yadro marked this conversation as resolved
Outdated
dstepanov-yadro
commented
What kind of duration do you measure? What kind of duration do you measure?
grpc request duration can be measured with grpc middleware.
pool method duration must include the whole method body (with `signRequest` and request creation).
alexvanin
commented
We would like to have a balance between measuring transmission time (which is grpc request duration) but include all retries in it, therefore But I agree, seems like we can just calculate whole execution duration, so adding We would like to have a balance between measuring transmission time (which is grpc request duration) but include all retries in it, therefore `signRequest` is not included and grpc middleware isn't used as well.
But I agree, seems like we can just calculate whole execution duration, so adding `signRequest` to a measure seems okay for me.
|
||||
resp, inErr = client.Add(ctx, request)
|
||||
return handleError("failed to add node", inErr)
|
||||
}); err != nil {
|
||||
})
|
||||
p.methods[methodAddNode].IncRequests(time.Since(start))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
|
@ -467,15 +517,18 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
|||
},
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var resp *grpcService.AddByPathResponse
|
||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
resp, inErr = client.AddByPath(ctx, request)
|
||||
return handleError("failed to add node by path", inErr)
|
||||
}); err != nil {
|
||||
})
|
||||
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
|
@ -507,16 +560,20 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
|||
},
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
if _, err := client.Move(ctx, request); err != nil {
|
||||
return handleError("failed to move node", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
p.methods[methodMoveNode].IncRequests(time.Since(start))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// RemoveNode invokes eponymous method from TreeServiceClient.
|
||||
|
@ -533,16 +590,21 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
|||
BearerToken: prm.BearerToken,
|
||||
},
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
if _, err := client.Remove(ctx, request); err != nil {
|
||||
return handleError("failed to remove node", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
p.methods[methodRemoveNode].IncRequests(time.Since(start))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Close closes the Pool and releases all the associated resources.
|
||||
|
@ -563,6 +625,18 @@ func (p *Pool) Close() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Statistic returns tree pool statistics.
|
||||
func (p *Pool) Statistic() Statistic {
|
||||
stat := Statistic{make([]pool.StatusSnapshot, len(p.methods))}
|
||||
|
||||
for i, method := range p.methods {
|
||||
stat.methods[i] = method.Snapshot()
|
||||
method.Reset()
|
||||
}
|
||||
|
||||
return stat
|
||||
}
|
||||
|
||||
func handleError(msg string, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
|
|
51
pool/tree/statistic.go
Normal file
|
@ -0,0 +1,51 @@
|
|||
package tree
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
)
|
||||
|
||||
// Statistic is metrics of the tree pool.
|
||||
type Statistic struct {
|
||||
methods []pool.StatusSnapshot
|
||||
}
|
||||
|
||||
func (s *Statistic) Requests() (requests uint64) {
|
||||
for _, val := range s.methods {
|
||||
requests += val.AllRequests()
|
||||
}
|
||||
return requests
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageGetNodes() time.Duration {
|
||||
return s.averageTime(methodGetNodes)
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageGetSubTree() time.Duration {
|
||||
return s.averageTime(methodGetSubTree)
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageAddNode() time.Duration {
|
||||
return s.averageTime(methodAddNode)
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageAddNodeByPath() time.Duration {
|
||||
return s.averageTime(methodAddNodeByPath)
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageMoveNode() time.Duration {
|
||||
return s.averageTime(methodMoveNode)
|
||||
}
|
||||
|
||||
func (s *Statistic) AverageRemoveNode() time.Duration {
|
||||
return s.averageTime(methodRemoveNode)
|
||||
}
|
||||
|
||||
func (s *Statistic) averageTime(method MethodIndex) time.Duration {
|
||||
stat := s.methods[method]
|
||||
if stat.AllRequests() == 0 {
|
||||
return 0
|
||||
}
|
||||
return time.Duration(stat.AllTime() / stat.AllRequests())
|
||||
}
|
Why do we need this? Also if we really need this, why do use this only in tree pool?
We need this to return statistic collected between requests, not over all time. For pool we want to do the same, but not in this PR
As we discussed with @a.bogatyrev, cumulative metric is not quite representative, especially in a long run. If delay spike happens, cumulative metric changes very slowly or may not change at all. We want to see reactive change in the metric, therefore we are going to change it for both tree and object metric.