From fe5b28e6bfb2d5414c62af911025686400d997ac Mon Sep 17 00:00:00 2001 From: Ekaterina Lebedeva Date: Thu, 6 Mar 2025 05:23:53 +0300 Subject: [PATCH] [#338] pool: Support avg request time for ListContainerStream Signed-off-by: Ekaterina Lebedeva --- pool/client.go | 30 +++++++++++++++++++++++++----- pool/statistic.go | 5 +++++ 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/pool/client.go b/pool/client.go index f1abbf23..78650723 100644 --- a/pool/client.go +++ b/pool/client.go @@ -90,6 +90,8 @@ func (m MethodIndex) String() string { return "containerGet" case methodContainerList: return "containerList" + case methodContainerListStream: + return "containerListStream" case methodContainerDelete: return "containerDelete" case methodEndpointInfo: @@ -457,13 +459,16 @@ type PrmListStream struct { // // Must be initialized using Pool.ListContainersStream, any other usage is unsafe. type ResListStream struct { - r *sdkClient.ContainerListReader - handleError func(context.Context, apistatus.Status, error) error + r *sdkClient.ContainerListReader + elapsedTimeCallback func(time.Duration) + handleError func(context.Context, apistatus.Status, error) error } // Read reads another list of the container identifiers. func (x *ResListStream) Read(buf []cid.ID) (int, error) { + start := time.Now() n, ok := x.r.Read(buf) + x.elapsedTimeCallback(time.Since(start)) if !ok { res, err := x.r.Close() if err == nil { @@ -487,7 +492,14 @@ func (x *ResListStream) Read(buf []cid.ID) (int, error) { // // Returns an error if container can't be read. func (x *ResListStream) Iterate(f func(cid.ID) bool) error { - return x.r.Iterate(f) + start := time.Now() + err := x.r.Iterate(func(id cid.ID) bool { + x.elapsedTimeCallback(time.Since(start)) + stop := f(id) + start = time.Now() + return stop + }) + return err } // Close ends reading list of the matched containers and returns the result of the operation @@ -508,11 +520,19 @@ func (c *clientWrapper) containerListStream(ctx context.Context, prm PrmListStre Session: prm.Session, } - res, err := cl.ContainerListInit(ctx, cliPrm) + start := time.Now() + cnrRdr, err := cl.ContainerListInit(ctx, cliPrm) + c.incRequests(time.Since(start), methodContainerListStream) if err = c.handleError(ctx, nil, err); err != nil { return ResListStream{}, fmt.Errorf("init container listing on client: %w", err) } - return ResListStream{r: res, handleError: c.handleError}, nil + return ResListStream{ + r: cnrRdr, + elapsedTimeCallback: func(elapsed time.Duration) { + c.incRequests(elapsed, methodContainerListStream) + }, + handleError: c.handleError, + }, nil } // containerDelete invokes sdkClient.ContainerDelete parse response status to error. diff --git a/pool/statistic.go b/pool/statistic.go index 40da88ff..b9c24303 100644 --- a/pool/statistic.go +++ b/pool/statistic.go @@ -97,6 +97,11 @@ func (n NodeStatistic) AverageListContainer() time.Duration { return n.averageTime(methodContainerList) } +// AverageListContainerStream returns average time to perform ContainerListStream request. +func (n NodeStatistic) AverageListContainerStream() time.Duration { + return n.averageTime(methodContainerListStream) +} + // AverageDeleteContainer returns average time to perform ContainerDelete request. func (n NodeStatistic) AverageDeleteContainer() time.Duration { return n.averageTime(methodContainerDelete)