[#305] Support checking if accessbox was removed

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-02-06 16:44:49 +03:00
parent 5121c73d3f
commit 924e87face
13 changed files with 256 additions and 44 deletions

View file

@ -30,6 +30,7 @@ This document outlines major changes between releases.
- Set server IdleTimeout and ReadHeaderTimeout to `30s` and allow to configure them (#220)
- Return `ETag` value in quotes (#219)
- Use tombstone when delete multipart upload (#275)
- Support new parameter `cache.accessbox.removing_check_interval` (#XX)
### Removed
- Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227)

View file

@ -14,14 +14,12 @@ import (
"time"
v4 "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth/signer/v4"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
// authorizationFieldRegexp -- is regexp for credentials with Base58 encoded cid and oid and '0' (zero) as delimiter.
@ -84,9 +82,9 @@ var ContentSHA256HeaderStandardValue = map[string]struct{}{
}
// New creates an instance of AuthCenter.
func New(frostFS tokens.FrostFS, key *keys.PrivateKey, prefixes []string, config *cache.Config) *Center {
func New(creds tokens.Credentials, prefixes []string) *Center {
return &Center{
cli: tokens.New(frostFS, key, config),
cli: creds,
reg: NewRegexpMatcher(authorizationFieldRegexp),
postReg: NewRegexpMatcher(postPolicyCredentialRegexp),
allowedAccessKeyIDPrefixes: prefixes,

View file

@ -24,6 +24,11 @@ type (
Lifetime time.Duration
Logger *zap.Logger
}
AccessBoxCacheValue struct {
Box *accessbox.Box
PutTime time.Time
}
)
const (
@ -42,21 +47,21 @@ func DefaultAccessBoxConfig(logger *zap.Logger) *Config {
}
}
// NewAccessBoxCache creates an object of BucketCache.
// NewAccessBoxCache creates an object of AccessBoxCache.
func NewAccessBoxCache(config *Config) *AccessBoxCache {
gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).Build()
return &AccessBoxCache{cache: gc, logger: config.Logger}
}
// Get returns a cached object.
func (o *AccessBoxCache) Get(address oid.Address) *accessbox.Box {
// Get returns a cached accessbox.
func (o *AccessBoxCache) Get(address oid.Address) *AccessBoxCacheValue {
entry, err := o.cache.Get(address)
if err != nil {
return nil
}
result, ok := entry.(*accessbox.Box)
result, ok := entry.(*AccessBoxCacheValue)
if !ok {
o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
zap.String("expected", fmt.Sprintf("%T", result)))
@ -66,7 +71,16 @@ func (o *AccessBoxCache) Get(address oid.Address) *accessbox.Box {
return result
}
// Put stores an object to cache.
// Put stores an accessbox to cache.
func (o *AccessBoxCache) Put(address oid.Address, box *accessbox.Box) error {
return o.cache.Set(address, box)
val := &AccessBoxCacheValue{
Box: box,
PutTime: time.Now(),
}
return o.cache.Set(address, val)
}
// Delete removes an accessbox from cache.
func (o *AccessBoxCache) Delete(address oid.Address) {
o.cache.Remove(address)
}

View file

@ -22,7 +22,7 @@ func TestAccessBoxCacheType(t *testing.T) {
err := cache.Put(addr, box)
require.NoError(t, err)
val := cache.Get(addr)
require.Equal(t, box, val)
require.Equal(t, box, val.Box)
require.Equal(t, 0, observedLog.Len())
err = cache.cache.Set(addr, "tmp")

View file

@ -279,7 +279,13 @@ func (a *Agent) IssueSecret(ctx context.Context, w io.Writer, options *IssueSecr
a.log.Info(logs.StoreBearerTokenIntoFrostFS,
zap.Stringer("owner_tkn", idOwner))
creds := tokens.New(a.frostFS, secrets.EphemeralKey, cache.DefaultAccessBoxConfig(a.log))
cfg := tokens.Config{
FrostFS: a.frostFS,
Key: secrets.EphemeralKey,
CacheConfig: cache.DefaultAccessBoxConfig(a.log),
}
creds := tokens.New(cfg)
prm := tokens.CredentialsParam{
OwnerID: idOwner,
@ -330,7 +336,13 @@ func (a *Agent) IssueSecret(ctx context.Context, w io.Writer, options *IssueSecr
// UpdateSecret updates an auth token (change list of gates that can use credential), puts new cred version to the FrostFS network and writes to io.Writer a result.
func (a *Agent) UpdateSecret(ctx context.Context, w io.Writer, options *UpdateSecretOptions) error {
creds := tokens.New(a.frostFS, options.GatePrivateKey, cache.DefaultAccessBoxConfig(a.log))
cfg := tokens.Config{
FrostFS: a.frostFS,
Key: options.GatePrivateKey,
CacheConfig: cache.DefaultAccessBoxConfig(a.log),
}
creds := tokens.New(cfg)
box, err := creds.GetBox(ctx, options.Address)
if err != nil {
@ -406,7 +418,13 @@ func getLifetimeFromGateData(gateData *accessbox.GateData) lifetimeOptions {
// ObtainSecret receives an existing secret access key from FrostFS and
// writes to io.Writer the secret access key.
func (a *Agent) ObtainSecret(ctx context.Context, w io.Writer, options *ObtainSecretOptions) error {
bearerCreds := tokens.New(a.frostFS, options.GatePrivateKey, cache.DefaultAccessBoxConfig(a.log))
cfg := tokens.Config{
FrostFS: a.frostFS,
Key: options.GatePrivateKey,
CacheConfig: cache.DefaultAccessBoxConfig(a.log),
}
bearerCreds := tokens.New(cfg)
var addr oid.Address
if err := addr.DecodeString(options.SecretAddress); err != nil {

View file

@ -27,6 +27,7 @@ import (
s3middleware "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/frostfsid"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/policy"
@ -119,8 +120,15 @@ type (
func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
objPool, treePool, key := getPools(ctx, log.logger, v)
cfg := tokens.Config{
FrostFS: frostfs.NewAuthmateFrostFS(objPool, key),
Key: key,
CacheConfig: getAccessBoxCacheConfig(v, log.logger),
RemovingCheckAfterDurations: fetchRemovingCheckInterval(v, log.logger),
}
// prepare auth center
ctr := auth.New(frostfs.NewAuthmateFrostFS(objPool, key), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, log.logger))
ctr := auth.New(tokens.New(cfg), v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes))
app := &App{
ctr: ctr,

View file

@ -52,6 +52,8 @@ const (
defaultReadHeaderTimeout = 30 * time.Second
defaultIdleTimeout = 30 * time.Second
defaultAccessBoxCacheRemovingCheckInterval = 5 * time.Minute
defaultNamespaceHeader = "X-Frostfs-Namespace"
defaultConstraintName = "default"
@ -113,6 +115,8 @@ const ( // Settings.
cfgMorphPolicyCacheLifetime = "cache.morph_policy.lifetime"
cfgMorphPolicyCacheSize = "cache.morph_policy.size"
cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"
// NATS.
cfgEnableNATS = "nats.enabled"
cfgNATSEndpoint = "nats.endpoint"
@ -368,6 +372,24 @@ func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue
return defaultValue
}
func fetchRemovingCheckInterval(v *viper.Viper, l *zap.Logger) time.Duration {
if !v.IsSet(cfgAccessBoxCacheRemovingCheckInterval) {
return defaultAccessBoxCacheRemovingCheckInterval
}
duration := v.GetDuration(cfgAccessBoxCacheRemovingCheckInterval)
if duration >= 0 {
return duration
}
l.Error(logs.InvalidAccessBoxCacheRemovingCheckInterval,
zap.String("parameter", cfgAccessBoxCacheRemovingCheckInterval),
zap.Duration("value in config", duration),
zap.Duration("default", defaultAccessBoxCacheRemovingCheckInterval))
return defaultAccessBoxCacheRemovingCheckInterval
}
func fetchDefaultMaxAge(cfg *viper.Viper, l *zap.Logger) int {
defaultMaxAge := handler.DefaultMaxAge
@ -675,6 +697,8 @@ func newSettings() *viper.Viper {
// set defaults:
v.SetDefault(cfgAccessBoxCacheRemovingCheckInterval, defaultAccessBoxCacheRemovingCheckInterval)
// logger:
v.SetDefault(cfgLoggerLevel, "debug")
v.SetDefault(cfgLoggerDestination, "stdout")

View file

@ -95,6 +95,7 @@ S3_GW_CACHE_NAMES_SIZE=10000
S3_GW_CACHE_SYSTEM_LIFETIME=5m
S3_GW_CACHE_SYSTEM_SIZE=100000
# Cache which stores access box with tokens by its address
S3_GW_CACHE_ACCESSBOX_REMOVING_CHECK_INTERVAL=5m
S3_GW_CACHE_ACCESSBOX_LIFETIME=10m
S3_GW_CACHE_ACCESSBOX_SIZE=100
# Cache which stores owner to cache operation mapping

View file

@ -118,8 +118,9 @@ cache:
size: 1000
# Cache which stores access box with tokens by its address
accessbox:
lifetime: 5m
size: 10
removing_check_interval: 5m
lifetime: 10m
size: 100
# Cache which stores owner to cache operation mapping
accesscontrol:
lifetime: 1m

View file

@ -9,11 +9,14 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
)
type (
@ -36,6 +39,15 @@ type (
key *keys.PrivateKey
frostFS FrostFS
cache *cache.AccessBoxCache
removingCheckDuration time.Duration
log *zap.Logger
}
Config struct {
FrostFS FrostFS
Key *keys.PrivateKey
CacheConfig *cache.Config
RemovingCheckAfterDurations time.Duration
}
)
@ -90,17 +102,40 @@ var (
ErrEmptyBearerToken = errors.New("Bearer token could not be empty")
)
var _ = New
var _ Credentials = (*cred)(nil)
// New creates a new Credentials instance using the given cli and key.
func New(frostFS FrostFS, key *keys.PrivateKey, config *cache.Config) Credentials {
return &cred{frostFS: frostFS, key: key, cache: cache.NewAccessBoxCache(config)}
func New(cfg Config) Credentials {
return &cred{
frostFS: cfg.FrostFS,
key: cfg.Key,
cache: cache.NewAccessBoxCache(cfg.CacheConfig),
removingCheckDuration: cfg.RemovingCheckAfterDurations,
log: cfg.CacheConfig.Logger,
}
}
func (c *cred) GetBox(ctx context.Context, addr oid.Address) (*accessbox.Box, error) {
cachedBox := c.cache.Get(addr)
if cachedBox != nil {
return cachedBox, nil
cachedBoxValue := c.cache.Get(addr)
if cachedBoxValue != nil {
if time.Since(cachedBoxValue.PutTime) > c.removingCheckDuration {
if box, err := c.getAccessBox(ctx, addr); err != nil {
if client.IsErrObjectAlreadyRemoved(err) {
c.cache.Delete(addr)
return nil, fmt.Errorf("get access box: %w", err)
}
} else {
cachedBox, err := box.GetBox(c.key)
if err != nil {
c.cache.Delete(addr)
return nil, fmt.Errorf("get gate box: %w", err)
}
// we need this to reset PutTime
// to don't check for removing each time after removingCheckDuration interval
c.putBoxToCache(addr, cachedBox)
}
}
return cachedBoxValue.Box, nil
}
box, err := c.getAccessBox(ctx, addr)
@ -108,18 +143,22 @@ func (c *cred) GetBox(ctx context.Context, addr oid.Address) (*accessbox.Box, er
return nil, fmt.Errorf("get access box: %w", err)
}
cachedBox, err = box.GetBox(c.key)
cachedBox, err := box.GetBox(c.key)
if err != nil {
return nil, fmt.Errorf("get box: %w", err)
return nil, fmt.Errorf("get gate box: %w", err)
}
if err = c.cache.Put(addr, cachedBox); err != nil {
return nil, fmt.Errorf("put box into cache: %w", err)
}
c.putBoxToCache(addr, cachedBox)
return cachedBox, nil
}
func (c *cred) putBoxToCache(addr oid.Address, box *accessbox.Box) {
if err := c.cache.Put(addr, box); err != nil {
c.log.Warn(logs.CouldntPutAccessBoxIntoCache, zap.String("address", addr.EncodeToString()))
}
}
func (c *cred) getAccessBox(ctx context.Context, addr oid.Address) (*accessbox.AccessBox, error) {
data, err := c.frostFS.GetCredsPayload(ctx, addr)
if err != nil {

View file

@ -0,0 +1,91 @@
package tokens
import (
"context"
"encoding/hex"
"errors"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
type frostfsMock struct {
objects map[oid.Address][]byte
errors map[oid.Address]error
}
func (f *frostfsMock) CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) {
panic("implement me for test")
}
func (f *frostfsMock) GetCredsPayload(_ context.Context, address oid.Address) ([]byte, error) {
if err := f.errors[address]; err != nil {
return nil, err
}
data, ok := f.objects[address]
if !ok {
return nil, errors.New("not found")
}
return data, nil
}
func TestRemovingAccessBox(t *testing.T) {
ctx := context.Background()
key, err := keys.NewPrivateKey()
require.NoError(t, err)
gateData := []*accessbox.GateData{{
BearerToken: &bearer.Token{},
GateKey: key.PublicKey(),
}}
secretKey := "713d0a0b9efc7d22923e17b0402a6a89b4273bc711c8bacb2da1b643d0006aeb"
sk, err := hex.DecodeString(secretKey)
require.NoError(t, err)
accessBox, _, err := accessbox.PackTokens(gateData, sk)
require.NoError(t, err)
data, err := accessBox.Marshal()
require.NoError(t, err)
addr := oidtest.Address()
frostfs := &frostfsMock{
objects: map[oid.Address][]byte{addr: data},
errors: map[oid.Address]error{},
}
cfg := Config{
FrostFS: frostfs,
Key: key,
CacheConfig: &cache.Config{
Size: 10,
Lifetime: 24 * time.Hour,
Logger: zaptest.NewLogger(t),
},
RemovingCheckAfterDurations: 0, // means check always
}
creds := New(cfg)
_, err = creds.GetBox(ctx, addr)
require.NoError(t, err)
frostfs.errors[addr] = errors.New("network error")
_, err = creds.GetBox(ctx, addr)
require.NoError(t, err)
frostfs.errors[addr] = &apistatus.ObjectAlreadyRemoved{}
_, err = creds.GetBox(ctx, addr)
require.Error(t, err)
}

View file

@ -409,8 +409,9 @@ cache:
lifetime: 2m
size: 1000
accessbox:
lifetime: 5m
size: 10
removing_check_interval: 5m
lifetime: 10m
size: 100
accesscontrol:
lifetime: 1m
size: 100000
@ -420,14 +421,14 @@ cache:
```
| Parameter | Type | Default value | Description |
|-----------------|-----------------------------------|-----------------------------------|----------------------------------------------------------------------------------------|
|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------------------------------|
| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). |
| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. |
| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. |
| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. |
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings, notification configuration etc. |
| `accessbox` | [Cache config](#cache-subsection) | `lifetime: 10m`<br>`size: 100` | Cache which stores access box with tokens by its address. |
| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`<br>`size: 100` | Cache which stores access box with tokens by its address. |
| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 100000` | Cache which stores owner to cache operation mapping. |
| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores list of policy chains. |
@ -443,6 +444,20 @@ size: 1000
| `lifetime` | `duration` | depends on cache | Lifetime of entries in cache. |
| `size` | `int` | depends on cache | LRU cache size. |
#### `accessbox` subsection
```yaml
lifetime: 10m
size: 100
```
| Parameter | Type | Default value | Description |
|---------------------------|------------|---------------|-------------------------------------------------------|
| `removing_check_interval` | `duration` | `5m' | Time after which creds should be checked for removal. |
| `lifetime` | `duration` | '10m' | Lifetime of entries in cache. |
| `size` | `int` | '100 | LRU cache size. |
### `nats` section
This is an advanced section, use with caution.

View file

@ -139,4 +139,6 @@ const (
ParseTreeNode = "parse tree node"
FailedToGetRealObjectSize = "failed to get real object size"
CouldntDeleteObjectFromStorageContinueDeleting = "couldn't delete object from storage, continue deleting from tree"
CouldntPutAccessBoxIntoCache = "couldn't put accessbox into cache"
InvalidAccessBoxCacheRemovingCheckInterval = "invalid accessbox check removing interval, using default value"
)