diff --git a/api/cache/listsession.go b/api/cache/listsession.go new file mode 100644 index 0000000..bbe4a80 --- /dev/null +++ b/api/cache/listsession.go @@ -0,0 +1,99 @@ +package cache + +import ( + "fmt" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "github.com/bluele/gcache" + "go.uber.org/zap" +) + +type ( + // ListSessionCache contains cache for list session (during pagination). + ListSessionCache struct { + cache gcache.Cache + logger *zap.Logger + } + + // ListSessionKey is a key to find a ListSessionCache's entry. + ListSessionKey struct { + cid cid.ID + prefix string + token string + } +) + +const ( + // DefaultListSessionCacheLifetime is a default lifetime of entries in cache of ListObjects. + DefaultListSessionCacheLifetime = time.Second * 60 + // DefaultListSessionCacheSize is a default size of cache of ListObjects. + DefaultListSessionCacheSize = 100 +) + +// DefaultListSessionConfig returns new default cache expiration values. +func DefaultListSessionConfig(logger *zap.Logger) *Config { + return &Config{ + Size: DefaultListSessionCacheSize, + Lifetime: DefaultListSessionCacheLifetime, + Logger: logger, + } +} + +func (k *ListSessionKey) String() string { + return k.cid.EncodeToString() + k.prefix + k.token +} + +// NewListSessionCache is a constructor which creates an object of ListObjectsCache with the given lifetime of entries. +func NewListSessionCache(config *Config) *ListSessionCache { + gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).EvictedFunc(func(key interface{}, val interface{}) { + session, ok := val.(*data.ListSession) + if !ok { + config.Logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", val)), + zap.String("expected", fmt.Sprintf("%T", session))) + } + + session.Cancel() + }).Build() + return &ListSessionCache{cache: gc, logger: config.Logger} +} + +// GetListSession returns a list of ObjectInfo. +func (l *ListSessionCache) GetListSession(key ListSessionKey) *data.ListSession { + entry, err := l.cache.Get(key) + if err != nil { + return nil + } + + result, ok := entry.(*data.ListSession) + if !ok { + l.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)), + zap.String("expected", fmt.Sprintf("%T", result))) + return nil + } + + return result +} + +// PutListSession puts a list of object versions to cache. +func (l *ListSessionCache) PutListSession(key ListSessionKey, session *data.ListSession) error { + return l.cache.Set(key, session) +} + +// DeleteListSession removes key from cache. +func (l *ListSessionCache) DeleteListSession(key ListSessionKey) { + l.cache.Remove(key) +} + +// CreateListSessionCacheKey returns ListSessionKey with the given CID, prefix and token. +func CreateListSessionCacheKey(cnr cid.ID, prefix, token string) ListSessionKey { + p := ListSessionKey{ + cid: cnr, + prefix: prefix, + token: token, + } + + return p +} diff --git a/api/data/listsession.go b/api/data/listsession.go new file mode 100644 index 0000000..5050739 --- /dev/null +++ b/api/data/listsession.go @@ -0,0 +1,18 @@ +package data + +import ( + "context" +) + +type VersionsStream interface { + Next(ctx context.Context) (*NodeVersion, error) +} + +// todo consider thread safe +type ListSession struct { + Next *ObjectInfo + Stream VersionsStream + NamesMap map[string]struct{} + Context context.Context + Cancel context.CancelFunc +} diff --git a/api/layer/cache.go b/api/layer/cache.go index b4d5fa9..b3c2c56 100644 --- a/api/layer/cache.go +++ b/api/layer/cache.go @@ -1,8 +1,6 @@ package layer import ( - "context" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" @@ -12,22 +10,15 @@ import ( "go.uber.org/zap" ) -type TestCacheValue struct { - Next *data.ObjectInfo - Stream LatestVersionsByPrefixStream - NamesMap map[string]struct{} - Context context.Context -} - type Cache struct { - testCache map[string]TestCacheValue - logger *zap.Logger - listsCache *cache.ObjectsListCache - objCache *cache.ObjectsCache - namesCache *cache.ObjectsNameCache - bucketCache *cache.BucketCache - systemCache *cache.SystemCache - accessCache *cache.AccessControlCache + logger *zap.Logger + listsCache *cache.ObjectsListCache + sessionListCache *cache.ListSessionCache + objCache *cache.ObjectsCache + namesCache *cache.ObjectsNameCache + bucketCache *cache.BucketCache + systemCache *cache.SystemCache + accessCache *cache.AccessControlCache } // CachesConfig contains params for caches. @@ -35,6 +26,7 @@ type CachesConfig struct { Logger *zap.Logger Objects *cache.Config ObjectsList *cache.Config + SessionList *cache.Config Names *cache.Config Buckets *cache.Config System *cache.Config @@ -47,6 +39,7 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig { Logger: logger, Objects: cache.DefaultObjectsConfig(logger), ObjectsList: cache.DefaultObjectsListConfig(logger), + SessionList: cache.DefaultListSessionConfig(logger), Names: cache.DefaultObjectsNameConfig(logger), Buckets: cache.DefaultBucketConfig(logger), System: cache.DefaultSystemConfig(logger), @@ -56,14 +49,14 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig { func NewCache(cfg *CachesConfig) *Cache { return &Cache{ - testCache: map[string]TestCacheValue{}, - logger: cfg.Logger, - listsCache: cache.NewObjectsListCache(cfg.ObjectsList), - objCache: cache.New(cfg.Objects), - namesCache: cache.NewObjectsNameCache(cfg.Names), - bucketCache: cache.NewBucketCache(cfg.Buckets), - systemCache: cache.NewSystemCache(cfg.System), - accessCache: cache.NewAccessControlCache(cfg.AccessControl), + logger: cfg.Logger, + listsCache: cache.NewObjectsListCache(cfg.ObjectsList), + sessionListCache: cache.NewListSessionCache(cfg.ObjectsList), + objCache: cache.New(cfg.Objects), + namesCache: cache.NewObjectsNameCache(cfg.Names), + bucketCache: cache.NewBucketCache(cfg.Buckets), + systemCache: cache.NewSystemCache(cfg.System), + accessCache: cache.NewAccessControlCache(cfg.AccessControl), } } @@ -155,6 +148,29 @@ func (c *Cache) PutList(owner user.ID, key cache.ObjectsListKey, list []*data.No } } +func (c *Cache) GetListSession(owner user.ID, key cache.ListSessionKey) *data.ListSession { + if !c.accessCache.Get(owner, key.String()) { + return nil + } + + return c.sessionListCache.GetListSession(key) +} + +func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) { + if err := c.sessionListCache.PutListSession(key, session); err != nil { + c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err)) + } + + if err := c.accessCache.Put(owner, key.String()); err != nil { + c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err)) + } +} + +func (c *Cache) DeleteListSession(owner user.ID, key cache.ListSessionKey) { + c.sessionListCache.DeleteListSession(key) + c.accessCache.Delete(owner, key.String()) +} + func (c *Cache) GetTagging(owner user.ID, key string) map[string]string { if !c.accessCache.Get(owner, key) { return nil diff --git a/api/layer/object.go b/api/layer/object.go index 1fb3451..86c4851 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -17,7 +17,6 @@ import ( "strconv" "strings" "sync" - "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth" @@ -652,41 +651,39 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam return nil, nil, nil } - testKey := p.Prefix + p.Delimiter + p.ContinuationToken - nodeVersionsStreamValue, ok := n.cache.testCache[testKey] - - if ok { - delete(n.cache.testCache, testKey) + owner := n.BearerOwner(ctx) + cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.Delimiter) + session := n.cache.GetListSession(owner, cacheKey) + if session != nil { + // after reading next object from stream in session + // the current cache value already doesn't match with next token in cache key + n.cache.DeleteListSession(owner, cacheKey) } else { - ctx2, cancel2 := context.WithCancel(context.Background()) - go func() { - <-time.After(10 * time.Second) - cancel2() - }() + session = &data.ListSession{NamesMap: make(map[string]struct{})} + session.Context, session.Cancel = context.WithCancel(context.Background()) if bd, err := middleware.GetBoxData(ctx); err == nil { - ctx2 = middleware.SetBoxData(ctx2, bd) + session.Context = middleware.SetBoxData(session.Context, bd) } - nodeVersionsStreamValue.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(ctx2, p.Bucket, p.Prefix) + session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.Bucket, p.Prefix) if err != nil { return nil, nil, err } - nodeVersionsStreamValue.NamesMap = map[string]struct{}{} } poolCtx, cancel := context.WithCancel(ctx) defer cancel() - generator, errorCh := nodesGeneratorStream(poolCtx, p, nodeVersionsStreamValue) + generator, errorCh := nodesGeneratorStream(poolCtx, p, session) objOutCh, err := n.initWorkerPoolStream(poolCtx, 2, p, generator) if err != nil { return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) } objects = make([]*data.ObjectInfo, 0, p.MaxKeys+1) - if nodeVersionsStreamValue.Next != nil { - objects = append(objects, nodeVersionsStreamValue.Next) + if session.Next != nil { + objects = append(objects, session.Next) } for obj := range objOutCh { @@ -694,9 +691,7 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam } if err = <-errorCh; err != nil { - fmt.Println(len(objects)) - fmt.Println(objects[len(objects)-1].Name) - return nil, nil, fmt.Errorf("failed to get object from tree: %w", err) + return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err) } sort.Slice(objects, func(i, j int) bool { @@ -709,8 +704,8 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam } if next != nil { - nodeVersionsStreamValue.Next = next - n.cache.testCache[p.Prefix+p.Delimiter+next.VersionID()] = nodeVersionsStreamValue + session.Next = next + n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.VersionID()), session) } return @@ -772,7 +767,7 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions return nodeCh } -func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream TestCacheValue) (<-chan *data.NodeVersion, <-chan error) { +func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { nodeCh := make(chan *data.NodeVersion) errCh := make(chan error, 1) //existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 2c556a5..5c55ebe 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -212,7 +212,7 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo * return result, nil } -func (t *TreeServiceMock) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (LatestVersionsByPrefixStream, error) { +func (t *TreeServiceMock) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) { cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()] if !ok { return nil, ErrNodeNotFound diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index f17513e..c21e6c7 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -8,10 +8,6 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) -type LatestVersionsByPrefixStream interface { - Next(ctx context.Context) (*data.NodeVersion, error) -} - // TreeService provide interface to interact with tree service using s3 data models. type TreeService interface { // PutSettingsNode update or create new settings node in tree service. @@ -59,7 +55,7 @@ type TreeService interface { GetVersions(ctx context.Context, bktInfo *data.BucketInfo, objectName string) ([]*data.NodeVersion, error) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) - GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (LatestVersionsByPrefixStream, error) + GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error) diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 23b59df..684cdb1 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -906,6 +906,9 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig { cacheCfg.ObjectsList.Lifetime = fetchCacheLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime) cacheCfg.ObjectsList.Size = fetchCacheSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size) + cacheCfg.SessionList.Lifetime = fetchCacheLifetime(v, l, cfgSessionListCacheLifetime, cacheCfg.SessionList.Lifetime) + cacheCfg.SessionList.Size = fetchCacheSize(v, l, cfgSessionListCacheSize, cacheCfg.SessionList.Size) + cacheCfg.Buckets.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime) cacheCfg.Buckets.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size) diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 3b09ddf..10cacd9 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -98,6 +98,8 @@ const ( // Settings. cfgObjectsCacheSize = "cache.objects.size" cfgListObjectsCacheLifetime = "cache.list.lifetime" cfgListObjectsCacheSize = "cache.list.size" + cfgSessionListCacheLifetime = "cache.list_session.lifetime" + cfgSessionListCacheSize = "cache.list_session.size" cfgBucketsCacheLifetime = "cache.buckets.lifetime" cfgBucketsCacheSize = "cache.buckets.size" cfgNamesCacheLifetime = "cache.names.lifetime" diff --git a/config/config.env b/config/config.env index 037a426..6ce5386 100644 --- a/config/config.env +++ b/config/config.env @@ -82,6 +82,9 @@ S3_GW_CACHE_OBJECTS_SIZE=1000000 # Cache which keeps lists of objects in buckets S3_GW_CACHE_LIST_LIFETIME=1m S3_GW_CACHE_LIST_SIZE=100000 +# Cache which keeps listing session +S3_GW_CACHE_LIST_SESSION_LIFETIME=1m +S3_GW_CACHE_LIST_SESSION_SIZE=100 # Cache which contains mapping of bucket name to bucket info S3_GW_CACHE_BUCKETS_LIFETIME=1m S3_GW_CACHE_BUCKETS_SIZE=1000 diff --git a/config/config.yaml b/config/config.yaml index 06eb962..c1b1ba8 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -100,6 +100,10 @@ cache: list: lifetime: 1m size: 100 + # Cache which keeps listing sessions + list_session: + lifetime: 1m + size: 100 # Cache which contains mapping of nice name to object addresses names: lifetime: 1m diff --git a/docs/configuration.md b/docs/configuration.md index e7d5adf..2ec1da5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -396,6 +396,9 @@ cache: list: lifetime: 1m size: 100 + list_session: + lifetime: 1m + size: 100 names: lifetime: 1m size: 1000 @@ -420,6 +423,7 @@ cache: |-----------------|-----------------------------------|-----------------------------------|----------------------------------------------------------------------------------------| | `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 1000000` | Cache for objects (FrostFS headers). | | `list` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100000` | Cache which keeps lists of objects in buckets. | +| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100` | Cache which keeps listing session. | | `names` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 10000` | Cache which contains mapping of nice name to object addresses. | | `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 1000` | Cache which contains mapping of bucket name to bucket info. | | `system` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 10000` | Cache for system objects in a bucket: bucket settings, notification configuration etc. | diff --git a/internal/logs/logs.go b/internal/logs/logs.go index c3ca1fa..2e4954c 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -95,6 +95,7 @@ const ( CouldntCacheAccessControlOperation = "couldn't cache access control operation" // Warn in ../../api/layer/cache.go CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go + CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 7c3ce2f..dba9bf6 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -740,7 +740,7 @@ func (s *LatestVersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.Node return newNodeVersionFromTreeNode(filepath, treeNode), nil } -func (c *Tree) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (layer.LatestVersionsByPrefixStream, error) { +func (c *Tree) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) { mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix) if err != nil { if errors.Is(err, io.EOF) {