From b4f4ee4f79f4d11db016e5e555aca43cd5fcd057 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 19 Jul 2022 16:13:59 +0300 Subject: [PATCH] [#283] pool: Compute avg time for every method Signed-off-by: Denis Kirillov --- pool/pool.go | 105 +++++++++++++++++++++++++++++----------------- pool/statistic.go | 98 +++++++++++++++++++++++++++++++++++++++---- 2 files changed, 155 insertions(+), 48 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index 88533f6..6d9d9c3 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -60,8 +60,7 @@ type clientStatus interface { address() string currentErrorRate() uint32 overallErrorRate() uint64 - latency() time.Duration - requests() uint64 + methodsStatus() map[string]methodStatus } type clientStatusMonitor struct { @@ -72,10 +71,33 @@ type clientStatusMonitor struct { mu sync.RWMutex // protect counters currentErrorCount uint32 overallErrorCount uint64 - allTime uint64 - allRequests uint64 + methods map[string]methodStatus } +type methodStatus struct { + name string + allTime uint64 + allRequests uint64 +} + +const ( + methodBalanceGet = "balanceGet" + methodContainerPut = "containerPut" + methodContainerGet = "containerGet" + methodContainerList = "containerList" + methodContainerDelete = "containerDelete" + methodContainerEACL = "containerEACL" + methodContainerSetEACL = "containerSetEACL" + methodEndpointInfo = "endpointInfo" + methodNetworkInfo = "networkInfo" + methodObjectPut = "objectPut" + methodObjectDelete = "objectDelete" + methodObjectGet = "objectGet" + methodObjectHead = "objectHead" + methodObjectRange = "objectRange" + methodSessionCreate = "sessionCreate" +) + // clientWrapper is used by default, alternative implementations are intended for testing purposes only. type clientWrapper struct { client sdkClient.Client @@ -145,7 +167,7 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*acc start := time.Now() res, err := c.client.BalanceGet(ctx, cliPrm) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodBalanceGet) var st apistatus.Status if res != nil { st = res.Status() @@ -160,7 +182,7 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*acc func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) { start := time.Now() res, err := c.client.ContainerPut(ctx, prm.prmClient) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodContainerPut) var st apistatus.Status if res != nil { st = res.Status() @@ -187,7 +209,7 @@ func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) ( start := time.Now() res, err := c.client.ContainerGet(ctx, cliPrm) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodContainerGet) var st apistatus.Status if res != nil { st = res.Status() @@ -206,7 +228,7 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) start := time.Now() res, err := c.client.ContainerList(ctx, cliPrm) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodContainerList) var st apistatus.Status if res != nil { st = res.Status() @@ -226,7 +248,7 @@ func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDel start := time.Now() res, err := c.client.ContainerDelete(ctx, cliPrm) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodContainerDelete) var st apistatus.Status if res != nil { st = res.Status() @@ -248,7 +270,7 @@ func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL) start := time.Now() res, err := c.client.ContainerEACL(ctx, cliPrm) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodContainerEACL) var st apistatus.Status if res != nil { st = res.Status() @@ -269,7 +291,7 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe start := time.Now() res, err := c.client.ContainerSetEACL(ctx, cliPrm) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodContainerSetEACL) var st apistatus.Status if res != nil { st = res.Status() @@ -298,7 +320,7 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*netmap.NodeInfo, error) { start := time.Now() res, err := c.client.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodEndpointInfo) var st apistatus.Status if res != nil { st = res.Status() @@ -312,7 +334,7 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*n func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*netmap.NetworkInfo, error) { start := time.Now() res, err := c.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{}) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodNetworkInfo) var st apistatus.Status if res != nil { st = res.Status() @@ -327,7 +349,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.I var cliPrm sdkClient.PrmObjectPutInit start := time.Now() wObj, err := c.client.ObjectPutInit(ctx, cliPrm) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodObjectPut) if err = c.handleError(nil, err); err != nil { return nil, fmt.Errorf("init writing on API client: %w", err) } @@ -371,7 +393,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.I if n > 0 { start = time.Now() successWrite := wObj.WritePayloadChunk(buf[:n]) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodObjectPut) if !successWrite { break } @@ -425,7 +447,7 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e start := time.Now() res, err := c.client.ObjectDelete(ctx, cliPrm) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodObjectDelete) var st apistatus.Status if res != nil { st = res.Status() @@ -462,7 +484,7 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe start := time.Now() successReadHeader := rObj.ReadHeader(&res.Header) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodObjectGet) if !successReadHeader { rObjRes, err := rObj.Close() var st apistatus.Status @@ -474,8 +496,10 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe } res.Payload = &objectReadCloser{ - reader: rObj, - elapsedTimeCallback: c.incRequests, + reader: rObj, + elapsedTimeCallback: func(elapsed time.Duration) { + c.incRequests(elapsed, methodObjectGet) + }, } return &res, nil @@ -502,7 +526,7 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (*obj start := time.Now() res, err := c.client.ObjectHead(ctx, cliPrm) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodObjectHead) var st apistatus.Status if res != nil { st = res.Status() @@ -534,7 +558,7 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R start := time.Now() res, err := c.client.ObjectRangeInit(ctx, cliPrm) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodObjectRange) if err = c.handleError(nil, err); err != nil { return nil, fmt.Errorf("init payload range reading on client: %w", err) } @@ -543,8 +567,10 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R } return &ResObjectRange{ - payload: res, - elapsedTimeCallback: c.incRequests, + payload: res, + elapsedTimeCallback: func(elapsed time.Duration) { + c.incRequests(elapsed, methodObjectRange) + }, }, nil } @@ -580,7 +606,7 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) start := time.Now() res, err := c.client.SessionCreate(ctx, cliPrm) - c.incRequests(time.Since(start)) + c.incRequests(time.Since(start), methodSessionCreate) var st apistatus.Status if res != nil { st = res.Status() @@ -630,26 +656,28 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 { return c.overallErrorCount } -func (c *clientStatusMonitor) latency() time.Duration { +func (c *clientStatusMonitor) methodsStatus() map[string]methodStatus { c.mu.RLock() defer c.mu.RUnlock() - if c.allRequests == 0 { - return 0 + + result := make(map[string]methodStatus) + for key, val := range c.methods { + result[key] = val } - return time.Duration(c.allTime / c.allRequests) + + return result } -func (c *clientStatusMonitor) requests() uint64 { - c.mu.RLock() - defer c.mu.RUnlock() - return c.allRequests -} - -func (c *clientStatusMonitor) incRequests(elapsed time.Duration) { +func (c *clientStatusMonitor) incRequests(elapsed time.Duration, method string) { c.mu.Lock() defer c.mu.Unlock() - c.allTime += uint64(elapsed) - c.allRequests++ + methodStat, ok := c.methods[method] + if !ok { + methodStat.name = method + } + methodStat.allTime += uint64(elapsed) + methodStat.allRequests++ + c.methods[method] = methodStat } func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error { @@ -1976,8 +2004,7 @@ func (p Pool) Statistic() Statistic { for _, cl := range inner.clients { node := NodeStatistic{ address: cl.address(), - latency: cl.latency(), - requests: cl.requests(), + methods: cl.methodsStatus(), overallErrors: cl.overallErrorRate(), currentErrors: cl.currentErrorRate(), } diff --git a/pool/statistic.go b/pool/statistic.go index d8f3938..c80027e 100644 --- a/pool/statistic.go +++ b/pool/statistic.go @@ -39,8 +39,7 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) { // NodeStatistic is metrics of certain connections. type NodeStatistic struct { address string - latency time.Duration - requests uint64 + methods map[string]methodStatus overallErrors uint64 currentErrors uint32 } @@ -57,17 +56,98 @@ func (n NodeStatistic) CurrentErrors() uint32 { return n.currentErrors } -// Latency returns average latency for node request. -func (n NodeStatistic) Latency() time.Duration { - return n.latency -} - // Requests returns number of requests. -func (n NodeStatistic) Requests() uint64 { - return n.requests +func (n NodeStatistic) Requests() (requests uint64) { + for _, val := range n.methods { + requests += val.allRequests + } + return requests } // Address returns node endpoint address. func (n NodeStatistic) Address() string { return n.address } + +// AverageGetBalance returns average time to perform BalanceGet request. +func (n NodeStatistic) AverageGetBalance() time.Duration { + return n.averageTime(methodBalanceGet) +} + +// AveragePutContainer returns average time to perform ContainerPut request. +func (n NodeStatistic) AveragePutContainer() time.Duration { + return n.averageTime(methodContainerPut) +} + +// AverageGetContainer returns average time to perform ContainerGet request. +func (n NodeStatistic) AverageGetContainer() time.Duration { + return n.averageTime(methodContainerGet) +} + +// AverageListContainer returns average time to perform ContainerList request. +func (n NodeStatistic) AverageListContainer() time.Duration { + return n.averageTime(methodContainerList) +} + +// AverageDeleteContainer returns average time to perform ContainerDelete request. +func (n NodeStatistic) AverageDeleteContainer() time.Duration { + return n.averageTime(methodContainerDelete) +} + +// AverageGetContainerEACL returns average time to perform ContainerEACL request. +func (n NodeStatistic) AverageGetContainerEACL() time.Duration { + return n.averageTime(methodContainerEACL) +} + +// AverageSetContainerEACL returns average time to perform ContainerSetEACL request. +func (n NodeStatistic) AverageSetContainerEACL() time.Duration { + return n.averageTime(methodContainerSetEACL) +} + +// AverageEndpointInfo returns average time to perform EndpointInfo request. +func (n NodeStatistic) AverageEndpointInfo() time.Duration { + return n.averageTime(methodEndpointInfo) +} + +// AverageNetworkInfo returns average time to perform NetworkInfo request. +func (n NodeStatistic) AverageNetworkInfo() time.Duration { + return n.averageTime(methodNetworkInfo) +} + +// AveragePutObject returns average time to perform ObjectPut request. +func (n NodeStatistic) AveragePutObject() time.Duration { + return n.averageTime(methodObjectPut) +} + +// AverageDeleteObject returns average time to perform ObjectDelete request. +func (n NodeStatistic) AverageDeleteObject() time.Duration { + return n.averageTime(methodObjectDelete) +} + +// AverageGetObject returns average time to perform ObjectGet request. +func (n NodeStatistic) AverageGetObject() time.Duration { + return n.averageTime(methodObjectGet) +} + +// AverageHeadObject returns average time to perform ObjectHead request. +func (n NodeStatistic) AverageHeadObject() time.Duration { + return n.averageTime(methodObjectHead) +} + +// AverageRangeObject returns average time to perform ObjectRange request. +func (n NodeStatistic) AverageRangeObject() time.Duration { + return n.averageTime(methodObjectRange) +} + +// AverageCreateSession returns average time to perform SessionCreate request. +func (n NodeStatistic) AverageCreateSession() time.Duration { + return n.averageTime(methodSessionCreate) +} + +func (n NodeStatistic) averageTime(method string) time.Duration { + stat := n.methods[method] + if stat.allRequests == 0 { + return 0 + } + return time.Duration(stat.allTime / stat.allRequests) +}