diff --git a/pool/pool.go b/pool/pool.go index f9e3dae..9798836 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -258,6 +258,8 @@ type wrapperPrm struct { responseInfoCallback func(sdkClient.ResponseMetaInfo) error poolRequestInfoCallback func(RequestInfo) dialOptions []grpc.DialOption + + gracefulCloseOnSwitchTimeout time.Duration } // setAddress sets endpoint to connect in FrostFS network. @@ -291,6 +293,14 @@ func (x *wrapperPrm) setErrorThreshold(threshold uint32) { x.errorThreshold = threshold } +// SetGracefulCloseOnSwitchTimeout specifies the timeout after which unhealthy client be closed during rebalancing +// if it will become healthy back. +// +// See also setErrorThreshold. +func (x *wrapperPrm) setGracefulCloseOnSwitchTimeout(timeout time.Duration) { + x.gracefulCloseOnSwitchTimeout = timeout +} + // setPoolRequestCallback sets callback that will be invoked after every pool response. func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) { x.poolRequestInfoCallback = f @@ -362,7 +372,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (changed bool, e // if connection is dialed before, to avoid routine / connection leak, // pool has to close it and then initialize once again. if c.isDialed() { - _ = c.close() + c.scheduleGracefulClose() } var cl sdkClient.Client @@ -407,6 +417,12 @@ func (c *clientWrapper) getClient() (*sdkClient.Client, error) { return nil, errPoolClientUnhealthy } +func (c *clientWrapper) getClientRaw() *sdkClient.Client { + c.clientMutex.RLock() + defer c.clientMutex.RUnlock() + return c.client +} + // balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is. func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) { cl, err := c.getClient() @@ -1220,12 +1236,25 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) { } func (c *clientWrapper) close() error { - if c.client != nil { - return c.client.Close() + if cl := c.getClientRaw(); cl != nil { + return cl.Close() } return nil } +func (c *clientWrapper) scheduleGracefulClose() { + cl := c.getClientRaw() + if cl == nil { + return + } + + time.AfterFunc(c.prm.gracefulCloseOnSwitchTimeout, func() { + if err := cl.Close(); err != nil { + c.log(zap.DebugLevel, "close unhealthy client during rebalance", zap.String("address", c.address()), zap.Error(err)) + } + }) +} + func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error { if stErr := apistatus.ErrFromStatus(st); stErr != nil { switch stErr.(type) { @@ -1300,6 +1329,8 @@ type InitParameters struct { dialOptions []grpc.DialOption clientBuilder clientBuilder + + gracefulCloseOnSwitchTimeout time.Duration } // SetKey specifies default key to be used for the protocol communication by default. @@ -1336,6 +1367,15 @@ func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) { x.clientRebalanceInterval = interval } +// SetGracefulCloseOnSwitchTimeout specifies the timeout after which unhealthy client be closed during rebalancing +// if it will become healthy back. +// Generally this param should be less than client rebalance interval (see SetClientRebalanceInterval). +// +// See also SetErrorThreshold. +func (x *InitParameters) SetGracefulCloseOnSwitchTimeout(timeout time.Duration) { + x.gracefulCloseOnSwitchTimeout = timeout +} + // SetSessionExpirationDuration specifies the session token lifetime in epochs. func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) { x.sessionExpirationDuration = expirationDuration @@ -1984,10 +2024,11 @@ const ( defaultSessionTokenExpirationDuration = 100 // in epochs defaultErrorThreshold = 100 - defaultRebalanceInterval = 15 * time.Second - defaultHealthcheckTimeout = 4 * time.Second - defaultDialTimeout = 5 * time.Second - defaultStreamTimeout = 10 * time.Second + defaultGracefulCloseOnSwitchTimeout = 10 * time.Second + defaultRebalanceInterval = 15 * time.Second + defaultHealthcheckTimeout = 4 * time.Second + defaultDialTimeout = 5 * time.Second + defaultStreamTimeout = 10 * time.Second defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB ) @@ -2109,6 +2150,10 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { params.clientRebalanceInterval = defaultRebalanceInterval } + if params.gracefulCloseOnSwitchTimeout <= 0 { + params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout + } + if params.healthcheckTimeout <= 0 { params.healthcheckTimeout = defaultHealthcheckTimeout } @@ -2134,6 +2179,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { prm.setDialTimeout(params.nodeDialTimeout) prm.setStreamTimeout(params.nodeStreamTimeout) prm.setErrorThreshold(params.errorThreshold) + prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout) prm.setPoolRequestCallback(params.requestCallback) prm.setGRPCDialOptions(params.dialOptions) prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {