forked from TrueCloudLab/frostfs-s3-gw
[#165] Add list session cache
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
29ac91dfd5
commit
6e8960b2ab
13 changed files with 196 additions and 55 deletions
99
api/cache/listsession.go
vendored
Normal file
99
api/cache/listsession.go
vendored
Normal file
|
@ -0,0 +1,99 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"github.com/bluele/gcache"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
// ListSessionCache contains cache for list session (during pagination).
|
||||
ListSessionCache struct {
|
||||
cache gcache.Cache
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// ListSessionKey is a key to find a ListSessionCache's entry.
|
||||
ListSessionKey struct {
|
||||
cid cid.ID
|
||||
prefix string
|
||||
token string
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultListSessionCacheLifetime is a default lifetime of entries in cache of ListObjects.
|
||||
DefaultListSessionCacheLifetime = time.Second * 60
|
||||
// DefaultListSessionCacheSize is a default size of cache of ListObjects.
|
||||
DefaultListSessionCacheSize = 100
|
||||
)
|
||||
|
||||
// DefaultListSessionConfig returns new default cache expiration values.
|
||||
func DefaultListSessionConfig(logger *zap.Logger) *Config {
|
||||
return &Config{
|
||||
Size: DefaultListSessionCacheSize,
|
||||
Lifetime: DefaultListSessionCacheLifetime,
|
||||
Logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (k *ListSessionKey) String() string {
|
||||
return k.cid.EncodeToString() + k.prefix + k.token
|
||||
}
|
||||
|
||||
// NewListSessionCache is a constructor which creates an object of ListObjectsCache with the given lifetime of entries.
|
||||
func NewListSessionCache(config *Config) *ListSessionCache {
|
||||
gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).EvictedFunc(func(key interface{}, val interface{}) {
|
||||
session, ok := val.(*data.ListSession)
|
||||
if !ok {
|
||||
config.Logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", val)),
|
||||
zap.String("expected", fmt.Sprintf("%T", session)))
|
||||
}
|
||||
|
||||
session.Cancel()
|
||||
}).Build()
|
||||
return &ListSessionCache{cache: gc, logger: config.Logger}
|
||||
}
|
||||
|
||||
// GetListSession returns a list of ObjectInfo.
|
||||
func (l *ListSessionCache) GetListSession(key ListSessionKey) *data.ListSession {
|
||||
entry, err := l.cache.Get(key)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
result, ok := entry.(*data.ListSession)
|
||||
if !ok {
|
||||
l.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||
zap.String("expected", fmt.Sprintf("%T", result)))
|
||||
return nil
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// PutListSession puts a list of object versions to cache.
|
||||
func (l *ListSessionCache) PutListSession(key ListSessionKey, session *data.ListSession) error {
|
||||
return l.cache.Set(key, session)
|
||||
}
|
||||
|
||||
// DeleteListSession removes key from cache.
|
||||
func (l *ListSessionCache) DeleteListSession(key ListSessionKey) {
|
||||
l.cache.Remove(key)
|
||||
}
|
||||
|
||||
// CreateListSessionCacheKey returns ListSessionKey with the given CID, prefix and token.
|
||||
func CreateListSessionCacheKey(cnr cid.ID, prefix, token string) ListSessionKey {
|
||||
p := ListSessionKey{
|
||||
cid: cnr,
|
||||
prefix: prefix,
|
||||
token: token,
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
18
api/data/listsession.go
Normal file
18
api/data/listsession.go
Normal file
|
@ -0,0 +1,18 @@
|
|||
package data
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type VersionsStream interface {
|
||||
Next(ctx context.Context) (*NodeVersion, error)
|
||||
}
|
||||
|
||||
// todo consider thread safe
|
||||
type ListSession struct {
|
||||
Next *ObjectInfo
|
||||
Stream VersionsStream
|
||||
NamesMap map[string]struct{}
|
||||
Context context.Context
|
||||
Cancel context.CancelFunc
|
||||
}
|
|
@ -1,8 +1,6 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
|
@ -12,17 +10,10 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type TestCacheValue struct {
|
||||
Next *data.ObjectInfo
|
||||
Stream LatestVersionsByPrefixStream
|
||||
NamesMap map[string]struct{}
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
type Cache struct {
|
||||
testCache map[string]TestCacheValue
|
||||
logger *zap.Logger
|
||||
listsCache *cache.ObjectsListCache
|
||||
sessionListCache *cache.ListSessionCache
|
||||
objCache *cache.ObjectsCache
|
||||
namesCache *cache.ObjectsNameCache
|
||||
bucketCache *cache.BucketCache
|
||||
|
@ -35,6 +26,7 @@ type CachesConfig struct {
|
|||
Logger *zap.Logger
|
||||
Objects *cache.Config
|
||||
ObjectsList *cache.Config
|
||||
SessionList *cache.Config
|
||||
Names *cache.Config
|
||||
Buckets *cache.Config
|
||||
System *cache.Config
|
||||
|
@ -47,6 +39,7 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
|
|||
Logger: logger,
|
||||
Objects: cache.DefaultObjectsConfig(logger),
|
||||
ObjectsList: cache.DefaultObjectsListConfig(logger),
|
||||
SessionList: cache.DefaultListSessionConfig(logger),
|
||||
Names: cache.DefaultObjectsNameConfig(logger),
|
||||
Buckets: cache.DefaultBucketConfig(logger),
|
||||
System: cache.DefaultSystemConfig(logger),
|
||||
|
@ -56,9 +49,9 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
|
|||
|
||||
func NewCache(cfg *CachesConfig) *Cache {
|
||||
return &Cache{
|
||||
testCache: map[string]TestCacheValue{},
|
||||
logger: cfg.Logger,
|
||||
listsCache: cache.NewObjectsListCache(cfg.ObjectsList),
|
||||
sessionListCache: cache.NewListSessionCache(cfg.ObjectsList),
|
||||
objCache: cache.New(cfg.Objects),
|
||||
namesCache: cache.NewObjectsNameCache(cfg.Names),
|
||||
bucketCache: cache.NewBucketCache(cfg.Buckets),
|
||||
|
@ -155,6 +148,29 @@ func (c *Cache) PutList(owner user.ID, key cache.ObjectsListKey, list []*data.No
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Cache) GetListSession(owner user.ID, key cache.ListSessionKey) *data.ListSession {
|
||||
if !c.accessCache.Get(owner, key.String()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.sessionListCache.GetListSession(key)
|
||||
}
|
||||
|
||||
func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) {
|
||||
if err := c.sessionListCache.PutListSession(key, session); err != nil {
|
||||
c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err))
|
||||
}
|
||||
|
||||
if err := c.accessCache.Put(owner, key.String()); err != nil {
|
||||
c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) DeleteListSession(owner user.ID, key cache.ListSessionKey) {
|
||||
c.sessionListCache.DeleteListSession(key)
|
||||
c.accessCache.Delete(owner, key.String())
|
||||
}
|
||||
|
||||
func (c *Cache) GetTagging(owner user.ID, key string) map[string]string {
|
||||
if !c.accessCache.Get(owner, key) {
|
||||
return nil
|
||||
|
|
|
@ -17,7 +17,6 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||
|
@ -652,41 +651,39 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam
|
|||
return nil, nil, nil
|
||||
}
|
||||
|
||||
testKey := p.Prefix + p.Delimiter + p.ContinuationToken
|
||||
nodeVersionsStreamValue, ok := n.cache.testCache[testKey]
|
||||
|
||||
if ok {
|
||||
delete(n.cache.testCache, testKey)
|
||||
owner := n.BearerOwner(ctx)
|
||||
cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.Delimiter)
|
||||
session := n.cache.GetListSession(owner, cacheKey)
|
||||
if session != nil {
|
||||
// after reading next object from stream in session
|
||||
// the current cache value already doesn't match with next token in cache key
|
||||
n.cache.DeleteListSession(owner, cacheKey)
|
||||
} else {
|
||||
ctx2, cancel2 := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
<-time.After(10 * time.Second)
|
||||
cancel2()
|
||||
}()
|
||||
session = &data.ListSession{NamesMap: make(map[string]struct{})}
|
||||
session.Context, session.Cancel = context.WithCancel(context.Background())
|
||||
|
||||
if bd, err := middleware.GetBoxData(ctx); err == nil {
|
||||
ctx2 = middleware.SetBoxData(ctx2, bd)
|
||||
session.Context = middleware.SetBoxData(session.Context, bd)
|
||||
}
|
||||
|
||||
nodeVersionsStreamValue.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(ctx2, p.Bucket, p.Prefix)
|
||||
session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.Bucket, p.Prefix)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
nodeVersionsStreamValue.NamesMap = map[string]struct{}{}
|
||||
}
|
||||
|
||||
poolCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
generator, errorCh := nodesGeneratorStream(poolCtx, p, nodeVersionsStreamValue)
|
||||
generator, errorCh := nodesGeneratorStream(poolCtx, p, session)
|
||||
objOutCh, err := n.initWorkerPoolStream(poolCtx, 2, p, generator)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
|
||||
}
|
||||
|
||||
objects = make([]*data.ObjectInfo, 0, p.MaxKeys+1)
|
||||
if nodeVersionsStreamValue.Next != nil {
|
||||
objects = append(objects, nodeVersionsStreamValue.Next)
|
||||
if session.Next != nil {
|
||||
objects = append(objects, session.Next)
|
||||
}
|
||||
|
||||
for obj := range objOutCh {
|
||||
|
@ -694,9 +691,7 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam
|
|||
}
|
||||
|
||||
if err = <-errorCh; err != nil {
|
||||
fmt.Println(len(objects))
|
||||
fmt.Println(objects[len(objects)-1].Name)
|
||||
return nil, nil, fmt.Errorf("failed to get object from tree: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err)
|
||||
}
|
||||
|
||||
sort.Slice(objects, func(i, j int) bool {
|
||||
|
@ -709,8 +704,8 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam
|
|||
}
|
||||
|
||||
if next != nil {
|
||||
nodeVersionsStreamValue.Next = next
|
||||
n.cache.testCache[p.Prefix+p.Delimiter+next.VersionID()] = nodeVersionsStreamValue
|
||||
session.Next = next
|
||||
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.VersionID()), session)
|
||||
}
|
||||
|
||||
return
|
||||
|
@ -772,7 +767,7 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions
|
|||
return nodeCh
|
||||
}
|
||||
|
||||
func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream TestCacheValue) (<-chan *data.NodeVersion, <-chan error) {
|
||||
func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
|
||||
nodeCh := make(chan *data.NodeVersion)
|
||||
errCh := make(chan error, 1)
|
||||
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
|
||||
|
|
|
@ -212,7 +212,7 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo *
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (LatestVersionsByPrefixStream, error) {
|
||||
func (t *TreeServiceMock) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) {
|
||||
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
|
||||
if !ok {
|
||||
return nil, ErrNodeNotFound
|
||||
|
|
|
@ -8,10 +8,6 @@ import (
|
|||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type LatestVersionsByPrefixStream interface {
|
||||
Next(ctx context.Context) (*data.NodeVersion, error)
|
||||
}
|
||||
|
||||
// TreeService provide interface to interact with tree service using s3 data models.
|
||||
type TreeService interface {
|
||||
// PutSettingsNode update or create new settings node in tree service.
|
||||
|
@ -59,7 +55,7 @@ type TreeService interface {
|
|||
GetVersions(ctx context.Context, bktInfo *data.BucketInfo, objectName string) ([]*data.NodeVersion, error)
|
||||
GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
||||
GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error)
|
||||
GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (LatestVersionsByPrefixStream, error)
|
||||
GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error)
|
||||
GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error)
|
||||
GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
||||
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error)
|
||||
|
|
|
@ -906,6 +906,9 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
|
|||
cacheCfg.ObjectsList.Lifetime = fetchCacheLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
|
||||
cacheCfg.ObjectsList.Size = fetchCacheSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
|
||||
|
||||
cacheCfg.SessionList.Lifetime = fetchCacheLifetime(v, l, cfgSessionListCacheLifetime, cacheCfg.SessionList.Lifetime)
|
||||
cacheCfg.SessionList.Size = fetchCacheSize(v, l, cfgSessionListCacheSize, cacheCfg.SessionList.Size)
|
||||
|
||||
cacheCfg.Buckets.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
|
||||
cacheCfg.Buckets.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)
|
||||
|
||||
|
|
|
@ -98,6 +98,8 @@ const ( // Settings.
|
|||
cfgObjectsCacheSize = "cache.objects.size"
|
||||
cfgListObjectsCacheLifetime = "cache.list.lifetime"
|
||||
cfgListObjectsCacheSize = "cache.list.size"
|
||||
cfgSessionListCacheLifetime = "cache.list_session.lifetime"
|
||||
cfgSessionListCacheSize = "cache.list_session.size"
|
||||
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
|
||||
cfgBucketsCacheSize = "cache.buckets.size"
|
||||
cfgNamesCacheLifetime = "cache.names.lifetime"
|
||||
|
|
|
@ -82,6 +82,9 @@ S3_GW_CACHE_OBJECTS_SIZE=1000000
|
|||
# Cache which keeps lists of objects in buckets
|
||||
S3_GW_CACHE_LIST_LIFETIME=1m
|
||||
S3_GW_CACHE_LIST_SIZE=100000
|
||||
# Cache which keeps listing session
|
||||
S3_GW_CACHE_LIST_SESSION_LIFETIME=1m
|
||||
S3_GW_CACHE_LIST_SESSION_SIZE=100
|
||||
# Cache which contains mapping of bucket name to bucket info
|
||||
S3_GW_CACHE_BUCKETS_LIFETIME=1m
|
||||
S3_GW_CACHE_BUCKETS_SIZE=1000
|
||||
|
|
|
@ -100,6 +100,10 @@ cache:
|
|||
list:
|
||||
lifetime: 1m
|
||||
size: 100
|
||||
# Cache which keeps listing sessions
|
||||
list_session:
|
||||
lifetime: 1m
|
||||
size: 100
|
||||
# Cache which contains mapping of nice name to object addresses
|
||||
names:
|
||||
lifetime: 1m
|
||||
|
|
|
@ -396,6 +396,9 @@ cache:
|
|||
list:
|
||||
lifetime: 1m
|
||||
size: 100
|
||||
list_session:
|
||||
lifetime: 1m
|
||||
size: 100
|
||||
names:
|
||||
lifetime: 1m
|
||||
size: 1000
|
||||
|
@ -420,6 +423,7 @@ cache:
|
|||
|-----------------|-----------------------------------|-----------------------------------|----------------------------------------------------------------------------------------|
|
||||
| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). |
|
||||
| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. |
|
||||
| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. |
|
||||
| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. |
|
||||
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
|
||||
| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings, notification configuration etc. |
|
||||
|
|
|
@ -95,6 +95,7 @@ const (
|
|||
CouldntCacheAccessControlOperation = "couldn't cache access control operation" // Warn in ../../api/layer/cache.go
|
||||
CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go
|
||||
CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go
|
||||
CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go
|
||||
CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go
|
||||
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
|
||||
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go
|
||||
|
|
|
@ -740,7 +740,7 @@ func (s *LatestVersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.Node
|
|||
return newNodeVersionFromTreeNode(filepath, treeNode), nil
|
||||
}
|
||||
|
||||
func (c *Tree) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (layer.LatestVersionsByPrefixStream, error) {
|
||||
func (c *Tree) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) {
|
||||
mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
|
|
Loading…
Reference in a new issue