[#364] pool: Adopt client stream timeout param

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-11-15 14:32:25 +03:00 committed by Alex Vanin
parent d047289182
commit b4b07a3c4e

View file

@ -226,7 +226,8 @@ type clientWrapper struct {
type wrapperPrm struct {
address string
key ecdsa.PrivateKey
timeout time.Duration
dialTimeout time.Duration
streamTimeout time.Duration
errorThreshold uint32
responseInfoCallback func(sdkClient.ResponseMetaInfo) error
}
@ -241,9 +242,14 @@ func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
x.key = key
}
// setTimeout sets the timeout for connection to be established.
func (x *wrapperPrm) setTimeout(timeout time.Duration) {
x.timeout = timeout
// setDialTimeout sets the timeout for connection to be established.
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
x.dialTimeout = timeout
}
// setStreamTimeout sets the timeout for individual operations in streaming RPC.
func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) {
x.streamTimeout = timeout
}
// setErrorThreshold sets threshold after reaching which connection is considered unhealthy
@ -286,7 +292,8 @@ func (c *clientWrapper) dial(ctx context.Context) error {
var prmDial sdkClient.PrmDial
prmDial.SetServerURI(c.prm.address)
prmDial.SetTimeout(c.prm.timeout)
prmDial.SetTimeout(c.prm.dialTimeout)
prmDial.SetStreamTimeout(c.prm.streamTimeout)
prmDial.SetContext(ctx)
if err = cl.Dial(prmDial); err != nil {
@ -316,7 +323,8 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
var prmDial sdkClient.PrmDial
prmDial.SetServerURI(c.prm.address)
prmDial.SetTimeout(c.prm.timeout)
prmDial.SetTimeout(c.prm.dialTimeout)
prmDial.SetStreamTimeout(c.prm.streamTimeout)
prmDial.SetContext(ctx)
if err := cl.Dial(prmDial); err != nil {
@ -988,6 +996,7 @@ type InitParameters struct {
key *ecdsa.PrivateKey
logger *zap.Logger
nodeDialTimeout time.Duration
nodeStreamTimeout time.Duration
healthcheckTimeout time.Duration
clientRebalanceInterval time.Duration
sessionExpirationDuration uint64
@ -1012,6 +1021,11 @@ func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) {
x.nodeDialTimeout = timeout
}
// SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC.
func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) {
x.nodeStreamTimeout = timeout
}
// SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive.
//
// See also Pool.Dial.
@ -1505,7 +1519,9 @@ const (
defaultErrorThreshold = 100
defaultRebalanceInterval = 25 * time.Second
defaultRequestTimeout = 4 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
)
// NewPool creates connection pool using parameters.
@ -1616,7 +1632,15 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
}
if params.healthcheckTimeout <= 0 {
params.healthcheckTimeout = defaultRequestTimeout
params.healthcheckTimeout = defaultHealthcheckTimeout
}
if params.nodeDialTimeout <= 0 {
params.nodeDialTimeout = defaultDialTimeout
}
if params.nodeStreamTimeout <= 0 {
params.nodeStreamTimeout = defaultStreamTimeout
}
if params.isMissingClientBuilder() {
@ -1624,7 +1648,8 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
var prm wrapperPrm
prm.setAddress(addr)
prm.setKey(*params.key)
prm.setTimeout(params.nodeDialTimeout)
prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold)
prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
cache.updateEpoch(info.Epoch())