[#365] pool: Add request callback

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Denis Kirillov 2022-11-29 15:03:50 +03:00 committed by fyrchik
parent 339e2702f8
commit 8c0c7789ca

View file

@ -230,6 +230,7 @@ type wrapperPrm struct {
streamTimeout time.Duration streamTimeout time.Duration
errorThreshold uint32 errorThreshold uint32
responseInfoCallback func(sdkClient.ResponseMetaInfo) error responseInfoCallback func(sdkClient.ResponseMetaInfo) error
poolRequestInfoCallback func(RequestInfo)
} }
// setAddress sets endpoint to connect in NeoFS network. // setAddress sets endpoint to connect in NeoFS network.
@ -258,6 +259,11 @@ func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
x.errorThreshold = threshold x.errorThreshold = threshold
} }
// setPoolRequestCallback sets callback that will be invoked after every pool response.
func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) {
x.poolRequestInfoCallback = f
}
// setResponseInfoCallback sets callback that will be invoked after every response. // setResponseInfoCallback sets callback that will be invoked after every response.
func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) { func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) {
x.responseInfoCallback = f x.responseInfoCallback = f
@ -964,9 +970,16 @@ func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
return result return result
} }
func (c *clientStatusMonitor) incRequests(elapsed time.Duration, method MethodIndex) { func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
methodStat := c.methods[method] methodStat := c.methods[method]
methodStat.incRequests(elapsed) methodStat.incRequests(elapsed)
if c.prm.poolRequestInfoCallback != nil {
c.prm.poolRequestInfoCallback(RequestInfo{
Address: c.prm.address,
Method: method,
Elapsed: elapsed,
})
}
} }
func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error { func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error {
@ -991,6 +1004,13 @@ func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error
// to the given endpoint. // to the given endpoint.
type clientBuilder = func(endpoint string) client type clientBuilder = func(endpoint string) client
// RequestInfo groups info about pool request.
type RequestInfo struct {
Address string
Method MethodIndex
Elapsed time.Duration
}
// InitParameters contains values used to initialize connection Pool. // InitParameters contains values used to initialize connection Pool.
type InitParameters struct { type InitParameters struct {
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
@ -1002,6 +1022,7 @@ type InitParameters struct {
sessionExpirationDuration uint64 sessionExpirationDuration uint64
errorThreshold uint32 errorThreshold uint32
nodeParams []NodeParam nodeParams []NodeParam
requestCallback func(RequestInfo)
clientBuilder clientBuilder clientBuilder clientBuilder
} }
@ -1050,6 +1071,12 @@ func (x *InitParameters) SetErrorThreshold(threshold uint32) {
x.errorThreshold = threshold x.errorThreshold = threshold
} }
// SetRequestCallback makes the pool client to pass RequestInfo for each
// request to f. Nil (default) means ignore RequestInfo.
func (x *InitParameters) SetRequestCallback(f func(RequestInfo)) {
x.requestCallback = f
}
// AddNode append information about the node to which you want to connect. // AddNode append information about the node to which you want to connect.
func (x *InitParameters) AddNode(nodeParam NodeParam) { func (x *InitParameters) AddNode(nodeParam NodeParam) {
x.nodeParams = append(x.nodeParams, nodeParam) x.nodeParams = append(x.nodeParams, nodeParam)
@ -1651,6 +1678,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
prm.setDialTimeout(params.nodeDialTimeout) prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout) prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold) prm.setErrorThreshold(params.errorThreshold)
prm.setPoolRequestCallback(params.requestCallback)
prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error { prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
cache.updateEpoch(info.Epoch()) cache.updateEpoch(info.Epoch())
return nil return nil