[#254] pool: Add parameter gracefulCloseOnSwitchTimeout

Add new param for waiting a little until current in-flight requests will be finished

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-08-19 17:54:42 +03:00 committed by Evgenii Stratonikov
parent 9115d3f281
commit 28f140bf06

View file

@ -258,6 +258,8 @@ type wrapperPrm struct {
responseInfoCallback func(sdkClient.ResponseMetaInfo) error responseInfoCallback func(sdkClient.ResponseMetaInfo) error
poolRequestInfoCallback func(RequestInfo) poolRequestInfoCallback func(RequestInfo)
dialOptions []grpc.DialOption dialOptions []grpc.DialOption
gracefulCloseOnSwitchTimeout time.Duration
} }
// setAddress sets endpoint to connect in FrostFS network. // setAddress sets endpoint to connect in FrostFS network.
@ -291,6 +293,14 @@ func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
x.errorThreshold = threshold x.errorThreshold = threshold
} }
// SetGracefulCloseOnSwitchTimeout specifies the timeout after which unhealthy client be closed during rebalancing
// if it will become healthy back.
//
// See also setErrorThreshold.
func (x *wrapperPrm) setGracefulCloseOnSwitchTimeout(timeout time.Duration) {
x.gracefulCloseOnSwitchTimeout = timeout
}
// setPoolRequestCallback sets callback that will be invoked after every pool response. // setPoolRequestCallback sets callback that will be invoked after every pool response.
func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) { func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) {
x.poolRequestInfoCallback = f x.poolRequestInfoCallback = f
@ -362,7 +372,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (changed bool, e
// if connection is dialed before, to avoid routine / connection leak, // if connection is dialed before, to avoid routine / connection leak,
// pool has to close it and then initialize once again. // pool has to close it and then initialize once again.
if c.isDialed() { if c.isDialed() {
_ = c.close() c.scheduleGracefulClose()
} }
var cl sdkClient.Client var cl sdkClient.Client
@ -407,6 +417,12 @@ func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
return nil, errPoolClientUnhealthy return nil, errPoolClientUnhealthy
} }
func (c *clientWrapper) getClientRaw() *sdkClient.Client {
c.clientMutex.RLock()
defer c.clientMutex.RUnlock()
return c.client
}
// balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is. // balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is.
func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) { func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
cl, err := c.getClient() cl, err := c.getClient()
@ -1220,12 +1236,25 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
} }
func (c *clientWrapper) close() error { func (c *clientWrapper) close() error {
if c.client != nil { if cl := c.getClientRaw(); cl != nil {
return c.client.Close() return cl.Close()
} }
return nil return nil
} }
func (c *clientWrapper) scheduleGracefulClose() {
cl := c.getClientRaw()
if cl == nil {
return
}
time.AfterFunc(c.prm.gracefulCloseOnSwitchTimeout, func() {
if err := cl.Close(); err != nil {
c.log(zap.DebugLevel, "close unhealthy client during rebalance", zap.String("address", c.address()), zap.Error(err))
}
})
}
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error { func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
if stErr := apistatus.ErrFromStatus(st); stErr != nil { if stErr := apistatus.ErrFromStatus(st); stErr != nil {
switch stErr.(type) { switch stErr.(type) {
@ -1300,6 +1329,8 @@ type InitParameters struct {
dialOptions []grpc.DialOption dialOptions []grpc.DialOption
clientBuilder clientBuilder clientBuilder clientBuilder
gracefulCloseOnSwitchTimeout time.Duration
} }
// SetKey specifies default key to be used for the protocol communication by default. // SetKey specifies default key to be used for the protocol communication by default.
@ -1336,6 +1367,15 @@ func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
x.clientRebalanceInterval = interval x.clientRebalanceInterval = interval
} }
// SetGracefulCloseOnSwitchTimeout specifies the timeout after which unhealthy client be closed during rebalancing
// if it will become healthy back.
// Generally this param should be less than client rebalance interval (see SetClientRebalanceInterval).
//
// See also SetErrorThreshold.
func (x *InitParameters) SetGracefulCloseOnSwitchTimeout(timeout time.Duration) {
x.gracefulCloseOnSwitchTimeout = timeout
}
// SetSessionExpirationDuration specifies the session token lifetime in epochs. // SetSessionExpirationDuration specifies the session token lifetime in epochs.
func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) { func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) {
x.sessionExpirationDuration = expirationDuration x.sessionExpirationDuration = expirationDuration
@ -1984,10 +2024,11 @@ const (
defaultSessionTokenExpirationDuration = 100 // in epochs defaultSessionTokenExpirationDuration = 100 // in epochs
defaultErrorThreshold = 100 defaultErrorThreshold = 100
defaultRebalanceInterval = 15 * time.Second defaultGracefulCloseOnSwitchTimeout = 10 * time.Second
defaultHealthcheckTimeout = 4 * time.Second defaultRebalanceInterval = 15 * time.Second
defaultDialTimeout = 5 * time.Second defaultHealthcheckTimeout = 4 * time.Second
defaultStreamTimeout = 10 * time.Second defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
) )
@ -2109,6 +2150,10 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
params.clientRebalanceInterval = defaultRebalanceInterval params.clientRebalanceInterval = defaultRebalanceInterval
} }
if params.gracefulCloseOnSwitchTimeout <= 0 {
params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout
}
if params.healthcheckTimeout <= 0 { if params.healthcheckTimeout <= 0 {
params.healthcheckTimeout = defaultHealthcheckTimeout params.healthcheckTimeout = defaultHealthcheckTimeout
} }
@ -2134,6 +2179,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
prm.setDialTimeout(params.nodeDialTimeout) prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout) prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold) prm.setErrorThreshold(params.errorThreshold)
prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout)
prm.setPoolRequestCallback(params.requestCallback) prm.setPoolRequestCallback(params.requestCallback)
prm.setGRPCDialOptions(params.dialOptions) prm.setGRPCDialOptions(params.dialOptions)
prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error { prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {