From 423804de84ad72f243c788ca931795c912074f70 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Wed, 13 Jul 2022 12:51:43 +0300 Subject: [PATCH] [#283] pool: Add latency calculating Signed-off-by: Denis Kirillov --- pool/pool.go | 127 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 102 insertions(+), 25 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index be763cdd..080bc953 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -58,15 +58,20 @@ type clientStatus interface { isHealthy() bool setHealthy(bool) bool address() string - errorRate() uint32 + currentErrorRate() uint32 + overallErrorRate() uint64 resetErrorCounter() + latency() time.Duration } type clientStatusMonitor struct { - addr string - healthy *atomic.Bool - errorCount *atomic.Uint32 - errorThreshold uint32 + addr string + healthy *atomic.Bool + currentErrorCount *atomic.Uint32 + overallErrorCount *atomic.Uint64 + errorThreshold uint32 + allTime *atomic.Uint64 + allRequests *atomic.Uint64 } // clientWrapper is used by default, alternative implementations are intended for testing purposes only. @@ -113,10 +118,13 @@ func newWrapper(prm wrapperPrm) (*clientWrapper, error) { res := &clientWrapper{ key: prm.key, clientStatusMonitor: &clientStatusMonitor{ - addr: prm.address, - healthy: atomic.NewBool(true), - errorCount: atomic.NewUint32(0), - errorThreshold: prm.errorThreshold, + addr: prm.address, + healthy: atomic.NewBool(true), + currentErrorCount: atomic.NewUint32(0), + overallErrorCount: atomic.NewUint64(0), + errorThreshold: prm.errorThreshold, + allTime: atomic.NewUint64(0), + allRequests: atomic.NewUint64(0), }, } @@ -138,15 +146,20 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*acc var cliPrm sdkClient.PrmBalanceGet cliPrm.SetAccount(prm.account) + start := time.Now() res, err := c.client.BalanceGet(ctx, cliPrm) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return nil, fmt.Errorf("balance get on client: %w", err) } + return res.Amount(), nil } func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) { + start := time.Now() res, err := c.client.ContainerPut(ctx, prm.prmClient) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return nil, fmt.Errorf("container put on client: %w", err) } @@ -167,13 +180,14 @@ func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) ( var cliPrm sdkClient.PrmContainerGet cliPrm.SetContainer(prm.cnrID) + start := time.Now() res, err := c.client.ContainerGet(ctx, cliPrm) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return nil, fmt.Errorf("container get on client: %w", err) } cnr := res.Container() - return &cnr, nil } @@ -181,7 +195,9 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) var cliPrm sdkClient.PrmContainerList cliPrm.SetAccount(prm.ownerID) + start := time.Now() res, err := c.client.ContainerList(ctx, cliPrm) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return nil, fmt.Errorf("container list on client: %w", err) } @@ -195,7 +211,9 @@ func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDel cliPrm.WithinSession(prm.stoken) } + start := time.Now() res, err := c.client.ContainerDelete(ctx, cliPrm) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return fmt.Errorf("container delete on client: %w", err) } @@ -211,7 +229,9 @@ func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL) var cliPrm sdkClient.PrmContainerEACL cliPrm.SetContainer(prm.cnrID) + start := time.Now() res, err := c.client.ContainerEACL(ctx, cliPrm) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return nil, fmt.Errorf("get eacl on client: %w", err) } @@ -226,7 +246,9 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe cliPrm.WithinSession(prm.session) } + start := time.Now() res, err := c.client.ContainerSetEACL(ctx, cliPrm) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return fmt.Errorf("set eacl on client: %w", err) } @@ -249,7 +271,9 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe } func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*netmap.NodeInfo, error) { + start := time.Now() res, err := c.client.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return nil, fmt.Errorf("endpoint info on client: %w", err) } @@ -257,7 +281,9 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*n } func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*netmap.NetworkInfo, error) { + start := time.Now() res, err := c.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{}) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return nil, fmt.Errorf("network info on client: %w", err) } @@ -266,7 +292,9 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*net func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) { var cliPrm sdkClient.PrmObjectPutInit + start := time.Now() wObj, err := c.client.ObjectPutInit(ctx, cliPrm) + c.incRequests(time.Since(start)) if err = c.handleError(nil, err); err != nil { return nil, fmt.Errorf("init writing on API client: %w", err) } @@ -308,7 +336,10 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.I for { n, err = prm.payload.Read(buf) if n > 0 { - if !wObj.WritePayloadChunk(buf[:n]) { + start = time.Now() + successWrite := wObj.WritePayloadChunk(buf[:n]) + c.incRequests(time.Since(start)) + if !successWrite { break } @@ -354,7 +385,10 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e if prm.key != nil { cliPrm.UseKey(*prm.key) } + + start := time.Now() res, err := c.client.ObjectDelete(ctx, cliPrm) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return fmt.Errorf("delete object on client: %w", err) } @@ -385,13 +419,19 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe rObj.UseKey(*prm.key) } - if !rObj.ReadHeader(&res.Header) { + start := time.Now() + successReadHeader := rObj.ReadHeader(&res.Header) + c.incRequests(time.Since(start)) + if !successReadHeader { rObjRes, err := rObj.Close() err = c.handleError(rObjRes.Status(), err) return nil, fmt.Errorf("read header: %w", err) } - res.Payload = (*objectReadCloser)(rObj) + res.Payload = &objectReadCloser{ + reader: rObj, + elapsedTimeCallback: c.incRequests, + } return &res, nil } @@ -415,7 +455,9 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (*obj var obj object.Object + start := time.Now() res, err := c.client.ObjectHead(ctx, cliPrm) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return nil, fmt.Errorf("read object header via client: %w", err) } @@ -441,7 +483,9 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R cliPrm.WithBearerToken(*prm.btoken) } + start := time.Now() res, err := c.client.ObjectRangeInit(ctx, cliPrm) + c.incRequests(time.Since(start)) if err = c.handleError(nil, err); err != nil { return nil, fmt.Errorf("init payload range reading on client: %w", err) } @@ -449,7 +493,10 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R res.UseKey(*prm.key) } - return &ResObjectRange{payload: res}, nil + return &ResObjectRange{ + payload: res, + elapsedTimeCallback: c.incRequests, + }, nil } func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) { @@ -482,7 +529,9 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) cliPrm.SetExp(prm.exp) cliPrm.UseKey(prm.key) + start := time.Now() res, err := c.client.SessionCreate(ctx, cliPrm) + c.incRequests(time.Since(start)) if err = c.handleError(res.Status(), err); err != nil { return nil, fmt.Errorf("session creation on client: %w", err) } @@ -505,17 +554,35 @@ func (c *clientStatusMonitor) address() string { return c.addr } -func (c *clientStatusMonitor) errorRate() uint32 { - return c.errorCount.Load() +func (c *clientStatusMonitor) incErrorRate() { + c.currentErrorCount.Inc() + c.overallErrorCount.Inc() +} + +func (c *clientStatusMonitor) currentErrorRate() uint32 { + return c.currentErrorCount.Load() +} + +func (c *clientStatusMonitor) overallErrorRate() uint64 { + return c.overallErrorCount.Load() } func (c *clientStatusMonitor) resetErrorCounter() { - c.errorCount.Store(0) + c.currentErrorCount.Store(0) +} + +func (c *clientStatusMonitor) latency() time.Duration { + return time.Duration(c.allTime.Load() / c.allRequests.Load()) +} + +func (c *clientStatusMonitor) incRequests(elapsed time.Duration) { + c.allTime.Add(uint64(elapsed)) + c.allRequests.Inc() } func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error { if err != nil { - c.errorCount.Inc() + c.incErrorRate() return err } @@ -524,8 +591,8 @@ func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error case apistatus.ServerInternal, *apistatus.ServerInternal, apistatus.WrongMagicNumber, *apistatus.WrongMagicNumber, apistatus.SignatureVerification, *apistatus.SignatureVerification: - c.errorCount.Inc() - if c.errorCount.Load() >= c.errorThreshold { + c.incErrorRate() + if c.currentErrorRate() >= c.errorThreshold { c.setHealthy(false) c.resetErrorCounter() } @@ -1549,16 +1616,22 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { }) } -type objectReadCloser sdkClient.ObjectReader +type objectReadCloser struct { + reader *sdkClient.ObjectReader + elapsedTimeCallback func(time.Duration) +} // Read implements io.Reader of the object payload. func (x *objectReadCloser) Read(p []byte) (int, error) { - return (*sdkClient.ObjectReader)(x).Read(p) + start := time.Now() + n, err := x.reader.Read(p) + x.elapsedTimeCallback(time.Since(start)) + return n, err } // Close implements io.Closer of the object payload. func (x *objectReadCloser) Close() error { - _, err := (*sdkClient.ObjectReader)(x).Close() + _, err := x.reader.Close() return err } @@ -1626,12 +1699,16 @@ func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Objec // Must be initialized using Pool.ObjectRange, any other // usage is unsafe. type ResObjectRange struct { - payload *sdkClient.ObjectRangeReader + payload *sdkClient.ObjectRangeReader + elapsedTimeCallback func(time.Duration) } // Read implements io.Reader of the object payload. func (x *ResObjectRange) Read(p []byte) (int, error) { - return x.payload.Read(p) + start := time.Now() + n, err := x.payload.Read(p) + x.elapsedTimeCallback(time.Since(start)) + return n, err } // Close ends reading the payload range and returns the result of the operation