From 8a69c7cca08b1044dadc397718941256aed82c81 Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Thu, 29 Jul 2021 11:14:14 +0300 Subject: [PATCH] [#179] api: Add cache for ListObjectsV1/V2 Refactored cache for ListObjects: made cache common for all versions, simplified: remove dependence on token/startafter, add mutable cachelifetime. Refactored listobjects Signed-off-by: Angira Kekteeva --- api/layer/layer.go | 4 +- api/layer/object.go | 174 ++++++++++++++++++++------------------ api/layer/object_cache.go | 71 +++++++++------- 3 files changed, 136 insertions(+), 113 deletions(-) diff --git a/api/layer/layer.go b/api/layer/layer.go index c42e488e..fdea8fb0 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -24,7 +24,7 @@ type ( layer struct { pool pool.Pool log *zap.Logger - cache ObjectsListV2Cache + cache ObjectsListCache } // Params stores basic API parameters. @@ -131,7 +131,7 @@ func NewLayer(log *zap.Logger, conns pool.Pool) Client { return &layer{ pool: conns, log: log, - cache: newListObjectsCache(), + cache: newListObjectsCache(defaultCacheLifetime), } } diff --git a/api/layer/object.go b/api/layer/object.go index dd88e549..2ce97043 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -13,7 +13,6 @@ import ( cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-s3-gw/api" - "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" "go.uber.org/zap" ) @@ -56,10 +55,9 @@ type ( } allObjectParams struct { - Bucket *BucketInfo - Delimiter string - Prefix string - StartAfter string + Bucket *BucketInfo + Delimiter string + Prefix string } ) @@ -209,45 +207,34 @@ func (n *layer) objectDelete(ctx context.Context, address *object.Address) error // ListObjectsV1 returns objects in a bucket for requests of Version 1.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { var ( - err error - result ListObjectsInfoV1 - bkt *BucketInfo + err error + result ListObjectsInfoV1 + allObjects []*ObjectInfo ) if p.MaxKeys == 0 { return &result, nil } - if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { + if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil { return nil, err } - allObjects, err := n.listSortedAllObjects(ctx, allObjectParams{ - Bucket: bkt, - Prefix: p.Prefix, - Delimiter: p.Delimiter, - StartAfter: p.Marker, - }) - if err != nil { - return nil, err + if len(allObjects) == 0 { + return &result, nil + } + + if p.Marker != "" { + allObjects = trimAfterObjectName(p.Marker, allObjects) } if len(allObjects) > p.MaxKeys { result.IsTruncated = true - - nextObject := allObjects[p.MaxKeys-1] - result.NextMarker = nextObject.Name - allObjects = allObjects[:p.MaxKeys] + result.NextMarker = allObjects[len(allObjects)-1].Name } - for _, ov := range allObjects { - if ov.isDir { - result.Prefixes = append(result.Prefixes, ov.Name) - } else { - result.Objects = append(result.Objects, ov) - } - } + result.Prefixes, result.Objects = triageObjects(allObjects) return &result, nil } @@ -258,67 +245,40 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis err error result ListObjectsInfoV2 allObjects []*ObjectInfo - bkt *BucketInfo - cacheKey string - box *accessbox.Box ) if p.MaxKeys == 0 { return &result, nil } - if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { + if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil { return nil, err } - if box, err = GetBoxData(ctx); err != nil { - return nil, err + if len(allObjects) == 0 { + return &result, nil } - cacheKey = createKey(box.Gate.AccessKey, bkt.CID) - if p.ContinuationToken != "" { - allObjects = n.cache.Get(p.ContinuationToken, cacheKey) - allObjects = trimStartAfter(p.StartAfter, 
allObjects) + allObjects = trimAfterObjectID(p.ContinuationToken, allObjects) } - if allObjects == nil { - allObjects, err = n.listSortedAllObjects(ctx, allObjectParams{ - Bucket: bkt, - Prefix: p.Prefix, - Delimiter: p.Delimiter, - StartAfter: p.StartAfter, - }) - if err != nil { - return nil, err - } - - if p.ContinuationToken != "" { - allObjects = trimAfterObjectID(p.ContinuationToken, allObjects) - } + if p.StartAfter != "" { + allObjects = trimAfterObjectName(p.StartAfter, allObjects) } if len(allObjects) > p.MaxKeys { result.IsTruncated = true - - restObjects := allObjects[p.MaxKeys:] - n.cache.Put(cacheKey, restObjects) - result.NextContinuationToken = restObjects[0].id.String() - allObjects = allObjects[:p.MaxKeys] + result.NextContinuationToken = allObjects[len(allObjects)-1].id.String() } - for _, ov := range allObjects { - if ov.isDir { - result.Prefixes = append(result.Prefixes, ov.Name) - } else { - result.Objects = append(result.Objects, ov) - } - } + result.Prefixes, result.Objects = triageObjects(allObjects) + return &result, nil } -func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) { +func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) { var ( err error ids []*object.ID @@ -346,9 +306,6 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([] if _, ok := uniqNames[oi.Name]; ok { continue } - if len(p.StartAfter) > 0 && oi.Name <= p.StartAfter { - continue - } uniqNames[oi.Name] = oi.isDir @@ -363,22 +320,75 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([] return objects, nil } -func trimStartAfter(startAfter string, objects []*ObjectInfo) []*ObjectInfo { - if objects != nil && len(startAfter) != 0 && objects[0].Name <= startAfter { - for i := range objects { - if objects[i].Name > startAfter { - return objects[i:] - } - } +func trimAfterObjectName(startAfter string, objects 
[]*ObjectInfo) []*ObjectInfo { + if len(objects) != 0 && objects[len(objects)-1].Name <= startAfter { + return nil } - return objects -} - -func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo { - for i, obj := range objects { - if obj.ID().String() == id { + for i := range objects { + if objects[i].Name > startAfter { return objects[i:] } } - return objects + + return nil +} + +func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo { + if len(objects) != 0 && objects[len(objects)-1].id.String() == id { + return []*ObjectInfo{} + } + for i, obj := range objects { + if obj.ID().String() == id { + return objects[i+1:] + } + } + + return nil +} + +func triageObjects(allObjects []*ObjectInfo) (prefixes []string, objects []*ObjectInfo) { + for _, ov := range allObjects { + if ov.isDir { + prefixes = append(prefixes, ov.Name) + } else { + objects = append(objects, ov) + } + } + + return +} + +func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*ObjectInfo, error) { + var ( + err error + bkt *BucketInfo + cacheKey cacheOptions + allObjects []*ObjectInfo + ) + + if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { + return nil, err + } + + if cacheKey, err = createKey(ctx, bkt.CID, p.Prefix, p.Delimiter); err != nil { + return nil, err + } + + allObjects = n.cache.Get(cacheKey) + + if allObjects == nil { + allObjects, err = n.listSortedObjectsFromNeoFS(ctx, allObjectParams{ + Bucket: bkt, + Prefix: p.Prefix, + Delimiter: p.Delimiter, + }) + if err != nil { + return nil, err + } + + // putting to cache a copy of allObjects because allObjects can be modified further + n.cache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...)) + } + + return allObjects, nil } diff --git a/api/layer/object_cache.go b/api/layer/object_cache.go index 3d3fd925..ce151576 100644 --- a/api/layer/object_cache.go +++ b/api/layer/object_cache.go @@ -1,6 +1,7 @@ package layer import ( + "context" "sync" "time" @@ -8,72 +9,84 @@ 
import ( ) /* - This is an implementation of a cache for ListObjectsV2 which we return to users by ContinuationToken. + This is an implementation of a cache for ListObjectsV1/V2 which we can return to users when we receive a ListObjects + request. - The cache is a map which has a key: (access_key from AccessBox) + container_id and a value: list of objects with - creation time. After putting a record we start a timer (via time.AfterFunc) that removes the record after - defaultCacheLifetime value. + The cache is a map which has a key: cacheOptions struct and a value: list of objects. After putting a record we + start a timer (via time.AfterFunc) that removes the record after the configured cache lifetime. - ContinuationToken in our gateway is an objectID in NeoFS. - - We don't keep ContinuationToken in this structure because we assume that users who received the token can reconnect - to other gateways and they should be able to get a list of objects. - When we receive the token from the user we just try to find the cache and then we return the list of objects which - starts from this token (i.e. objectID). + When we get a request from the user we just try to find a suitable, non-expired cache entry and then we return + the list of objects. Otherwise we send the request to NeoFS. */ -// ObjectsListV2Cache provides interface for cache of ListObjectsV2 in a layer struct. +// ObjectsListCache provides an interface for the cache of ListObjectsV1/V2 in a layer struct.
type ( - ObjectsListV2Cache interface { - Get(token string, key string) []*ObjectInfo - Put(key string, objects []*ObjectInfo) + ObjectsListCache interface { + Get(key cacheOptions) []*ObjectInfo + Put(key cacheOptions, objects []*ObjectInfo) } ) -var ( - defaultCacheLifetime = time.Second * 60 -) +const defaultCacheLifetime = time.Second * 60 type ( listObjectsCache struct { - caches map[string]cache - mtx sync.RWMutex + cacheLifetime time.Duration + caches map[cacheOptions]cache + mtx sync.RWMutex } cache struct { list []*ObjectInfo } + cacheOptions struct { + key string + delimiter string + prefix string + } ) -func newListObjectsCache() *listObjectsCache { +func newListObjectsCache(lifetime time.Duration) *listObjectsCache { return &listObjectsCache{ - caches: make(map[string]cache), + caches: make(map[cacheOptions]cache), + cacheLifetime: lifetime, } } -func (l *listObjectsCache) Get(token, key string) []*ObjectInfo { +func (l *listObjectsCache) Get(key cacheOptions) []*ObjectInfo { l.mtx.RLock() defer l.mtx.RUnlock() if val, ok := l.caches[key]; ok { - return trimAfterObjectID(token, val.list) + return val.list } - return nil } -func (l *listObjectsCache) Put(key string, objects []*ObjectInfo) { +func (l *listObjectsCache) Put(key cacheOptions, objects []*ObjectInfo) { + if len(objects) == 0 { + return + } var c cache - l.mtx.Lock() defer l.mtx.Unlock() c.list = objects l.caches[key] = c - time.AfterFunc(defaultCacheLifetime, func() { + time.AfterFunc(l.cacheLifetime, func() { l.mtx.Lock() delete(l.caches, key) l.mtx.Unlock() }) } -func createKey(accessKey string, cid *cid.ID) string { - return accessKey + cid.String() +func createKey(ctx context.Context, cid *cid.ID, prefix, delimiter string) (cacheOptions, error) { + box, err := GetBoxData(ctx) + if err != nil { + return cacheOptions{}, err + } + p := cacheOptions{ + key: box.Gate.AccessKey + cid.String(), + delimiter: delimiter, + prefix: prefix, + } + + return p, nil }