[#365] pool: Add request callback
Signed-off-by: Denis Kirillov <denis@nspcc.ru> Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
11e2f2b013
commit
864531a0e6
1 changed files with 35 additions and 7 deletions
42
pool/pool.go
42
pool/pool.go
|
@ -224,12 +224,13 @@ type clientWrapper struct {
|
||||||
|
|
||||||
// wrapperPrm is params to create clientWrapper.
|
// wrapperPrm is params to create clientWrapper.
|
||||||
type wrapperPrm struct {
|
type wrapperPrm struct {
|
||||||
address string
|
address string
|
||||||
key ecdsa.PrivateKey
|
key ecdsa.PrivateKey
|
||||||
dialTimeout time.Duration
|
dialTimeout time.Duration
|
||||||
streamTimeout time.Duration
|
streamTimeout time.Duration
|
||||||
errorThreshold uint32
|
errorThreshold uint32
|
||||||
responseInfoCallback func(sdkClient.ResponseMetaInfo) error
|
responseInfoCallback func(sdkClient.ResponseMetaInfo) error
|
||||||
|
poolRequestInfoCallback func(RequestInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
// setAddress sets endpoint to connect in NeoFS network.
|
// setAddress sets endpoint to connect in NeoFS network.
|
||||||
|
@ -258,6 +259,11 @@ func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
|
||||||
x.errorThreshold = threshold
|
x.errorThreshold = threshold
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setPoolRequestCallback sets callback that will be invoked after every pool response.
|
||||||
|
func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) {
|
||||||
|
x.poolRequestInfoCallback = f
|
||||||
|
}
|
||||||
|
|
||||||
// setResponseInfoCallback sets callback that will be invoked after every response.
|
// setResponseInfoCallback sets callback that will be invoked after every response.
|
||||||
func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) {
|
func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) {
|
||||||
x.responseInfoCallback = f
|
x.responseInfoCallback = f
|
||||||
|
@ -964,9 +970,16 @@ func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) incRequests(elapsed time.Duration, method MethodIndex) {
|
func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
|
||||||
methodStat := c.methods[method]
|
methodStat := c.methods[method]
|
||||||
methodStat.incRequests(elapsed)
|
methodStat.incRequests(elapsed)
|
||||||
|
if c.prm.poolRequestInfoCallback != nil {
|
||||||
|
c.prm.poolRequestInfoCallback(RequestInfo{
|
||||||
|
Address: c.prm.address,
|
||||||
|
Method: method,
|
||||||
|
Elapsed: elapsed,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error {
|
func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error {
|
||||||
|
@ -991,6 +1004,13 @@ func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error
|
||||||
// to the given endpoint.
|
// to the given endpoint.
|
||||||
type clientBuilder = func(endpoint string) client
|
type clientBuilder = func(endpoint string) client
|
||||||
|
|
||||||
|
// RequestInfo groups info about pool request.
|
||||||
|
type RequestInfo struct {
|
||||||
|
Address string
|
||||||
|
Method MethodIndex
|
||||||
|
Elapsed time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
// InitParameters contains values used to initialize connection Pool.
|
// InitParameters contains values used to initialize connection Pool.
|
||||||
type InitParameters struct {
|
type InitParameters struct {
|
||||||
key *ecdsa.PrivateKey
|
key *ecdsa.PrivateKey
|
||||||
|
@ -1002,6 +1022,7 @@ type InitParameters struct {
|
||||||
sessionExpirationDuration uint64
|
sessionExpirationDuration uint64
|
||||||
errorThreshold uint32
|
errorThreshold uint32
|
||||||
nodeParams []NodeParam
|
nodeParams []NodeParam
|
||||||
|
requestCallback func(RequestInfo)
|
||||||
|
|
||||||
clientBuilder clientBuilder
|
clientBuilder clientBuilder
|
||||||
}
|
}
|
||||||
|
@ -1050,6 +1071,12 @@ func (x *InitParameters) SetErrorThreshold(threshold uint32) {
|
||||||
x.errorThreshold = threshold
|
x.errorThreshold = threshold
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetRequestCallback makes the pool client to pass RequestInfo for each
|
||||||
|
// request to f. Nil (default) means ignore RequestInfo.
|
||||||
|
func (x *InitParameters) SetRequestCallback(f func(RequestInfo)) {
|
||||||
|
x.requestCallback = f
|
||||||
|
}
|
||||||
|
|
||||||
// AddNode append information about the node to which you want to connect.
|
// AddNode append information about the node to which you want to connect.
|
||||||
func (x *InitParameters) AddNode(nodeParam NodeParam) {
|
func (x *InitParameters) AddNode(nodeParam NodeParam) {
|
||||||
x.nodeParams = append(x.nodeParams, nodeParam)
|
x.nodeParams = append(x.nodeParams, nodeParam)
|
||||||
|
@ -1651,6 +1678,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
|
||||||
prm.setDialTimeout(params.nodeDialTimeout)
|
prm.setDialTimeout(params.nodeDialTimeout)
|
||||||
prm.setStreamTimeout(params.nodeStreamTimeout)
|
prm.setStreamTimeout(params.nodeStreamTimeout)
|
||||||
prm.setErrorThreshold(params.errorThreshold)
|
prm.setErrorThreshold(params.errorThreshold)
|
||||||
|
prm.setPoolRequestCallback(params.requestCallback)
|
||||||
prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
|
prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
|
||||||
cache.updateEpoch(info.Epoch())
|
cache.updateEpoch(info.Epoch())
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in a new issue