[#165] Cancel context in outdated list session
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
4e15452853
commit
88f1acbdfc
7 changed files with 69 additions and 15 deletions
10
api/cache/listsession.go
vendored
10
api/cache/listsession.go
vendored
|
@ -55,7 +55,9 @@ func NewListSessionCache(config *Config) *ListSessionCache {
|
||||||
zap.String("expected", fmt.Sprintf("%T", session)))
|
zap.String("expected", fmt.Sprintf("%T", session)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo session.Cancel()
|
if !session.Acquired.Load() {
|
||||||
|
session.Cancel()
|
||||||
|
}
|
||||||
}).Build()
|
}).Build()
|
||||||
return &ListSessionCache{cache: gc, logger: config.Logger}
|
return &ListSessionCache{cache: gc, logger: config.Logger}
|
||||||
}
|
}
|
||||||
|
@ -79,6 +81,12 @@ func (l *ListSessionCache) GetListSession(key ListSessionKey) *data.ListSession
|
||||||
|
|
||||||
// PutListSession puts a list of object versions to cache.
|
// PutListSession puts a list of object versions to cache.
|
||||||
func (l *ListSessionCache) PutListSession(key ListSessionKey, session *data.ListSession) error {
|
func (l *ListSessionCache) PutListSession(key ListSessionKey, session *data.ListSession) error {
|
||||||
|
s := l.GetListSession(key)
|
||||||
|
if s != nil && s != session {
|
||||||
|
if !s.Acquired.Load() {
|
||||||
|
s.Cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
return l.cache.Set(key, session)
|
return l.cache.Set(key, session)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,6 +46,7 @@ type handlerContext struct {
|
||||||
|
|
||||||
layerFeatures *layer.FeatureSettingsMock
|
layerFeatures *layer.FeatureSettingsMock
|
||||||
treeMock *tree.ServiceClientMemory
|
treeMock *tree.ServiceClientMemory
|
||||||
|
cache *layer.Cache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hc *handlerContext) Handler() *handler {
|
func (hc *handlerContext) Handler() *handler {
|
||||||
|
@ -126,14 +127,14 @@ func (c *configMock) ResolveNamespaceAlias(ns string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareHandlerContext(t *testing.T) *handlerContext {
|
func prepareHandlerContext(t *testing.T) *handlerContext {
|
||||||
return prepareHandlerContextBase(t, false)
|
return prepareHandlerContextBase(t, layer.DefaultCachesConfigs(zap.NewExample()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareHandlerContextWithMinCache(t *testing.T) *handlerContext {
|
func prepareHandlerContextWithMinCache(t *testing.T) *handlerContext {
|
||||||
return prepareHandlerContextBase(t, true)
|
return prepareHandlerContextBase(t, getMinCacheConfig(zap.NewExample()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
|
func prepareHandlerContextBase(t *testing.T, cacheCfg *layer.CachesConfig) *handlerContext {
|
||||||
key, err := keys.NewPrivateKey()
|
key, err := keys.NewPrivateKey()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -153,15 +154,10 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
|
||||||
|
|
||||||
treeMock := tree.NewTree(memCli, zap.NewExample())
|
treeMock := tree.NewTree(memCli, zap.NewExample())
|
||||||
|
|
||||||
cacheCfg := layer.DefaultCachesConfigs(l)
|
|
||||||
if minCache {
|
|
||||||
cacheCfg = getMinCacheConfig(l)
|
|
||||||
}
|
|
||||||
|
|
||||||
features := &layer.FeatureSettingsMock{}
|
features := &layer.FeatureSettingsMock{}
|
||||||
|
|
||||||
layerCfg := &layer.Config{
|
layerCfg := &layer.Config{
|
||||||
Caches: cacheCfg,
|
Cache: layer.NewCache(cacheCfg),
|
||||||
AnonKey: layer.AnonymousKey{Key: key},
|
AnonKey: layer.AnonymousKey{Key: key},
|
||||||
Resolver: testResolver,
|
Resolver: testResolver,
|
||||||
TreeService: treeMock,
|
TreeService: treeMock,
|
||||||
|
@ -194,6 +190,7 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
|
||||||
|
|
||||||
layerFeatures: features,
|
layerFeatures: features,
|
||||||
treeMock: memCli,
|
treeMock: memCli,
|
||||||
|
cache: layerCfg.Cache,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -207,6 +204,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
Objects: minCacheCfg,
|
Objects: minCacheCfg,
|
||||||
ObjectsList: minCacheCfg,
|
ObjectsList: minCacheCfg,
|
||||||
|
SessionList: minCacheCfg,
|
||||||
Names: minCacheCfg,
|
Names: minCacheCfg,
|
||||||
Buckets: minCacheCfg,
|
Buckets: minCacheCfg,
|
||||||
System: minCacheCfg,
|
System: minCacheCfg,
|
||||||
|
|
|
@ -1,17 +1,22 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestParseContinuationToken(t *testing.T) {
|
func TestParseContinuationToken(t *testing.T) {
|
||||||
|
@ -60,6 +65,49 @@ func TestListObjectNullVersions(t *testing.T) {
|
||||||
require.Equal(t, data.UnversionedObjectVersionID, result.Version[1].VersionID)
|
require.Equal(t, data.UnversionedObjectVersionID, result.Version[1].VersionID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListObjectsContextCanceled(t *testing.T) {
|
||||||
|
layerCfg := layer.DefaultCachesConfigs(zaptest.NewLogger(t))
|
||||||
|
layerCfg.SessionList.Lifetime = time.Hour
|
||||||
|
layerCfg.SessionList.Size = 1
|
||||||
|
|
||||||
|
hc := prepareHandlerContextBase(t, layerCfg)
|
||||||
|
|
||||||
|
bktName := "bucket-versioning-enabled"
|
||||||
|
bktInfo := createTestBucket(hc, bktName)
|
||||||
|
|
||||||
|
for i := 0; i < 4; i++ {
|
||||||
|
putObject(hc, bktName, "object"+strconv.Itoa(i))
|
||||||
|
}
|
||||||
|
|
||||||
|
result := listObjectsV1(hc, bktName, "", "", "", 2)
|
||||||
|
session := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result.NextMarker))
|
||||||
|
// invoke list again to trigger cache eviction
|
||||||
|
// (use empty prefix to check that context canceled on replace)
|
||||||
|
listObjectsV1(hc, bktName, "", "", "", 2)
|
||||||
|
checkContextCanceled(t, session.Context)
|
||||||
|
|
||||||
|
result2 := listObjectsV2(hc, bktName, "", "", "", "", 2)
|
||||||
|
session2 := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result2.NextContinuationToken))
|
||||||
|
// invoke list again to trigger cache eviction
|
||||||
|
// (use non-empty prefix to check that context canceled on cache eviction)
|
||||||
|
listObjectsV2(hc, bktName, "o", "", "", "", 2)
|
||||||
|
checkContextCanceled(t, session2.Context)
|
||||||
|
|
||||||
|
result3 := listObjectsVersions(hc, bktName, "", "", "", "", 2)
|
||||||
|
session3 := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result3.NextVersionIDMarker))
|
||||||
|
// invoke list again to trigger cache eviction
|
||||||
|
listObjectsVersions(hc, bktName, "o", "", "", "", 2)
|
||||||
|
checkContextCanceled(t, session3.Context)
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkContextCanceled(t *testing.T, ctx context.Context) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
}
|
||||||
|
require.ErrorIs(t, ctx.Err(), context.Canceled)
|
||||||
|
}
|
||||||
|
|
||||||
func TestListObjectsLatestVersions(t *testing.T) {
|
func TestListObjectsLatestVersions(t *testing.T) {
|
||||||
hc := prepareHandlerContext(t)
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ func NewCache(cfg *CachesConfig) *Cache {
|
||||||
return &Cache{
|
return &Cache{
|
||||||
logger: cfg.Logger,
|
logger: cfg.Logger,
|
||||||
listsCache: cache.NewObjectsListCache(cfg.ObjectsList),
|
listsCache: cache.NewObjectsListCache(cfg.ObjectsList),
|
||||||
sessionListCache: cache.NewListSessionCache(cfg.ObjectsList),
|
sessionListCache: cache.NewListSessionCache(cfg.SessionList),
|
||||||
objCache: cache.New(cfg.Objects),
|
objCache: cache.New(cfg.Objects),
|
||||||
namesCache: cache.NewObjectsNameCache(cfg.Names),
|
namesCache: cache.NewObjectsNameCache(cfg.Names),
|
||||||
bucketCache: cache.NewBucketCache(cfg.Buckets),
|
bucketCache: cache.NewBucketCache(cfg.Buckets),
|
||||||
|
|
|
@ -69,7 +69,7 @@ type (
|
||||||
Config struct {
|
Config struct {
|
||||||
GateOwner user.ID
|
GateOwner user.ID
|
||||||
ChainAddress string
|
ChainAddress string
|
||||||
Caches *CachesConfig
|
Cache *Cache
|
||||||
AnonKey AnonymousKey
|
AnonKey AnonymousKey
|
||||||
Resolver BucketResolver
|
Resolver BucketResolver
|
||||||
TreeService TreeService
|
TreeService TreeService
|
||||||
|
@ -323,7 +323,7 @@ func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
|
||||||
gateOwner: config.GateOwner,
|
gateOwner: config.GateOwner,
|
||||||
anonKey: config.AnonKey,
|
anonKey: config.AnonKey,
|
||||||
resolver: config.Resolver,
|
resolver: config.Resolver,
|
||||||
cache: NewCache(config.Caches),
|
cache: config.Cache,
|
||||||
treeService: config.TreeService,
|
treeService: config.TreeService,
|
||||||
features: config.Features,
|
features: config.Features,
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,7 +168,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
|
||||||
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
||||||
|
|
||||||
layerCfg := &Config{
|
layerCfg := &Config{
|
||||||
Caches: config,
|
Cache: NewCache(config),
|
||||||
AnonKey: AnonymousKey{Key: key},
|
AnonKey: AnonymousKey{Key: key},
|
||||||
TreeService: NewTreeService(),
|
TreeService: NewTreeService(),
|
||||||
Features: &FeatureSettingsMock{},
|
Features: &FeatureSettingsMock{},
|
||||||
|
|
|
@ -165,7 +165,7 @@ func (a *App) initLayer(ctx context.Context) {
|
||||||
user.IDFromKey(&gateOwner, a.key.PrivateKey.PublicKey)
|
user.IDFromKey(&gateOwner, a.key.PrivateKey.PublicKey)
|
||||||
|
|
||||||
layerCfg := &layer.Config{
|
layerCfg := &layer.Config{
|
||||||
Caches: getCacheOptions(a.cfg, a.log),
|
Cache: layer.NewCache(getCacheOptions(a.cfg, a.log)),
|
||||||
AnonKey: layer.AnonymousKey{
|
AnonKey: layer.AnonymousKey{
|
||||||
Key: randomKey,
|
Key: randomKey,
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in a new issue