[#254] pool: Add parameter gracefulCloseOnSwitchTimeout #255

Merged
fyrchik merged 1 commit from dkirillov/frostfs-sdk-go:feature/254-pool_gracefull_client_switch into master 2024-08-22 08:02:52 +00:00

View file

@ -258,6 +258,8 @@ type wrapperPrm struct {
responseInfoCallback func(sdkClient.ResponseMetaInfo) error
poolRequestInfoCallback func(RequestInfo)
dialOptions []grpc.DialOption
gracefulCloseOnSwitchTimeout time.Duration
}
// setAddress sets endpoint to connect in FrostFS network.
@ -291,6 +293,14 @@ func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
x.errorThreshold = threshold
}
// setGracefulCloseOnSwitchTimeout stores the delay after which the previous
// connection of an unhealthy client is closed once the client has been
// re-dialed during rebalancing. The value is read by scheduleGracefulClose.
//
// See also setErrorThreshold.
func (x *wrapperPrm) setGracefulCloseOnSwitchTimeout(timeout time.Duration) {
x.gracefulCloseOnSwitchTimeout = timeout
}
// setPoolRequestCallback sets callback that will be invoked after every pool response.
func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) {
x.poolRequestInfoCallback = f
@ -362,7 +372,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (changed bool, e
// if connection is dialed before, to avoid routine / connection leak,
// pool has to close it and then initialize once again.
if c.isDialed() {
_ = c.close()
c.scheduleGracefulClose()
}
var cl sdkClient.Client
@ -407,6 +417,12 @@ func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
return nil, errPoolClientUnhealthy
}
// getClientRaw returns the currently stored client as is (it may be nil),
// reading it under the read lock of clientMutex.
func (c *clientWrapper) getClientRaw() *sdkClient.Client {
	c.clientMutex.RLock()
	current := c.client
	c.clientMutex.RUnlock()

	return current
}
// balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is.
func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
cl, err := c.getClient()
@ -1220,12 +1236,25 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
}
func (c *clientWrapper) close() error {
if c.client != nil {
return c.client.Close()
if cl := c.getClientRaw(); cl != nil {
return cl.Close()
}
return nil
}
func (c *clientWrapper) scheduleGracefulClose() {
cl := c.getClientRaw()
if cl == nil {
return
}
time.AfterFunc(c.prm.gracefulCloseOnSwitchTimeout, func() {
Review

Could you explain how this works?
So instead of closing immediately, we close after some time.
Why can't new requests arrive in the meantime?

Could you explain how this works? So instead of closing immediately, we close after some time. Why can't new requests arrive in the meantime?
Review

At this point the current client is marked as unhealthy, so new requests don't use this connection. Also, after scheduling such a delayed close we invoke a new dial, so the client will contain a brand-new connection (even though the old one hasn't been closed yet).

At this point the current client is marked as unhealthy, so new requests don't use this connection. Also, after scheduling such a delayed close we invoke a new dial, so the client will contain a brand-new connection (even though the old one hasn't been closed yet).
Review

Can this timeout affect switching logic?
e.g. we mark some endpoint as unhealthy, then switch back to it, but this switch timeout is lower than the graceful timeout here.

Can this timeout affect switching logic? e.g. we mark some endpoint as unhealthy, then switch back to it, but this switch timeout is lower than the graceful timeout here.
Review

This timeout doesn't affect the switching logic, because when we switch back we invoke dial and get a new connection.

This timeout doesn't affect the switching logic, because when we switch back we invoke `dial` and get a new connection.
if err := cl.Close(); err != nil {
c.log(zap.DebugLevel, "close unhealthy client during rebalance", zap.String("address", c.address()), zap.Error(err))
}
})
}
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
if stErr := apistatus.ErrFromStatus(st); stErr != nil {
switch stErr.(type) {
@ -1300,6 +1329,8 @@ type InitParameters struct {
dialOptions []grpc.DialOption
clientBuilder clientBuilder
gracefulCloseOnSwitchTimeout time.Duration
}
// SetKey specifies default key to be used for the protocol communication by default.
@ -1336,6 +1367,15 @@ func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
x.clientRebalanceInterval = interval
}
// SetGracefulCloseOnSwitchTimeout specifies the delay after which the old connection
// of an unhealthy client is closed once the client has been re-dialed during rebalancing.
// Generally this parameter should be less than the client rebalance interval
// (see SetClientRebalanceInterval).
// Non-positive values are replaced with the default by fillDefaultInitParams.
//
// See also SetErrorThreshold.
func (x *InitParameters) SetGracefulCloseOnSwitchTimeout(timeout time.Duration) {
x.gracefulCloseOnSwitchTimeout = timeout
}
// SetSessionExpirationDuration specifies the session token lifetime in epochs.
func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) {
x.sessionExpirationDuration = expirationDuration
@ -1984,10 +2024,11 @@ const (
defaultSessionTokenExpirationDuration = 100 // in epochs
defaultErrorThreshold = 100
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultGracefulCloseOnSwitchTimeout = 10 * time.Second
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
)
@ -2109,6 +2150,10 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
params.clientRebalanceInterval = defaultRebalanceInterval
}
if params.gracefulCloseOnSwitchTimeout <= 0 {
params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout
}
if params.healthcheckTimeout <= 0 {
params.healthcheckTimeout = defaultHealthcheckTimeout
}
@ -2134,6 +2179,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold)
prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout)
prm.setPoolRequestCallback(params.requestCallback)
prm.setGRPCDialOptions(params.dialOptions)
prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {