[#254] pool: Add parameter gracefulCloseOnSwitchTimeout #255
1 changed file with 53 additions and 7 deletions
pool/pool.go
@@ -258,6 +258,8 @@ type wrapperPrm struct {
     responseInfoCallback func(sdkClient.ResponseMetaInfo) error
     poolRequestInfoCallback func(RequestInfo)
     dialOptions []grpc.DialOption
+
+    gracefulCloseOnSwitchTimeout time.Duration
 }
 
 // setAddress sets endpoint to connect in FrostFS network.
@@ -291,6 +293,14 @@ func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
     x.errorThreshold = threshold
 }
+
+// setGracefulCloseOnSwitchTimeout specifies the timeout after which an unhealthy client is closed during rebalancing
+// if it becomes healthy again.
+//
+// See also setErrorThreshold.
+func (x *wrapperPrm) setGracefulCloseOnSwitchTimeout(timeout time.Duration) {
+    x.gracefulCloseOnSwitchTimeout = timeout
+}
 
 // setPoolRequestCallback sets callback that will be invoked after every pool response.
 func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) {
     x.poolRequestInfoCallback = f
@@ -362,7 +372,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (changed bool, e
     // if connection is dialed before, to avoid routine / connection leak,
     // pool has to close it and then initialize once again.
     if c.isDialed() {
-        _ = c.close()
+        c.scheduleGracefulClose()
     }
 
     var cl sdkClient.Client
@@ -407,6 +417,12 @@ func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
     return nil, errPoolClientUnhealthy
 }
+
+func (c *clientWrapper) getClientRaw() *sdkClient.Client {
+    c.clientMutex.RLock()
+    defer c.clientMutex.RUnlock()
+    return c.client
+}
 
 // balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is.
 func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
     cl, err := c.getClient()
@@ -1220,12 +1236,25 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
     }
 
 func (c *clientWrapper) close() error {
-    if c.client != nil {
-        return c.client.Close()
+    if cl := c.getClientRaw(); cl != nil {
+        return cl.Close()
     }
     return nil
 }
 
+func (c *clientWrapper) scheduleGracefulClose() {
+    cl := c.getClientRaw()
+    if cl == nil {
+        return
+    }
+
+    time.AfterFunc(c.prm.gracefulCloseOnSwitchTimeout, func() {
+        if err := cl.Close(); err != nil {
+            c.log(zap.DebugLevel, "close unhealthy client during rebalance", zap.String("address", c.address()), zap.Error(err))
+        }
+    })
+}
+
 func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
     if stErr := apistatus.ErrFromStatus(st); stErr != nil {
         switch stErr.(type) {
@@ -1300,6 +1329,8 @@ type InitParameters struct {
     dialOptions []grpc.DialOption
 
     clientBuilder clientBuilder
+
+    gracefulCloseOnSwitchTimeout time.Duration
 }
 
 // SetKey specifies default key to be used for the protocol communication by default.
@@ -1336,6 +1367,15 @@ func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
     x.clientRebalanceInterval = interval
 }
+
+// SetGracefulCloseOnSwitchTimeout specifies the timeout after which an unhealthy client is closed during rebalancing
+// if it becomes healthy again.
+// Generally this parameter should be less than the client rebalance interval (see SetClientRebalanceInterval).
+//
+// See also SetErrorThreshold.
+func (x *InitParameters) SetGracefulCloseOnSwitchTimeout(timeout time.Duration) {
+    x.gracefulCloseOnSwitchTimeout = timeout
+}
 
 // SetSessionExpirationDuration specifies the session token lifetime in epochs.
 func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) {
     x.sessionExpirationDuration = expirationDuration
@@ -1984,10 +2024,11 @@ const (
     defaultSessionTokenExpirationDuration = 100 // in epochs
     defaultErrorThreshold = 100
 
-    defaultRebalanceInterval = 15 * time.Second
-    defaultHealthcheckTimeout = 4 * time.Second
-    defaultDialTimeout = 5 * time.Second
-    defaultStreamTimeout = 10 * time.Second
+    defaultGracefulCloseOnSwitchTimeout = 10 * time.Second
+    defaultRebalanceInterval = 15 * time.Second
+    defaultHealthcheckTimeout = 4 * time.Second
+    defaultDialTimeout = 5 * time.Second
+    defaultStreamTimeout = 10 * time.Second
 
     defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
 )
@@ -2109,6 +2150,10 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
         params.clientRebalanceInterval = defaultRebalanceInterval
     }
+
+    if params.gracefulCloseOnSwitchTimeout <= 0 {
+        params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout
+    }
 
     if params.healthcheckTimeout <= 0 {
         params.healthcheckTimeout = defaultHealthcheckTimeout
     }
@@ -2134,6 +2179,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
     prm.setDialTimeout(params.nodeDialTimeout)
     prm.setStreamTimeout(params.nodeStreamTimeout)
     prm.setErrorThreshold(params.errorThreshold)
+    prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout)
     prm.setPoolRequestCallback(params.requestCallback)
     prm.setGRPCDialOptions(params.dialOptions)
     prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
Could you explain how this works?
So instead of closing immediately, we close after some time.
Why can't new requests arrive on the old connection in the meantime?

At this moment the current client is marked as unhealthy, so new requests don't use this connection. Also, right after scheduling the delayed close we invoke a new dial, so the client wrapper holds a brand new connection (even though the old one hasn't been closed yet).
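A minimal sketch of this delayed-close pattern (hypothetical names, not the actual pool types): the wrapper switches new requests to a fresh connection immediately, while the old connection is only closed after the graceful timeout, so in-flight requests on it can finish.

package main

import (
    "fmt"
    "sync"
    "time"
)

type conn struct{ id int }

func (c *conn) Close() { fmt.Printf("conn %d closed\n", c.id) }

type wrapper struct {
    mu   sync.RWMutex
    conn *conn
}

// current returns the connection used for new requests.
func (w *wrapper) current() *conn {
    w.mu.RLock()
    defer w.mu.RUnlock()
    return w.conn
}

// switchConn installs a new connection and schedules a delayed close of the
// old one, mirroring the scheduleGracefulClose + re-dial flow in this PR.
func (w *wrapper) switchConn(newConn *conn, graceful time.Duration) {
    w.mu.Lock()
    old := w.conn
    w.conn = newConn
    w.mu.Unlock()

    if old != nil {
        time.AfterFunc(graceful, func() { old.Close() })
    }
}

func main() {
    w := &wrapper{conn: &conn{id: 1}}
    w.switchConn(&conn{id: 2}, 100*time.Millisecond)
    fmt.Println("active conn:", w.current().id) // new requests already use conn 2
    time.Sleep(200 * time.Millisecond)          // conn 1 is closed once the timer fires
}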
Can this timeout affect the switching logic?
E.g. we mark some endpoint as unhealthy, then switch back to it, but the switch interval is lower than the graceful timeout here.

This timeout doesn't affect the switching logic, because when we switch back we invoke dial and get a new connection.
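For completeness, a sketch of how a pool consumer could wire the new option together with the rebalance interval it should stay below. Only setters visible in this diff are used; the import path and the surrounding function are assumptions, not part of the PR.

package example

import (
    "time"

    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
)

// newInitParams keeps the graceful close window below the rebalance interval,
// so a stale connection is closed before the next switch happens.
// The values mirror the defaults added in this PR.
func newInitParams() pool.InitParameters {
    var prm pool.InitParameters
    prm.SetClientRebalanceInterval(15 * time.Second)
    prm.SetGracefulCloseOnSwitchTimeout(10 * time.Second)
    return prm
}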