[#244] pool/tree: Collect request duration statistic #246

Merged
dkirillov merged 1 commit from mbiryukova/frostfs-sdk-go:feature/tree_pool_stat into master 2024-08-05 06:22:53 +00:00
4 changed files with 190 additions and 45 deletions


@@ -106,7 +106,7 @@ type clientStatus interface {
     // overallErrorRate returns the number of all happened errors.
     overallErrorRate() uint64
     // methodsStatus returns statistic for all used methods.
-    methodsStatus() []statusSnapshot
+    methodsStatus() []StatusSnapshot
 }

 // errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy.
@@ -122,7 +122,7 @@ type clientStatusMonitor struct {
     mu                sync.RWMutex // protect counters
     currentErrorCount uint32
     overallErrorCount uint64
-    methods           []*methodStatus
+    methods           []*MethodStatus
 }

 // values for healthy status of clientStatusMonitor.
@@ -141,19 +141,6 @@ const (
     statusHealthy
 )

-// methodStatus provide statistic for specific method.
-type methodStatus struct {
-    name string
-    mu   sync.RWMutex // protect counters
-    statusSnapshot
-}
-
-// statusSnapshot is statistic for specific method.
-type statusSnapshot struct {
-    allTime     uint64
-    allRequests uint64
-}
-
 // MethodIndex index of method in list of statuses in clientStatusMonitor.
 type MethodIndex int
@@ -229,9 +216,9 @@ func (m MethodIndex) String() string {
 }

 func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
-    methods := make([]*methodStatus, methodLast)
+    methods := make([]*MethodStatus, methodLast)
     for i := methodBalanceGet; i < methodLast; i++ {
-        methods[i] = &methodStatus{name: i.String()}
+        methods[i] = &MethodStatus{name: i.String()}
     }

     healthy := new(atomic.Uint32)
@@ -246,19 +233,6 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
     }
 }

-func (m *methodStatus) snapshot() statusSnapshot {
-    m.mu.RLock()
-    defer m.mu.RUnlock()
-    return m.statusSnapshot
-}
-
-func (m *methodStatus) incRequests(elapsed time.Duration) {
-    m.mu.Lock()
-    defer m.mu.Unlock()
-    m.allTime += uint64(elapsed)
-    m.allRequests++
-}
-
 // clientWrapper is used by default, alternative implementations are intended for testing purposes only.
 type clientWrapper struct {
     clientMutex sync.RWMutex
@@ -1177,10 +1151,10 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 {
     return c.overallErrorCount
 }

-func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
-    result := make([]statusSnapshot, len(c.methods))
+func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot {
+    result := make([]StatusSnapshot, len(c.methods))
     for i, val := range c.methods {
-        result[i] = val.snapshot()
+        result[i] = val.Snapshot()
     }

     return result
@@ -1188,7 +1162,7 @@ func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
 func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
     methodStat := c.methods[method]

-    methodStat.incRequests(elapsed)
+    methodStat.IncRequests(elapsed)
     if c.prm.poolRequestInfoCallback != nil {
         c.prm.poolRequestInfoCallback(RequestInfo{
             Address: c.prm.address,


@@ -2,6 +2,7 @@ package pool
 import (
     "errors"
+    "sync"
     "time"
 )

@@ -46,7 +47,7 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) {
 // NodeStatistic is metrics of certain connections.
 type NodeStatistic struct {
     address       string
-    methods       []statusSnapshot
+    methods       []StatusSnapshot
     overallErrors uint64
     currentErrors uint32
 }
@@ -158,3 +159,48 @@ func (n NodeStatistic) averageTime(method MethodIndex) time.Duration {
     }
     return time.Duration(stat.allTime / stat.allRequests)
 }
+
+// MethodStatus provide statistic for specific method.
+type MethodStatus struct {
+    name     string
+    mu       sync.RWMutex // protect counters
+    snapshot StatusSnapshot
+}
+
+func NewMethodStatus(name string) *MethodStatus {
+    return &MethodStatus{name: name}
+}
+
+func (m *MethodStatus) Snapshot() StatusSnapshot {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+    return m.snapshot
+}
+
+func (m *MethodStatus) IncRequests(elapsed time.Duration) {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+    m.snapshot.allTime += uint64(elapsed)
+    m.snapshot.allRequests++
+}
+func (m *MethodStatus) Reset() {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+    m.snapshot.allTime = 0
+    m.snapshot.allRequests = 0
+}

dkirillov marked this conversation as resolved (outdated)

    Why do we need this? Also, if we really need it, why do we use it only in the tree pool?

    We need this to return statistics collected between requests, not over all time. For the pool we want to do the same, but not in this PR.

    As we discussed with @a.bogatyrev, a cumulative metric is not very representative, especially in the long run. If a delay spike happens, the cumulative metric changes very slowly or may not change at all. We want to see a reactive change in the metric, therefore we are going to change it for both the tree and object metrics.
+
+// StatusSnapshot is statistic for specific method.
+type StatusSnapshot struct {
+    allTime     uint64
+    allRequests uint64
+}
+
+func (s StatusSnapshot) AllRequests() uint64 {
+    return s.allRequests
+}
+
+func (s StatusSnapshot) AllTime() uint64 {
+    return s.allTime
+}


@@ -88,6 +88,7 @@ type Pool struct {
     rebalanceParams rebalanceParameters
     dialOptions     []grpc.DialOption
     logger          *zap.Logger
+    methods         []*pool.MethodStatus

     maxRequestAttempts int
@@ -169,6 +170,39 @@ type RemoveNodeParams struct {
     BearerToken []byte
 }

+// MethodIndex index of method in list of statuses in Pool.
+type MethodIndex int
+
+const (
+    methodGetNodes MethodIndex = iota
+    methodGetSubTree
+    methodAddNode
+    methodAddNodeByPath
+    methodMoveNode
+    methodRemoveNode
+    methodLast
+)
+
+// String implements fmt.Stringer.
+func (m MethodIndex) String() string {
+    switch m {
+    case methodGetNodes:
+        return "getNodes"
+    case methodAddNode:
+        return "addNode"
+    case methodGetSubTree:
+        return "getSubTree"
+    case methodAddNodeByPath:
+        return "addNodeByPath"
+    case methodMoveNode:
+        return "moveNode"
+    case methodRemoveNode:
+        return "removeNode"
+    default:
+        return "unknown"
aarifullin marked this conversation as resolved (outdated)

    It would be a surprise if this accidentally showed up on a metric dashboard as a label :) We can just ignore this case and it'll be `unknown`.

+    }
+}
+
 // NewPool creates connection pool using parameters.
 func NewPool(options InitParameters) (*Pool, error) {
     if options.key == nil {
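Tying this to the label remark in the thread above: the strings returned by MethodIndex.String are exactly what would appear as label values if the per-method averages were exported to a dashboard. A hedged sketch of such an exporter (the metric name, the publish helper, and the wiring to Pool.Statistic — which is added later in this diff — are illustrative, not part of this PR):

    package monitoring

    import (
        "github.com/prometheus/client_golang/prometheus"

        "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
    )

    // treeMethodDuration holds one gauge per tree pool method, labelled with the
    // same names that MethodIndex.String returns ("getNodes", "getSubTree", ...).
    var treeMethodDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Name: "tree_pool_avg_request_duration_seconds",
        Help: "Average tree pool request duration per method over the last scrape interval.",
    }, []string{"method"})

    func init() {
        prometheus.MustRegister(treeMethodDuration)
    }

    // publish copies one Statistic snapshot (as returned by Pool.Statistic) into the gauges.
    func publish(stat tree.Statistic) {
        treeMethodDuration.WithLabelValues("getNodes").Set(stat.AverageGetNodes().Seconds())
        treeMethodDuration.WithLabelValues("getSubTree").Set(stat.AverageGetSubTree().Seconds())
        treeMethodDuration.WithLabelValues("addNode").Set(stat.AverageAddNode().Seconds())
        treeMethodDuration.WithLabelValues("addNodeByPath").Set(stat.AverageAddNodeByPath().Seconds())
        treeMethodDuration.WithLabelValues("moveNode").Set(stat.AverageMoveNode().Seconds())
        treeMethodDuration.WithLabelValues("removeNode").Set(stat.AverageRemoveNode().Seconds())
    }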
@@ -182,6 +216,11 @@ func NewPool(options InitParameters) (*Pool, error) {
     fillDefaultInitParams(&options)

+    methods := make([]*pool.MethodStatus, methodLast)
+    for i := methodGetNodes; i < methodLast; i++ {
+        methods[i] = pool.NewMethodStatus(i.String())
+    }
+
     p := &Pool{
         key:    options.key,
         logger: options.logger,
@@ -192,6 +231,7 @@ func NewPool(options InitParameters) (*Pool, error) {
             clientRebalanceInterval: options.clientRebalanceInterval,
         },
         maxRequestAttempts: options.maxRequestAttempts,
+        methods:            methods,
     }

     return p, nil
@@ -308,12 +348,13 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
         },
     }

+    start := time.Now()
     if err := p.signRequest(request); err != nil {
         return nil, err
     }

     var resp *grpcService.GetNodeByPathResponse
-    if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
+    err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
         resp, inErr = client.GetNodeByPath(ctx, request)
         // Pool wants to do retry 'GetNodeByPath' request if result is empty.
dkirillov marked this conversation as resolved (outdated)

    I'm not sure we want to measure the request together with retries.

    Those are different metrics, basically. In this PR we are interested in the combined time spent on request processing, because retries are expected and we would like to measure the whole time the pool takes to process a request. As for performance issue investigation, the combined metric may seem a bit more useful, but we can add a 'per-request' metric as well if it is needed.
         // Empty result is expected due to delayed tree service sync.
@@ -323,7 +364,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
             return errNodeEmptyResult
         }
         return handleError("failed to get node by path", inErr)
-    }); err != nil && !errors.Is(err, errNodeEmptyResult) {
+    })
+    p.methods[methodGetNodes].IncRequests(time.Since(start))
+    if err != nil && !errors.Is(err, errNodeEmptyResult) {
         return nil, err
     }
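To illustrate the distinction discussed in the thread above, here is a toy comparison of a combined timer (what this PR records) against per-attempt timers (what per-request middleware would see). The retry helper and the sleep durations are made up for illustration and have nothing to do with the pool's actual requestWithRetry:

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // retry is a stand-in for a retry loop: it re-runs fn until it succeeds
    // or the attempts are exhausted.
    func retry(attempts int, fn func(attempt int) error) error {
        var err error
        for i := 0; i < attempts; i++ {
            if err = fn(i); err == nil {
                return nil
            }
        }
        return err
    }

    func main() {
        var perAttempt []time.Duration

        start := time.Now() // combined timer, as in this PR
        _ = retry(3, func(attempt int) error {
            attemptStart := time.Now() // per-attempt timer
            defer func() { perAttempt = append(perAttempt, time.Since(attemptStart)) }()

            time.Sleep(10 * time.Millisecond) // pretend to call the tree service
            if attempt < 2 {
                return errors.New("transient error")
            }
            return nil
        })
        combined := time.Since(start)

        // The combined duration covers all three attempts (~30ms),
        // while each per-attempt sample is ~10ms.
        fmt.Println(combined, perAttempt)
    }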
@@ -405,15 +448,18 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
         request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None
     }

+    start := time.Now()
     if err := p.signRequest(request); err != nil {
         return nil, err
     }

     var cli grpcService.TreeService_GetSubTreeClient
-    if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
+    err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
         cli, inErr = client.GetSubTree(ctx, request)
         return handleError("failed to get sub tree client", inErr)
-    }); err != nil {
+    })
+    p.methods[methodGetSubTree].IncRequests(time.Since(start))
+    if err != nil {
         return nil, err
     }
@@ -435,15 +481,19 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
             BearerToken: prm.BearerToken,
         },
     }

+    start := time.Now()
     if err := p.signRequest(request); err != nil {
         return 0, err
     }

     var resp *grpcService.AddResponse
-    if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
+    err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
dstepanov-yadro marked this conversation as resolved (outdated)

    What kind of duration do you measure? A gRPC request duration can be measured with gRPC middleware; a pool method duration must include the whole method body (with `signRequest` and request creation).

    We would like to strike a balance: measure transmission time (which is the gRPC request duration) but include all retries in it, therefore `signRequest` is not included and gRPC middleware isn't used either. But I agree, it seems we can just measure the whole execution duration, so including `signRequest` in the measurement is okay with me.
         resp, inErr = client.Add(ctx, request)
         return handleError("failed to add node", inErr)
-    }); err != nil {
+    })
+    p.methods[methodAddNode].IncRequests(time.Since(start))
+
+    if err != nil {
         return 0, err
     }
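For completeness, the middleware alternative mentioned in the thread above could look roughly like this: a unary client interceptor that observes each individual attempt. This is only a sketch, not part of the PR; note that GetSubTree is a streaming call and would need a stream interceptor instead:

    package main

    import (
        "context"
        "log"
        "time"

        "google.golang.org/grpc"
    )

    // timingInterceptor reports the duration of every individual gRPC call,
    // i.e. per attempt, without retries folded in.
    func timingInterceptor(observe func(method string, d time.Duration)) grpc.UnaryClientInterceptor {
        return func(ctx context.Context, method string, req, reply interface{},
            cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
            start := time.Now()
            err := invoker(ctx, method, req, reply, cc, opts...)
            observe(method, time.Since(start))
            return err
        }
    }

    func main() {
        // The interceptor would be passed to grpc.Dial (or the pool's dial options).
        opt := grpc.WithUnaryInterceptor(timingInterceptor(func(method string, d time.Duration) {
            log.Printf("%s took %s", method, d)
        }))
        _ = opt
    }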
@@ -467,15 +517,18 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
         },
     }

+    start := time.Now()
     if err := p.signRequest(request); err != nil {
         return 0, err
     }

     var resp *grpcService.AddByPathResponse
-    if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
+    err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
         resp, inErr = client.AddByPath(ctx, request)
         return handleError("failed to add node by path", inErr)
-    }); err != nil {
+    })
+    p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
+    if err != nil {
         return 0, err
     }
@@ -507,16 +560,20 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
         },
     }

+    start := time.Now()
     if err := p.signRequest(request); err != nil {
         return err
     }

-    return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
+    err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
         if _, err := client.Move(ctx, request); err != nil {
             return handleError("failed to move node", err)
         }
         return nil
     })
+    p.methods[methodMoveNode].IncRequests(time.Since(start))
+
+    return err
 }
 // RemoveNode invokes eponymous method from TreeServiceClient.
@@ -533,16 +590,21 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
             BearerToken: prm.BearerToken,
         },
     }

+    start := time.Now()
     if err := p.signRequest(request); err != nil {
         return err
     }

-    return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
+    err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
         if _, err := client.Remove(ctx, request); err != nil {
             return handleError("failed to remove node", err)
         }
         return nil
     })
+    p.methods[methodRemoveNode].IncRequests(time.Since(start))
+
+    return err
 }

 // Close closes the Pool and releases all the associated resources.
@@ -563,6 +625,18 @@ func (p *Pool) Close() error {
     return err
 }

+// Statistic returns tree pool statistics.
+func (p *Pool) Statistic() Statistic {
+    stat := Statistic{make([]pool.StatusSnapshot, len(p.methods))}
+    for i, method := range p.methods {
+        stat.methods[i] = method.Snapshot()
+        method.Reset()
+    }
+
+    return stat
+}
+
 func handleError(msg string, err error) error {
     if err == nil {
         return nil

pool/tree/statistic.go (new file, 51 lines)

@@ -0,0 +1,51 @@
+package tree
+
+import (
+    "time"
+
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
+)
+
+// Statistic is metrics of the tree pool.
+type Statistic struct {
+    methods []pool.StatusSnapshot
+}
+
+func (s *Statistic) Requests() (requests uint64) {
+    for _, val := range s.methods {
+        requests += val.AllRequests()
+    }
+    return requests
+}
+
+func (s *Statistic) AverageGetNodes() time.Duration {
+    return s.averageTime(methodGetNodes)
+}
+
+func (s *Statistic) AverageGetSubTree() time.Duration {
+    return s.averageTime(methodGetSubTree)
+}
+
+func (s *Statistic) AverageAddNode() time.Duration {
+    return s.averageTime(methodAddNode)
+}
+
+func (s *Statistic) AverageAddNodeByPath() time.Duration {
+    return s.averageTime(methodAddNodeByPath)
+}
+
+func (s *Statistic) AverageMoveNode() time.Duration {
+    return s.averageTime(methodMoveNode)
+}
+
+func (s *Statistic) AverageRemoveNode() time.Duration {
+    return s.averageTime(methodRemoveNode)
+}
+
+func (s *Statistic) averageTime(method MethodIndex) time.Duration {
+    stat := s.methods[method]
+    if stat.AllRequests() == 0 {
+        return 0
+    }
+    return time.Duration(stat.AllTime() / stat.AllRequests())
+}
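Putting the pieces together, a consumer could scrape the new statistics periodically; since Pool.Statistic snapshots and then resets the per-method counters, each call reports only the interval since the previous scrape. A sketch under that assumption (pool construction and error handling are omitted; p is assumed to be an already initialized *tree.Pool):

    package monitoring

    import (
        "log"
        "time"

        "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
    )

    // reportTreeStats logs interval statistics of the tree pool every 15 seconds.
    func reportTreeStats(p *tree.Pool) {
        ticker := time.NewTicker(15 * time.Second)
        defer ticker.Stop()

        for range ticker.C {
            // Statistic snapshots and resets the counters, so each iteration
            // covers only the last ~15 seconds of traffic.
            stat := p.Statistic()
            log.Printf("tree pool: %d requests, avg getNodes %s, avg getSubTree %s",
                stat.Requests(), stat.AverageGetNodes(), stat.AverageGetSubTree())
        }
    }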