From 8c0c7789cafe30c36eee3fde867bf6719c9aaf6b Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 29 Nov 2022 15:03:50 +0300 Subject: [PATCH] [#365] pool: Add request callback Signed-off-by: Denis Kirillov Signed-off-by: Evgenii Stratonikov --- pool/pool.go | 42 +++++++++++++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index 50801a5..ddd2056 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -224,12 +224,13 @@ type clientWrapper struct { // wrapperPrm is params to create clientWrapper. type wrapperPrm struct { - address string - key ecdsa.PrivateKey - dialTimeout time.Duration - streamTimeout time.Duration - errorThreshold uint32 - responseInfoCallback func(sdkClient.ResponseMetaInfo) error + address string + key ecdsa.PrivateKey + dialTimeout time.Duration + streamTimeout time.Duration + errorThreshold uint32 + responseInfoCallback func(sdkClient.ResponseMetaInfo) error + poolRequestInfoCallback func(RequestInfo) } // setAddress sets endpoint to connect in NeoFS network. @@ -258,6 +259,11 @@ func (x *wrapperPrm) setErrorThreshold(threshold uint32) { x.errorThreshold = threshold } +// setPoolRequestCallback sets callback that will be invoked after every pool response. +func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) { + x.poolRequestInfoCallback = f +} + // setResponseInfoCallback sets callback that will be invoked after every response. func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) { x.responseInfoCallback = f @@ -964,9 +970,16 @@ func (c *clientStatusMonitor) methodsStatus() []statusSnapshot { return result } -func (c *clientStatusMonitor) incRequests(elapsed time.Duration, method MethodIndex) { +func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) { methodStat := c.methods[method] methodStat.incRequests(elapsed) + if c.prm.poolRequestInfoCallback != nil { + c.prm.poolRequestInfoCallback(RequestInfo{ + Address: c.prm.address, + Method: method, + Elapsed: elapsed, + }) + } } func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error { @@ -991,6 +1004,13 @@ func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error // to the given endpoint. type clientBuilder = func(endpoint string) client +// RequestInfo groups info about pool request. +type RequestInfo struct { + Address string + Method MethodIndex + Elapsed time.Duration +} + // InitParameters contains values used to initialize connection Pool. type InitParameters struct { key *ecdsa.PrivateKey @@ -1002,6 +1022,7 @@ type InitParameters struct { sessionExpirationDuration uint64 errorThreshold uint32 nodeParams []NodeParam + requestCallback func(RequestInfo) clientBuilder clientBuilder } @@ -1050,6 +1071,12 @@ func (x *InitParameters) SetErrorThreshold(threshold uint32) { x.errorThreshold = threshold } +// SetRequestCallback makes the pool client to pass RequestInfo for each +// request to f. Nil (default) means ignore RequestInfo. +func (x *InitParameters) SetRequestCallback(f func(RequestInfo)) { + x.requestCallback = f +} + // AddNode append information about the node to which you want to connect. func (x *InitParameters) AddNode(nodeParam NodeParam) { x.nodeParams = append(x.nodeParams, nodeParam) @@ -1651,6 +1678,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { prm.setDialTimeout(params.nodeDialTimeout) prm.setStreamTimeout(params.nodeStreamTimeout) prm.setErrorThreshold(params.errorThreshold) + prm.setPoolRequestCallback(params.requestCallback) prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error { cache.updateEpoch(info.Epoch()) return nil