[#254] pool: Add parameter gracefulCloseOnSwitchTimeout
All checks were successful
DCO / DCO (pull_request) Successful in 37s
Tests and linters / Tests (1.21) (pull_request) Successful in 53s
Tests and linters / Tests (1.22) (pull_request) Successful in 54s
Tests and linters / Lint (pull_request) Successful in 1m38s

Add new param for waiting a little until current in-flight requests will be finished

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-08-19 17:54:42 +03:00
parent 203bba65a0
commit c416a37ce1

View file

@ -258,6 +258,8 @@ type wrapperPrm struct {
responseInfoCallback func(sdkClient.ResponseMetaInfo) error
poolRequestInfoCallback func(RequestInfo)
dialOptions []grpc.DialOption
gracefulCloseOnSwitchTimeout time.Duration
}
// setAddress sets endpoint to connect in FrostFS network.
@ -291,6 +293,14 @@ func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
x.errorThreshold = threshold
}
// SetGracefulCloseOnSwitchTimeout specifies the timeout after which unhealthy client be closed during rebalancing
// if it will become healthy back.
//
// See also setErrorThreshold.
func (x *wrapperPrm) setGracefulCloseOnSwitchTimeout(timeout time.Duration) {
x.gracefulCloseOnSwitchTimeout = timeout
}
// setPoolRequestCallback sets callback that will be invoked after every pool response.
func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) {
x.poolRequestInfoCallback = f
@ -362,7 +372,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (changed bool, e
// if connection is dialed before, to avoid routine / connection leak,
// pool has to close it and then initialize once again.
if c.isDialed() {
_ = c.close()
c.scheduleGracefulClose()
}
var cl sdkClient.Client
@ -407,6 +417,12 @@ func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
return nil, errPoolClientUnhealthy
}
func (c *clientWrapper) getClientRaw() *sdkClient.Client {
c.clientMutex.RLock()
defer c.clientMutex.RUnlock()
return c.client
}
// balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is.
func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
cl, err := c.getClient()
@ -1220,12 +1236,25 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
}
func (c *clientWrapper) close() error {
if c.client != nil {
return c.client.Close()
if cl := c.getClientRaw(); cl != nil {
return cl.Close()
}
return nil
}
func (c *clientWrapper) scheduleGracefulClose() {
cl := c.getClientRaw()
if cl == nil {
return
}
time.AfterFunc(c.prm.gracefulCloseOnSwitchTimeout, func() {
if err := cl.Close(); err != nil {
c.log(zap.DebugLevel, "close unhealthy client during rebalance", zap.String("address", c.address()), zap.Error(err))
}
})
}
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
if stErr := apistatus.ErrFromStatus(st); stErr != nil {
switch stErr.(type) {
@ -1300,6 +1329,8 @@ type InitParameters struct {
dialOptions []grpc.DialOption
clientBuilder clientBuilder
gracefulCloseOnSwitchTimeout time.Duration
}
// SetKey specifies default key to be used for the protocol communication by default.
@ -1336,6 +1367,15 @@ func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
x.clientRebalanceInterval = interval
}
// SetGracefulCloseOnSwitchTimeout specifies the timeout after which unhealthy client be closed during rebalancing
// if it will become healthy back.
// Generally this param should be less than client rebalance interval (see SetClientRebalanceInterval).
//
// See also SetErrorThreshold.
func (x *InitParameters) SetGracefulCloseOnSwitchTimeout(timeout time.Duration) {
x.gracefulCloseOnSwitchTimeout = timeout
}
// SetSessionExpirationDuration specifies the session token lifetime in epochs.
func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) {
x.sessionExpirationDuration = expirationDuration
@ -1984,10 +2024,11 @@ const (
defaultSessionTokenExpirationDuration = 100 // in epochs
defaultErrorThreshold = 100
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultGracefulCloseOnSwitchTimeout = 10 * time.Second
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
)
@ -2109,6 +2150,10 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
params.clientRebalanceInterval = defaultRebalanceInterval
}
if params.gracefulCloseOnSwitchTimeout <= 0 {
params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout
}
if params.healthcheckTimeout <= 0 {
params.healthcheckTimeout = defaultHealthcheckTimeout
}
@ -2134,6 +2179,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold)
prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout)
prm.setPoolRequestCallback(params.requestCallback)
prm.setGRPCDialOptions(params.dialOptions)
prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {