[#398] Support retryer #398
12 changed files with 288 additions and 1 deletions
|
@ -47,6 +47,9 @@ type (
|
|||
BypassContentEncodingInChunks() bool
|
||||
MD5Enabled() bool
|
||||
ACLEnabled() bool
|
||||
RetryMaxAttempts() int
|
||||
RetryMaxBackoff() time.Duration
|
||||
RetryStrategy() RetryStrategy
|
||||
alexvanin marked this conversation as resolved
Outdated
|
||||
}
|
||||
|
||||
FrostFSID interface {
|
||||
|
@ -63,6 +66,13 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
type RetryStrategy string
|
||||
|
||||
const (
|
||||
RetryStrategyExponential = "exponential"
|
||||
RetryStrategyConstant = "constant"
|
||||
)
|
||||
alexvanin marked this conversation as resolved
Outdated
alexvanin
commented
Matter of taste, but do you consider keeping this in Matter of taste, but do you consider keeping this in `pkg/retryer`?
dkirillov
commented
Hm... Is it ok that these constant won't be used in package when they are located? If it's ok then I'll move them Hm... Is it ok that these constant won't be used in package when they are located? If it's ok then I'll move them
alexvanin
commented
Agree, this is application level constants, there is no need to define it in package. Agree, this is application level constants, there is no need to define it in package.
|
||||
|
||||
var _ api.Handler = (*handler)(nil)
|
||||
|
||||
// New creates new api.Handler using given logger and client.
|
||||
|
|
|
@ -132,6 +132,18 @@ func (c *configMock) ResolveNamespaceAlias(ns string) string {
|
|||
return ns
|
||||
}
|
||||
|
||||
func (c *configMock) RetryMaxAttempts() int {
|
||||
return 1
|
||||
}
|
||||
|
||||
func (c *configMock) RetryMaxBackoff() time.Duration {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c *configMock) RetryStrategy() RetryStrategy {
|
||||
return RetryStrategyConstant
|
||||
}
|
||||
|
||||
func prepareHandlerContext(t *testing.T) *handlerContext {
|
||||
return prepareHandlerContextBase(t, layer.DefaultCachesConfigs(zap.NewExample()))
|
||||
}
|
||||
|
|
|
@ -26,12 +26,16 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/retryer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/schema/s3"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/aws/retry"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -922,7 +926,10 @@ func (h *handler) createBucketHandlerPolicy(w http.ResponseWriter, r *http.Reque
|
|||
sp.Settings.Versioning = data.VersioningEnabled
|
||||
}
|
||||
|
||||
if err = h.obj.PutBucketSettings(ctx, sp); err != nil {
|
||||
err = retryer.MakeWithRetry(ctx, func() error {
|
||||
return h.obj.PutBucketSettings(ctx, sp)
|
||||
}, h.putBucketSettingsRetryer())
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "couldn't save bucket settings", reqInfo, err,
|
||||
zap.String("container_id", bktInfo.CID.EncodeToString()))
|
||||
return
|
||||
|
@ -934,6 +941,27 @@ func (h *handler) createBucketHandlerPolicy(w http.ResponseWriter, r *http.Reque
|
|||
}
|
||||
}
|
||||
|
||||
func (h *handler) putBucketSettingsRetryer() aws.RetryerV2 {
|
||||
return retry.NewStandard(func(options *retry.StandardOptions) {
|
||||
options.MaxAttempts = h.cfg.RetryMaxAttempts()
|
||||
options.MaxBackoff = h.cfg.RetryMaxBackoff()
|
||||
if h.cfg.RetryStrategy() == RetryStrategyConstant {
|
||||
options.Backoff = retry.NewExponentialJitterBackoff(options.MaxBackoff)
|
||||
} else {
|
||||
options.Backoff = retry.BackoffDelayerFunc(func(int, error) (time.Duration, error) {
|
||||
return options.MaxBackoff, nil
|
||||
})
|
||||
}
|
||||
|
||||
options.Retryables = []retry.IsErrorRetryable{retry.IsErrorRetryableFunc(func(err error) aws.Ternary {
|
||||
if stderrors.Is(err, tree.ErrNodeAccessDenied) {
|
||||
return aws.TrueTernary
|
||||
}
|
||||
return aws.FalseTernary
|
||||
})}
|
||||
})
|
||||
}
|
||||
|
||||
func (h *handler) createBucketHandlerACL(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
reqInfo := middleware.GetReqInfo(ctx)
|
||||
|
|
|
@ -111,6 +111,9 @@ type (
|
|||
authorizedControlAPIKeys [][]byte
|
||||
policyDenyByDefault bool
|
||||
sourceIPHeader string
|
||||
retryMaxAttempts int
|
||||
retryMaxBackoff time.Duration
|
||||
retryStrategy handler.RetryStrategy
|
||||
}
|
||||
|
||||
maxClientsConfig struct {
|
||||
|
@ -238,6 +241,9 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger, key *keys.PrivateK
|
|||
s.setAuthorizedControlAPIKeys(append(fetchAuthorizedKeys(log, v), key.PublicKey()))
|
||||
s.setPolicyDenyByDefault(v.GetBool(cfgPolicyDenyByDefault))
|
||||
s.setSourceIPHeader(v.GetString(cfgSourceIPHeader))
|
||||
s.setRetryMaxAttempts(fetchRetryMaxAttempts(v))
|
||||
s.setRetryMaxBackoff(fetchRetryMaxBackoff(v))
|
||||
s.setRetryStrategy(fetchRetryStrategy(v))
|
||||
}
|
||||
|
||||
func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger) {
|
||||
|
@ -451,6 +457,42 @@ func (s *appSettings) SourceIPHeader() string {
|
|||
return s.sourceIPHeader
|
||||
}
|
||||
|
||||
func (s *appSettings) setRetryMaxAttempts(maxAttempts int) {
|
||||
s.mu.Lock()
|
||||
s.retryMaxAttempts = maxAttempts
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) RetryMaxAttempts() int {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.retryMaxAttempts
|
||||
}
|
||||
|
||||
func (s *appSettings) setRetryMaxBackoff(maxBackoff time.Duration) {
|
||||
s.mu.Lock()
|
||||
s.retryMaxBackoff = maxBackoff
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) RetryMaxBackoff() time.Duration {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.retryMaxBackoff
|
||||
}
|
||||
|
||||
func (s *appSettings) setRetryStrategy(strategy handler.RetryStrategy) {
|
||||
s.mu.Lock()
|
||||
s.retryStrategy = strategy
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) RetryStrategy() handler.RetryStrategy {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.retryStrategy
|
||||
}
|
||||
|
||||
func (a *App) initAPI(ctx context.Context) {
|
||||
a.initLayer(ctx)
|
||||
a.initHandler()
|
||||
|
|
|
@ -61,6 +61,10 @@ const (
|
|||
defaultNamespace = ""
|
||||
|
||||
defaultReconnectInterval = time.Minute
|
||||
|
||||
defaultRetryMaxAttempts = 4
|
||||
defaultRetryMaxBackoff = 30 * time.Second
|
||||
defaultRetryStrategy = handler.RetryStrategyExponential
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -178,6 +182,11 @@ const ( // Settings.
|
|||
cfgWebWriteTimeout = "web.write_timeout"
|
||||
cfgWebIdleTimeout = "web.idle_timeout"
|
||||
|
||||
// Retry.
|
||||
cfgRetryMaxAttempts = "retry.max_attempts"
|
||||
cfgRetryMaxBackoff = "retry.max_backoff"
|
||||
cfgRetryStrategy = "retry.strategy"
|
||||
|
||||
// Namespaces.
|
||||
cfgNamespacesConfig = "namespaces.config"
|
||||
|
||||
|
@ -325,6 +334,33 @@ func fetchSoftMemoryLimit(cfg *viper.Viper) int64 {
|
|||
return int64(softMemoryLimit)
|
||||
}
|
||||
|
||||
func fetchRetryMaxAttempts(cfg *viper.Viper) int {
|
||||
val := cfg.GetInt(cfgRetryMaxAttempts)
|
||||
if val <= 0 {
|
||||
val = defaultRetryMaxAttempts
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func fetchRetryMaxBackoff(cfg *viper.Viper) time.Duration {
|
||||
val := cfg.GetDuration(cfgRetryMaxBackoff)
|
||||
if val <= 0 {
|
||||
val = defaultRetryMaxBackoff
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func fetchRetryStrategy(cfg *viper.Viper) handler.RetryStrategy {
|
||||
val := handler.RetryStrategy(cfg.GetString(cfgRetryStrategy))
|
||||
if val != handler.RetryStrategyExponential && val != handler.RetryStrategyConstant {
|
||||
val = defaultRetryStrategy
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func fetchDefaultPolicy(l *zap.Logger, cfg *viper.Viper) netmap.PlacementPolicy {
|
||||
var policy netmap.PlacementPolicy
|
||||
|
||||
|
@ -759,6 +795,10 @@ func newSettings() *viper.Viper {
|
|||
// resolve
|
||||
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
|
||||
|
||||
// retry
|
||||
v.SetDefault(cfgRetryMaxAttempts, defaultRetryMaxAttempts)
|
||||
v.SetDefault(cfgRetryMaxBackoff, defaultRetryMaxBackoff)
|
||||
|
||||
// Bind flags
|
||||
if err := bindFlags(v, flags); err != nil {
|
||||
panic(fmt.Errorf("bind flags: %w", err))
|
||||
|
|
|
@ -223,3 +223,12 @@ S3_GW_NAMESPACES_CONFIG=namespaces.json
|
|||
|
||||
# Custom header to retrieve Source IP
|
||||
S3_GW_SOURCE_IP_HEADER=Source-Ip
|
||||
|
||||
# Retry strategy configuration.
|
||||
# Max amount of request attempts. Currently only for updating bucket settings request.
|
||||
S3_GW_RETRY_MAX_ATTEMPTS=4
|
||||
# Max delay before next attempt.
|
||||
S3_GW_RETRY_MAX_BACKOFF=30s
|
||||
# Backoff strategy. `exponential` and `constant` are allowed.
|
||||
S3_GW_RETRY_STRATEGY=exponential
|
||||
|
||||
|
|
|
@ -262,3 +262,12 @@ namespaces:
|
|||
|
||||
# Custom header to retrieve Source IP
|
||||
source_ip_header: "Source-Ip"
|
||||
|
||||
# Retry strategy configuration.
|
||||
retry:
|
||||
# Max amount of request attempts. Currently only for updating bucket settings request.
|
||||
max_attempts: 4
|
||||
# Max delay before next attempt.
|
||||
max_backoff: 30s
|
||||
# Backoff strategy. `exponential` and `constant` are allowed.
|
||||
strategy: exponential
|
||||
|
|
|
@ -193,6 +193,7 @@ There are some custom types used for brevity:
|
|||
| `policy` | [Policy contract configuration](#policy-section) |
|
||||
| `proxy` | [Proxy contract configuration](#proxy-section) |
|
||||
| `namespaces` | [Namespaces configuration](#namespaces-section) |
|
||||
| `retry` | [Retry configuration](#retry-section) |
|
||||
|
||||
### General section
|
||||
|
||||
|
@ -741,3 +742,21 @@ To override config values for default namespaces use namespace names that are pr
|
|||
}
|
||||
}
|
||||
```
|
||||
|
||||
# `retry` section
|
||||
|
||||
Retry strategy configuration.
|
||||
|
||||
```yaml
|
||||
retry:
|
||||
max_attempts: 4
|
||||
max_backoff: 30s
|
||||
strategy: exponential
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|---------------|------------|---------------|---------------|--------------------------------------------------------------------------------------|
|
||||
| `max_attemps` | `int` | yes | `4` | Max amount of request attempts. Currently only for updating bucket settings request. |
|
||||
| `max_backoff` | `duration` | yes | `30s` | Max delay before next attempt. |
|
||||
| `strategy` | `string` | yes | `exponential` | Backoff strategy. `exponential` and `constant` are allowed. |
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -10,6 +10,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240527065402-303a81cdc6db
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
github.com/aws/aws-sdk-go v1.44.6
|
||||
github.com/aws/aws-sdk-go-v2 v1.18.1
|
||||
alexvanin marked this conversation as resolved
Outdated
alexvanin
commented
Can we keep single Can we keep single `aws-sdk-go` dependency across all the code? I don't mind to ditch v1
dkirillov
commented
I would do this when branch https://git.frostfs.info/dkirillov/frostfs-s3-gw/commits/branch/feature/339-sigv4a_support be merged to master I would do this when branch https://git.frostfs.info/dkirillov/frostfs-s3-gw/commits/branch/feature/339-sigv4a_support be merged to master
|
||||
github.com/bluele/gcache v0.0.2
|
||||
github.com/go-chi/chi/v5 v5.0.8
|
||||
github.com/google/uuid v1.3.1
|
||||
|
@ -42,6 +43,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect
|
||||
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
|
||||
github.com/aws/smithy-go v1.13.5 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
|
|
5
go.sum
5
go.sum
|
@ -64,6 +64,10 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8
|
|||
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
|
||||
github.com/aws/aws-sdk-go v1.44.6 h1:Y+uHxmZfhRTLX2X3khkdxCoTZAyGEX21aOUHe1U6geg=
|
||||
github.com/aws/aws-sdk-go v1.44.6/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
|
||||
github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo=
|
||||
github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
|
||||
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
|
||||
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bits-and-blooms/bitset v1.8.0 h1:FD+XqgOZDUxxZ8hzoBFuV9+cGWY9CslN6d5MS5JVb4c=
|
||||
|
@ -168,6 +172,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
|
||||
|
|
52
pkg/retryer/retryer.go
Normal file
52
pkg/retryer/retryer.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package retryer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
)
|
||||
|
||||
func MakeWithRetry(ctx context.Context, fn func() error, retryer aws.RetryerV2) (err error) {
|
||||
var attemptNum int
|
||||
|
||||
maxAttempts := retryer.MaxAttempts()
|
||||
|
||||
for {
|
||||
attemptNum++
|
||||
|
||||
err = fn()
|
||||
|
||||
if !retryer.IsErrorRetryable(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
if attemptNum >= maxAttempts {
|
||||
return fmt.Errorf("max retry attempts exausted, max %d: %w", maxAttempts, err)
|
||||
}
|
||||
|
||||
retryDelay, reqErr := retryer.RetryDelay(attemptNum, err)
|
||||
if reqErr != nil {
|
||||
return reqErr
|
||||
}
|
||||
|
||||
if reqErr = sleepWithContext(ctx, retryDelay); reqErr != nil {
|
||||
return reqErr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func sleepWithContext(ctx context.Context, dur time.Duration) error {
|
||||
t := time.NewTimer(dur)
|
||||
defer t.Stop()
|
||||
|
||||
select {
|
||||
case <-t.C:
|
||||
break
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
59
pkg/retryer/retryer_test.go
Normal file
59
pkg/retryer/retryer_test.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
package retryer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/aws/retry"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRetryer(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
retryer := retry.NewStandard(func(options *retry.StandardOptions) {
|
||||
options.MaxAttempts = 3
|
||||
options.Backoff = retry.BackoffDelayerFunc(func(int, error) (time.Duration, error) { return 0, nil })
|
||||
options.Retryables = []retry.IsErrorRetryable{retry.IsErrorRetryableFunc(func(err error) aws.Ternary {
|
||||
if errors.Is(err, tree.ErrNodeAccessDenied) {
|
||||
return aws.TrueTernary
|
||||
}
|
||||
return aws.FalseTernary
|
||||
})}
|
||||
})
|
||||
|
||||
t.Run("no retryable", func(t *testing.T) {
|
||||
count := 0
|
||||
err := MakeWithRetry(ctx, func() error {
|
||||
count++
|
||||
if count == 1 {
|
||||
return tree.ErrNodeNotFound
|
||||
}
|
||||
return tree.ErrNodeAccessDenied
|
||||
}, retryer)
|
||||
require.ErrorIs(t, err, tree.ErrNodeNotFound)
|
||||
})
|
||||
|
||||
t.Run("retry", func(t *testing.T) {
|
||||
count := 0
|
||||
err := MakeWithRetry(ctx, func() error {
|
||||
count++
|
||||
if count < 3 {
|
||||
return tree.ErrNodeAccessDenied
|
||||
}
|
||||
return nil
|
||||
}, retryer)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("retry exhausted", func(t *testing.T) {
|
||||
err := MakeWithRetry(ctx, func() error {
|
||||
return tree.ErrNodeAccessDenied
|
||||
}, retryer)
|
||||
require.ErrorIs(t, err, tree.ErrNodeAccessDenied)
|
||||
})
|
||||
}
|
Loading…
Reference in a new issue
What you think about grouping those retry variables in single
retryCfg
object`?Good idea. But then probably we need do the same for other parameters (placement policy related etc). Do we want such changes?
I didn't notice placement policy related parameters at first. Then it's okay to keep it for a while.