diff --git a/pool/pool.go b/pool/pool.go index 502833d..8cfda26 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -106,7 +106,7 @@ type clientStatus interface { // overallErrorRate returns the number of all happened errors. overallErrorRate() uint64 // methodsStatus returns statistic for all used methods. - methodsStatus() []statusSnapshot + methodsStatus() []StatusSnapshot } // errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy. @@ -122,7 +122,7 @@ type clientStatusMonitor struct { mu sync.RWMutex // protect counters currentErrorCount uint32 overallErrorCount uint64 - methods []*methodStatus + methods []*MethodStatus } // values for healthy status of clientStatusMonitor. @@ -141,19 +141,6 @@ const ( statusHealthy ) -// methodStatus provide statistic for specific method. -type methodStatus struct { - name string - mu sync.RWMutex // protect counters - statusSnapshot -} - -// statusSnapshot is statistic for specific method. -type statusSnapshot struct { - allTime uint64 - allRequests uint64 -} - // MethodIndex index of method in list of statuses in clientStatusMonitor. type MethodIndex int @@ -229,9 +216,9 @@ func (m MethodIndex) String() string { } func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor { - methods := make([]*methodStatus, methodLast) + methods := make([]*MethodStatus, methodLast) for i := methodBalanceGet; i < methodLast; i++ { - methods[i] = &methodStatus{name: i.String()} + methods[i] = &MethodStatus{name: i.String()} } healthy := new(atomic.Uint32) @@ -246,19 +233,6 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint } } -func (m *methodStatus) snapshot() statusSnapshot { - m.mu.RLock() - defer m.mu.RUnlock() - return m.statusSnapshot -} - -func (m *methodStatus) incRequests(elapsed time.Duration) { - m.mu.Lock() - defer m.mu.Unlock() - m.allTime += uint64(elapsed) - m.allRequests++ -} - // clientWrapper is used by default, alternative implementations are intended for testing purposes only. type clientWrapper struct { clientMutex sync.RWMutex @@ -1177,10 +1151,10 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 { return c.overallErrorCount } -func (c *clientStatusMonitor) methodsStatus() []statusSnapshot { - result := make([]statusSnapshot, len(c.methods)) +func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot { + result := make([]StatusSnapshot, len(c.methods)) for i, val := range c.methods { - result[i] = val.snapshot() + result[i] = val.Snapshot() } return result @@ -1188,7 +1162,7 @@ func (c *clientStatusMonitor) methodsStatus() []statusSnapshot { func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) { methodStat := c.methods[method] - methodStat.incRequests(elapsed) + methodStat.IncRequests(elapsed) if c.prm.poolRequestInfoCallback != nil { c.prm.poolRequestInfoCallback(RequestInfo{ Address: c.prm.address, diff --git a/pool/statistic.go b/pool/statistic.go index 5f1cfad..5ae0f7b 100644 --- a/pool/statistic.go +++ b/pool/statistic.go @@ -2,6 +2,7 @@ package pool import ( "errors" + "sync" "time" ) @@ -46,7 +47,7 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) { // NodeStatistic is metrics of certain connections. type NodeStatistic struct { address string - methods []statusSnapshot + methods []StatusSnapshot overallErrors uint64 currentErrors uint32 } @@ -158,3 +159,48 @@ func (n NodeStatistic) averageTime(method MethodIndex) time.Duration { } return time.Duration(stat.allTime / stat.allRequests) } + +// MethodStatus provide statistic for specific method. +type MethodStatus struct { + name string + mu sync.RWMutex // protect counters + snapshot StatusSnapshot +} + +func NewMethodStatus(name string) *MethodStatus { + return &MethodStatus{name: name} +} + +func (m *MethodStatus) Snapshot() StatusSnapshot { + m.mu.RLock() + defer m.mu.RUnlock() + return m.snapshot +} + +func (m *MethodStatus) IncRequests(elapsed time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + m.snapshot.allTime += uint64(elapsed) + m.snapshot.allRequests++ +} + +func (m *MethodStatus) Reset() { + m.mu.Lock() + defer m.mu.Unlock() + m.snapshot.allTime = 0 + m.snapshot.allRequests = 0 +} + +// StatusSnapshot is statistic for specific method. +type StatusSnapshot struct { + allTime uint64 + allRequests uint64 +} + +func (s StatusSnapshot) AllRequests() uint64 { + return s.allRequests +} + +func (s StatusSnapshot) AllTime() uint64 { + return s.allTime +} diff --git a/pool/tree/pool.go b/pool/tree/pool.go index b610909..195102b 100644 --- a/pool/tree/pool.go +++ b/pool/tree/pool.go @@ -88,6 +88,7 @@ type Pool struct { rebalanceParams rebalanceParameters dialOptions []grpc.DialOption logger *zap.Logger + methods []*pool.MethodStatus maxRequestAttempts int @@ -169,6 +170,39 @@ type RemoveNodeParams struct { BearerToken []byte } +// MethodIndex index of method in list of statuses in Pool. +type MethodIndex int + +const ( + methodGetNodes MethodIndex = iota + methodGetSubTree + methodAddNode + methodAddNodeByPath + methodMoveNode + methodRemoveNode + methodLast +) + +// String implements fmt.Stringer. +func (m MethodIndex) String() string { + switch m { + case methodGetNodes: + return "getNodes" + case methodAddNode: + return "addNode" + case methodGetSubTree: + return "getSubTree" + case methodAddNodeByPath: + return "addNodeByPath" + case methodMoveNode: + return "moveNode" + case methodRemoveNode: + return "removeNode" + default: + return "unknown" + } +} + // NewPool creates connection pool using parameters. func NewPool(options InitParameters) (*Pool, error) { if options.key == nil { @@ -182,6 +216,11 @@ func NewPool(options InitParameters) (*Pool, error) { fillDefaultInitParams(&options) + methods := make([]*pool.MethodStatus, methodLast) + for i := methodGetNodes; i < methodLast; i++ { + methods[i] = pool.NewMethodStatus(i.String()) + } + p := &Pool{ key: options.key, logger: options.logger, @@ -192,6 +231,7 @@ func NewPool(options InitParameters) (*Pool, error) { clientRebalanceInterval: options.clientRebalanceInterval, }, maxRequestAttempts: options.maxRequestAttempts, + methods: methods, } return p, nil @@ -308,12 +348,13 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService }, } + start := time.Now() if err := p.signRequest(request); err != nil { return nil, err } var resp *grpcService.GetNodeByPathResponse - if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { + err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.GetNodeByPath(ctx, request) // Pool wants to do retry 'GetNodeByPath' request if result is empty. // Empty result is expected due to delayed tree service sync. @@ -323,7 +364,9 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService return errNodeEmptyResult } return handleError("failed to get node by path", inErr) - }); err != nil && !errors.Is(err, errNodeEmptyResult) { + }) + p.methods[methodGetNodes].IncRequests(time.Since(start)) + if err != nil && !errors.Is(err, errNodeEmptyResult) { return nil, err } @@ -405,15 +448,18 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None } + start := time.Now() if err := p.signRequest(request); err != nil { return nil, err } var cli grpcService.TreeService_GetSubTreeClient - if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { + err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { cli, inErr = client.GetSubTree(ctx, request) return handleError("failed to get sub tree client", inErr) - }); err != nil { + }) + p.methods[methodGetSubTree].IncRequests(time.Since(start)) + if err != nil { return nil, err } @@ -435,15 +481,19 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { BearerToken: prm.BearerToken, }, } + + start := time.Now() if err := p.signRequest(request); err != nil { return 0, err } var resp *grpcService.AddResponse - if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { + err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.Add(ctx, request) return handleError("failed to add node", inErr) - }); err != nil { + }) + p.methods[methodAddNode].IncRequests(time.Since(start)) + if err != nil { return 0, err } @@ -467,15 +517,18 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint }, } + start := time.Now() if err := p.signRequest(request); err != nil { return 0, err } var resp *grpcService.AddByPathResponse - if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { + err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.AddByPath(ctx, request) return handleError("failed to add node by path", inErr) - }); err != nil { + }) + p.methods[methodAddNodeByPath].IncRequests(time.Since(start)) + if err != nil { return 0, err } @@ -507,16 +560,20 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { }, } + start := time.Now() if err := p.signRequest(request); err != nil { return err } - return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { + err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { if _, err := client.Move(ctx, request); err != nil { return handleError("failed to move node", err) } return nil }) + p.methods[methodMoveNode].IncRequests(time.Since(start)) + + return err } // RemoveNode invokes eponymous method from TreeServiceClient. @@ -533,16 +590,21 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { BearerToken: prm.BearerToken, }, } + + start := time.Now() if err := p.signRequest(request); err != nil { return err } - return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { + err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { if _, err := client.Remove(ctx, request); err != nil { return handleError("failed to remove node", err) } return nil }) + p.methods[methodRemoveNode].IncRequests(time.Since(start)) + + return err } // Close closes the Pool and releases all the associated resources. @@ -563,6 +625,18 @@ func (p *Pool) Close() error { return err } +// Statistic returns tree pool statistics. +func (p *Pool) Statistic() Statistic { + stat := Statistic{make([]pool.StatusSnapshot, len(p.methods))} + + for i, method := range p.methods { + stat.methods[i] = method.Snapshot() + method.Reset() + } + + return stat +} + func handleError(msg string, err error) error { if err == nil { return nil diff --git a/pool/tree/statistic.go b/pool/tree/statistic.go new file mode 100644 index 0000000..9f565fa --- /dev/null +++ b/pool/tree/statistic.go @@ -0,0 +1,51 @@ +package tree + +import ( + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" +) + +// Statistic is metrics of the tree pool. +type Statistic struct { + methods []pool.StatusSnapshot +} + +func (s *Statistic) Requests() (requests uint64) { + for _, val := range s.methods { + requests += val.AllRequests() + } + return requests +} + +func (s *Statistic) AverageGetNodes() time.Duration { + return s.averageTime(methodGetNodes) +} + +func (s *Statistic) AverageGetSubTree() time.Duration { + return s.averageTime(methodGetSubTree) +} + +func (s *Statistic) AverageAddNode() time.Duration { + return s.averageTime(methodAddNode) +} + +func (s *Statistic) AverageAddNodeByPath() time.Duration { + return s.averageTime(methodAddNodeByPath) +} + +func (s *Statistic) AverageMoveNode() time.Duration { + return s.averageTime(methodMoveNode) +} + +func (s *Statistic) AverageRemoveNode() time.Duration { + return s.averageTime(methodRemoveNode) +} + +func (s *Statistic) averageTime(method MethodIndex) time.Duration { + stat := s.methods[method] + if stat.AllRequests() == 0 { + return 0 + } + return time.Duration(stat.AllTime() / stat.AllRequests()) +}