diff --git a/pool/pool.go b/pool/pool.go index 1cfdc10..13d09eb 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -226,7 +226,8 @@ type clientWrapper struct { type wrapperPrm struct { address string key ecdsa.PrivateKey - timeout time.Duration + dialTimeout time.Duration + streamTimeout time.Duration errorThreshold uint32 responseInfoCallback func(sdkClient.ResponseMetaInfo) error } @@ -241,9 +242,14 @@ func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) { x.key = key } -// setTimeout sets the timeout for connection to be established. -func (x *wrapperPrm) setTimeout(timeout time.Duration) { - x.timeout = timeout +// setDialTimeout sets the timeout for connection to be established. +func (x *wrapperPrm) setDialTimeout(timeout time.Duration) { + x.dialTimeout = timeout +} + +// setStreamTimeout sets the timeout for individual operations in streaming RPC. +func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) { + x.streamTimeout = timeout } // setErrorThreshold sets threshold after reaching which connection is considered unhealthy @@ -286,7 +292,8 @@ func (c *clientWrapper) dial(ctx context.Context) error { var prmDial sdkClient.PrmDial prmDial.SetServerURI(c.prm.address) - prmDial.SetTimeout(c.prm.timeout) + prmDial.SetTimeout(c.prm.dialTimeout) + prmDial.SetStreamTimeout(c.prm.streamTimeout) prmDial.SetContext(ctx) if err = cl.Dial(prmDial); err != nil { @@ -316,7 +323,8 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change var prmDial sdkClient.PrmDial prmDial.SetServerURI(c.prm.address) - prmDial.SetTimeout(c.prm.timeout) + prmDial.SetTimeout(c.prm.dialTimeout) + prmDial.SetStreamTimeout(c.prm.streamTimeout) prmDial.SetContext(ctx) if err := cl.Dial(prmDial); err != nil { @@ -988,6 +996,7 @@ type InitParameters struct { key *ecdsa.PrivateKey logger *zap.Logger nodeDialTimeout time.Duration + nodeStreamTimeout time.Duration healthcheckTimeout time.Duration clientRebalanceInterval time.Duration sessionExpirationDuration uint64 @@ -1012,6 +1021,11 @@ func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) { x.nodeDialTimeout = timeout } +// SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC. +func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) { + x.nodeStreamTimeout = timeout +} + // SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive. // // See also Pool.Dial. @@ -1504,8 +1518,10 @@ const ( defaultSessionTokenExpirationDuration = 100 // in blocks defaultErrorThreshold = 100 - defaultRebalanceInterval = 25 * time.Second - defaultRequestTimeout = 4 * time.Second + defaultRebalanceInterval = 25 * time.Second + defaultHealthcheckTimeout = 4 * time.Second + defaultDialTimeout = 5 * time.Second + defaultStreamTimeout = 10 * time.Second ) // NewPool creates connection pool using parameters. @@ -1616,7 +1632,15 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { } if params.healthcheckTimeout <= 0 { - params.healthcheckTimeout = defaultRequestTimeout + params.healthcheckTimeout = defaultHealthcheckTimeout + } + + if params.nodeDialTimeout <= 0 { + params.nodeDialTimeout = defaultDialTimeout + } + + if params.nodeStreamTimeout <= 0 { + params.nodeStreamTimeout = defaultStreamTimeout } if params.isMissingClientBuilder() { @@ -1624,7 +1648,8 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { var prm wrapperPrm prm.setAddress(addr) prm.setKey(*params.key) - prm.setTimeout(params.nodeDialTimeout) + prm.setDialTimeout(params.nodeDialTimeout) + prm.setStreamTimeout(params.nodeStreamTimeout) prm.setErrorThreshold(params.errorThreshold) prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error { cache.updateEpoch(info.Epoch())