[#398] Support retryer #398
12 changed files with 288 additions and 1 deletions
|
@ -47,6 +47,9 @@ type (
|
|||
BypassContentEncodingInChunks() bool
|
||||
MD5Enabled() bool
|
||||
ACLEnabled() bool
|
||||
RetryMaxAttempts() int
|
||||
RetryMaxBackoff() time.Duration
|
||||
RetryStrategy() RetryStrategy
|
||||
}
|
||||
|
||||
FrostFSID interface {
|
||||
|
@ -63,6 +66,13 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
type RetryStrategy string
|
||||
|
||||
const (
|
||||
RetryStrategyExponential = "exponential"
|
||||
RetryStrategyConstant = "constant"
|
||||
)
|
||||
|
||||
var _ api.Handler = (*handler)(nil)
|
||||
|
||||
// New creates new api.Handler using given logger and client.
|
||||
|
|
|
@ -132,6 +132,18 @@ func (c *configMock) ResolveNamespaceAlias(ns string) string {
|
|||
return ns
|
||||
}
|
||||
|
||||
func (c *configMock) RetryMaxAttempts() int {
|
||||
return 1
|
||||
}
|
||||
|
||||
func (c *configMock) RetryMaxBackoff() time.Duration {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c *configMock) RetryStrategy() RetryStrategy {
|
||||
return RetryStrategyConstant
|
||||
}
|
||||
|
||||
func prepareHandlerContext(t *testing.T) *handlerContext {
|
||||
return prepareHandlerContextBase(t, layer.DefaultCachesConfigs(zap.NewExample()))
|
||||
}
|
||||
|
|
|
@ -26,12 +26,16 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/retryer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/schema/s3"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/aws/retry"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -922,7 +926,10 @@ func (h *handler) createBucketHandlerPolicy(w http.ResponseWriter, r *http.Reque
|
|||
sp.Settings.Versioning = data.VersioningEnabled
|
||||
}
|
||||
|
||||
if err = h.obj.PutBucketSettings(ctx, sp); err != nil {
|
||||
err = retryer.MakeWithRetry(ctx, func() error {
|
||||
return h.obj.PutBucketSettings(ctx, sp)
|
||||
}, h.putBucketSettingsRetryer())
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "couldn't save bucket settings", reqInfo, err,
|
||||
zap.String("container_id", bktInfo.CID.EncodeToString()))
|
||||
return
|
||||
|
@ -934,6 +941,27 @@ func (h *handler) createBucketHandlerPolicy(w http.ResponseWriter, r *http.Reque
|
|||
}
|
||||
}
|
||||
|
||||
func (h *handler) putBucketSettingsRetryer() aws.RetryerV2 {
|
||||
return retry.NewStandard(func(options *retry.StandardOptions) {
|
||||
options.MaxAttempts = h.cfg.RetryMaxAttempts()
|
||||
options.MaxBackoff = h.cfg.RetryMaxBackoff()
|
||||
if h.cfg.RetryStrategy() == RetryStrategyConstant {
|
||||
options.Backoff = retry.NewExponentialJitterBackoff(options.MaxBackoff)
|
||||
} else {
|
||||
options.Backoff = retry.BackoffDelayerFunc(func(int, error) (time.Duration, error) {
|
||||
return options.MaxBackoff, nil
|
||||
})
|
||||
}
|
||||
|
||||
options.Retryables = []retry.IsErrorRetryable{retry.IsErrorRetryableFunc(func(err error) aws.Ternary {
|
||||
if stderrors.Is(err, tree.ErrNodeAccessDenied) {
|
||||
return aws.TrueTernary
|
||||
}
|
||||
return aws.FalseTernary
|
||||
})}
|
||||
})
|
||||
}
|
||||
|
||||
func (h *handler) createBucketHandlerACL(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
reqInfo := middleware.GetReqInfo(ctx)
|
||||
|
|
|
@ -111,6 +111,9 @@ type (
|
|||
authorizedControlAPIKeys [][]byte
|
||||
policyDenyByDefault bool
|
||||
sourceIPHeader string
|
||||
retryMaxAttempts int
|
||||
retryMaxBackoff time.Duration
|
||||
retryStrategy handler.RetryStrategy
|
||||
}
|
||||
|
||||
maxClientsConfig struct {
|
||||
|
@ -238,6 +241,9 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger, key *keys.PrivateK
|
|||
s.setAuthorizedControlAPIKeys(append(fetchAuthorizedKeys(log, v), key.PublicKey()))
|
||||
s.setPolicyDenyByDefault(v.GetBool(cfgPolicyDenyByDefault))
|
||||
s.setSourceIPHeader(v.GetString(cfgSourceIPHeader))
|
||||
s.setRetryMaxAttempts(fetchRetryMaxAttempts(v))
|
||||
s.setRetryMaxBackoff(fetchRetryMaxBackoff(v))
|
||||
s.setRetryStrategy(fetchRetryStrategy(v))
|
||||
}
|
||||
|
||||
func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger) {
|
||||
|
@ -451,6 +457,42 @@ func (s *appSettings) SourceIPHeader() string {
|
|||
return s.sourceIPHeader
|
||||
}
|
||||
|
||||
func (s *appSettings) setRetryMaxAttempts(maxAttempts int) {
|
||||
s.mu.Lock()
|
||||
s.retryMaxAttempts = maxAttempts
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) RetryMaxAttempts() int {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.retryMaxAttempts
|
||||
}
|
||||
|
||||
func (s *appSettings) setRetryMaxBackoff(maxBackoff time.Duration) {
|
||||
s.mu.Lock()
|
||||
s.retryMaxBackoff = maxBackoff
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) RetryMaxBackoff() time.Duration {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.retryMaxBackoff
|
||||
}
|
||||
|
||||
func (s *appSettings) setRetryStrategy(strategy handler.RetryStrategy) {
|
||||
s.mu.Lock()
|
||||
s.retryStrategy = strategy
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) RetryStrategy() handler.RetryStrategy {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.retryStrategy
|
||||
}
|
||||
|
||||
func (a *App) initAPI(ctx context.Context) {
|
||||
a.initLayer(ctx)
|
||||
a.initHandler()
|
||||
|
|
|
@ -61,6 +61,10 @@ const (
|
|||
defaultNamespace = ""
|
||||
|
||||
defaultReconnectInterval = time.Minute
|
||||
|
||||
defaultRetryMaxAttempts = 4
|
||||
defaultRetryMaxBackoff = 30 * time.Second
|
||||
defaultRetryStrategy = handler.RetryStrategyExponential
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -178,6 +182,11 @@ const ( // Settings.
|
|||
cfgWebWriteTimeout = "web.write_timeout"
|
||||
cfgWebIdleTimeout = "web.idle_timeout"
|
||||
|
||||
// Retry.
|
||||
cfgRetryMaxAttempts = "retry.max_attempts"
|
||||
cfgRetryMaxBackoff = "retry.max_backoff"
|
||||
cfgRetryStrategy = "retry.strategy"
|
||||
|
||||
// Namespaces.
|
||||
cfgNamespacesConfig = "namespaces.config"
|
||||
|
||||
|
@ -325,6 +334,33 @@ func fetchSoftMemoryLimit(cfg *viper.Viper) int64 {
|
|||
return int64(softMemoryLimit)
|
||||
}
|
||||
|
||||
func fetchRetryMaxAttempts(cfg *viper.Viper) int {
|
||||
val := cfg.GetInt(cfgRetryMaxAttempts)
|
||||
if val <= 0 {
|
||||
val = defaultRetryMaxAttempts
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func fetchRetryMaxBackoff(cfg *viper.Viper) time.Duration {
|
||||
val := cfg.GetDuration(cfgRetryMaxBackoff)
|
||||
if val <= 0 {
|
||||
val = defaultRetryMaxBackoff
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func fetchRetryStrategy(cfg *viper.Viper) handler.RetryStrategy {
|
||||
val := handler.RetryStrategy(cfg.GetString(cfgRetryStrategy))
|
||||
if val != handler.RetryStrategyExponential && val != handler.RetryStrategyConstant {
|
||||
val = defaultRetryStrategy
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func fetchDefaultPolicy(l *zap.Logger, cfg *viper.Viper) netmap.PlacementPolicy {
|
||||
var policy netmap.PlacementPolicy
|
||||
|
||||
|
@ -759,6 +795,10 @@ func newSettings() *viper.Viper {
|
|||
// resolve
|
||||
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
|
||||
|
||||
// retry
|
||||
v.SetDefault(cfgRetryMaxAttempts, defaultRetryMaxAttempts)
|
||||
v.SetDefault(cfgRetryMaxBackoff, defaultRetryMaxBackoff)
|
||||
|
||||
// Bind flags
|
||||
if err := bindFlags(v, flags); err != nil {
|
||||
panic(fmt.Errorf("bind flags: %w", err))
|
||||
|
|
|
@ -223,3 +223,12 @@ S3_GW_NAMESPACES_CONFIG=namespaces.json
|
|||
|
||||
# Custom header to retrieve Source IP
|
||||
S3_GW_SOURCE_IP_HEADER=Source-Ip
|
||||
|
||||
# Retry strategy configuration.
|
||||
# Max amount of request attempts. Currently only for updating bucket settings request.
|
||||
S3_GW_RETRY_MAX_ATTEMPTS=4
|
||||
# Max delay before next attempt.
|
||||
S3_GW_RETRY_MAX_BACKOFF=30s
|
||||
# Backoff strategy. `exponential` and `constant` are allowed.
|
||||
S3_GW_RETRY_STRATEGY=exponential
|
||||
|
||||
|
|
|
@ -262,3 +262,12 @@ namespaces:
|
|||
|
||||
# Custom header to retrieve Source IP
|
||||
source_ip_header: "Source-Ip"
|
||||
|
||||
# Retry strategy configuration.
|
||||
retry:
|
||||
# Max amount of request attempts. Currently only for updating bucket settings request.
|
||||
max_attempts: 4
|
||||
# Max delay before next attempt.
|
||||
max_backoff: 30s
|
||||
# Backoff strategy. `exponential` and `constant` are allowed.
|
||||
strategy: exponential
|
||||
|
|
|
@ -193,6 +193,7 @@ There are some custom types used for brevity:
|
|||
| `policy` | [Policy contract configuration](#policy-section) |
|
||||
| `proxy` | [Proxy contract configuration](#proxy-section) |
|
||||
| `namespaces` | [Namespaces configuration](#namespaces-section) |
|
||||
| `retry` | [Retry configuration](#retry-section) |
|
||||
|
||||
### General section
|
||||
|
||||
|
@ -741,3 +742,21 @@ To override config values for default namespaces use namespace names that are pr
|
|||
}
|
||||
}
|
||||
```
|
||||
|
||||
# `retry` section
|
||||
|
||||
Retry strategy configuration.
|
||||
|
||||
```yaml
|
||||
retry:
|
||||
max_attempts: 4
|
||||
max_backoff: 30s
|
||||
strategy: exponential
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|---------------|------------|---------------|---------------|--------------------------------------------------------------------------------------|
|
||||
| `max_attemps` | `int` | yes | `4` | Max amount of request attempts. Currently only for updating bucket settings request. |
|
||||
| `max_backoff` | `duration` | yes | `30s` | Max delay before next attempt. |
|
||||
| `strategy` | `string` | yes | `exponential` | Backoff strategy. `exponential` and `constant` are allowed. |
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -10,6 +10,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240527065402-303a81cdc6db
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
github.com/aws/aws-sdk-go v1.44.6
|
||||
github.com/aws/aws-sdk-go-v2 v1.18.1
|
||||
github.com/bluele/gcache v0.0.2
|
||||
github.com/go-chi/chi/v5 v5.0.8
|
||||
github.com/google/uuid v1.3.1
|
||||
|
@ -42,6 +43,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect
|
||||
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
|
||||
github.com/aws/smithy-go v1.13.5 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
|
|
5
go.sum
5
go.sum
|
@ -64,6 +64,10 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8
|
|||
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
|
||||
github.com/aws/aws-sdk-go v1.44.6 h1:Y+uHxmZfhRTLX2X3khkdxCoTZAyGEX21aOUHe1U6geg=
|
||||
github.com/aws/aws-sdk-go v1.44.6/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
|
||||
github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo=
|
||||
github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
|
||||
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
|
||||
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bits-and-blooms/bitset v1.8.0 h1:FD+XqgOZDUxxZ8hzoBFuV9+cGWY9CslN6d5MS5JVb4c=
|
||||
|
@ -168,6 +172,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
|
||||
|
|
52
pkg/retryer/retryer.go
Normal file
52
pkg/retryer/retryer.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package retryer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
)
|
||||
|
||||
func MakeWithRetry(ctx context.Context, fn func() error, retryer aws.RetryerV2) (err error) {
|
||||
var attemptNum int
|
||||
|
||||
maxAttempts := retryer.MaxAttempts()
|
||||
|
||||
for {
|
||||
attemptNum++
|
||||
|
||||
err = fn()
|
||||
|
||||
if !retryer.IsErrorRetryable(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
if attemptNum >= maxAttempts {
|
||||
return fmt.Errorf("max retry attempts exausted, max %d: %w", maxAttempts, err)
|
||||
}
|
||||
|
||||
retryDelay, reqErr := retryer.RetryDelay(attemptNum, err)
|
||||
if reqErr != nil {
|
||||
return reqErr
|
||||
}
|
||||
|
||||
if reqErr = sleepWithContext(ctx, retryDelay); reqErr != nil {
|
||||
return reqErr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func sleepWithContext(ctx context.Context, dur time.Duration) error {
|
||||
t := time.NewTimer(dur)
|
||||
defer t.Stop()
|
||||
|
||||
select {
|
||||
case <-t.C:
|
||||
break
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
59
pkg/retryer/retryer_test.go
Normal file
59
pkg/retryer/retryer_test.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
package retryer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/aws/retry"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRetryer(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
retryer := retry.NewStandard(func(options *retry.StandardOptions) {
|
||||
options.MaxAttempts = 3
|
||||
options.Backoff = retry.BackoffDelayerFunc(func(int, error) (time.Duration, error) { return 0, nil })
|
||||
options.Retryables = []retry.IsErrorRetryable{retry.IsErrorRetryableFunc(func(err error) aws.Ternary {
|
||||
if errors.Is(err, tree.ErrNodeAccessDenied) {
|
||||
return aws.TrueTernary
|
||||
}
|
||||
return aws.FalseTernary
|
||||
})}
|
||||
})
|
||||
|
||||
t.Run("no retryable", func(t *testing.T) {
|
||||
count := 0
|
||||
err := MakeWithRetry(ctx, func() error {
|
||||
count++
|
||||
if count == 1 {
|
||||
return tree.ErrNodeNotFound
|
||||
}
|
||||
return tree.ErrNodeAccessDenied
|
||||
}, retryer)
|
||||
require.ErrorIs(t, err, tree.ErrNodeNotFound)
|
||||
})
|
||||
|
||||
t.Run("retry", func(t *testing.T) {
|
||||
count := 0
|
||||
err := MakeWithRetry(ctx, func() error {
|
||||
count++
|
||||
if count < 3 {
|
||||
return tree.ErrNodeAccessDenied
|
||||
}
|
||||
return nil
|
||||
}, retryer)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("retry exhausted", func(t *testing.T) {
|
||||
err := MakeWithRetry(ctx, func() error {
|
||||
return tree.ErrNodeAccessDenied
|
||||
}, retryer)
|
||||
require.ErrorIs(t, err, tree.ErrNodeAccessDenied)
|
||||
})
|
||||
}
|
Loading…
Reference in a new issue