[#179] api: Add cache for ListObjectsV1/V2

Refactored cache for ListObjects:
made cache common for all versions,
simplified: remove dependendence on token/startafter
add mitable cachelifetime.

Refactored listobjects

Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
Angira Kekteeva 2021-07-29 11:14:14 +03:00
parent a0fb14d91e
commit 8a69c7cca0
3 changed files with 136 additions and 113 deletions

View file

@ -24,7 +24,7 @@ type (
layer struct { layer struct {
pool pool.Pool pool pool.Pool
log *zap.Logger log *zap.Logger
cache ObjectsListV2Cache cache ObjectsListCache
} }
// Params stores basic API parameters. // Params stores basic API parameters.
@ -131,7 +131,7 @@ func NewLayer(log *zap.Logger, conns pool.Pool) Client {
return &layer{ return &layer{
pool: conns, pool: conns,
log: log, log: log,
cache: newListObjectsCache(), cache: newListObjectsCache(defaultCacheLifetime),
} }
} }

View file

@ -13,7 +13,6 @@ import (
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -56,10 +55,9 @@ type (
} }
allObjectParams struct { allObjectParams struct {
Bucket *BucketInfo Bucket *BucketInfo
Delimiter string Delimiter string
Prefix string Prefix string
StartAfter string
} }
) )
@ -209,45 +207,34 @@ func (n *layer) objectDelete(ctx context.Context, address *object.Address) error
// ListObjectsV1 returns objects in a bucket for requests of Version 1. // ListObjectsV1 returns objects in a bucket for requests of Version 1.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
var ( var (
err error err error
result ListObjectsInfoV1 result ListObjectsInfoV1
bkt *BucketInfo allObjects []*ObjectInfo
) )
if p.MaxKeys == 0 { if p.MaxKeys == 0 {
return &result, nil return &result, nil
} }
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil {
return nil, err return nil, err
} }
allObjects, err := n.listSortedAllObjects(ctx, allObjectParams{ if len(allObjects) == 0 {
Bucket: bkt, return &result, nil
Prefix: p.Prefix, }
Delimiter: p.Delimiter,
StartAfter: p.Marker, if p.Marker != "" {
}) allObjects = trimAfterObjectName(p.Marker, allObjects)
if err != nil {
return nil, err
} }
if len(allObjects) > p.MaxKeys { if len(allObjects) > p.MaxKeys {
result.IsTruncated = true result.IsTruncated = true
nextObject := allObjects[p.MaxKeys-1]
result.NextMarker = nextObject.Name
allObjects = allObjects[:p.MaxKeys] allObjects = allObjects[:p.MaxKeys]
result.NextMarker = allObjects[len(allObjects)-1].Name
} }
for _, ov := range allObjects { result.Prefixes, result.Objects = triageObjects(allObjects)
if ov.isDir {
result.Prefixes = append(result.Prefixes, ov.Name)
} else {
result.Objects = append(result.Objects, ov)
}
}
return &result, nil return &result, nil
} }
@ -258,67 +245,40 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
err error err error
result ListObjectsInfoV2 result ListObjectsInfoV2
allObjects []*ObjectInfo allObjects []*ObjectInfo
bkt *BucketInfo
cacheKey string
box *accessbox.Box
) )
if p.MaxKeys == 0 { if p.MaxKeys == 0 {
return &result, nil return &result, nil
} }
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil {
return nil, err return nil, err
} }
if box, err = GetBoxData(ctx); err != nil { if len(allObjects) == 0 {
return nil, err return &result, nil
} }
cacheKey = createKey(box.Gate.AccessKey, bkt.CID)
if p.ContinuationToken != "" { if p.ContinuationToken != "" {
allObjects = n.cache.Get(p.ContinuationToken, cacheKey) allObjects = trimAfterObjectID(p.ContinuationToken, allObjects)
allObjects = trimStartAfter(p.StartAfter, allObjects)
} }
if allObjects == nil { if p.StartAfter != "" {
allObjects, err = n.listSortedAllObjects(ctx, allObjectParams{ allObjects = trimAfterObjectName(p.StartAfter, allObjects)
Bucket: bkt,
Prefix: p.Prefix,
Delimiter: p.Delimiter,
StartAfter: p.StartAfter,
})
if err != nil {
return nil, err
}
if p.ContinuationToken != "" {
allObjects = trimAfterObjectID(p.ContinuationToken, allObjects)
}
} }
if len(allObjects) > p.MaxKeys { if len(allObjects) > p.MaxKeys {
result.IsTruncated = true result.IsTruncated = true
restObjects := allObjects[p.MaxKeys:]
n.cache.Put(cacheKey, restObjects)
result.NextContinuationToken = restObjects[0].id.String()
allObjects = allObjects[:p.MaxKeys] allObjects = allObjects[:p.MaxKeys]
result.NextContinuationToken = allObjects[len(allObjects)-1].id.String()
} }
for _, ov := range allObjects { result.Prefixes, result.Objects = triageObjects(allObjects)
if ov.isDir {
result.Prefixes = append(result.Prefixes, ov.Name)
} else {
result.Objects = append(result.Objects, ov)
}
}
return &result, nil return &result, nil
} }
func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) { func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) {
var ( var (
err error err error
ids []*object.ID ids []*object.ID
@ -346,9 +306,6 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]
if _, ok := uniqNames[oi.Name]; ok { if _, ok := uniqNames[oi.Name]; ok {
continue continue
} }
if len(p.StartAfter) > 0 && oi.Name <= p.StartAfter {
continue
}
uniqNames[oi.Name] = oi.isDir uniqNames[oi.Name] = oi.isDir
@ -363,22 +320,75 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]
return objects, nil return objects, nil
} }
func trimStartAfter(startAfter string, objects []*ObjectInfo) []*ObjectInfo { func trimAfterObjectName(startAfter string, objects []*ObjectInfo) []*ObjectInfo {
if objects != nil && len(startAfter) != 0 && objects[0].Name <= startAfter { if len(objects) != 0 && objects[len(objects)-1].Name <= startAfter {
for i := range objects { return nil
if objects[i].Name > startAfter {
return objects[i:]
}
}
} }
return objects for i := range objects {
} if objects[i].Name > startAfter {
func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo {
for i, obj := range objects {
if obj.ID().String() == id {
return objects[i:] return objects[i:]
} }
} }
return objects
return nil
}
func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo {
if len(objects) != 0 && objects[len(objects)-1].id.String() == id {
return []*ObjectInfo{}
}
for i, obj := range objects {
if obj.ID().String() == id {
return objects[i+1:]
}
}
return nil
}
func triageObjects(allObjects []*ObjectInfo) (prefixes []string, objects []*ObjectInfo) {
for _, ov := range allObjects {
if ov.isDir {
prefixes = append(prefixes, ov.Name)
} else {
objects = append(objects, ov)
}
}
return
}
func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*ObjectInfo, error) {
var (
err error
bkt *BucketInfo
cacheKey cacheOptions
allObjects []*ObjectInfo
)
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
return nil, err
}
if cacheKey, err = createKey(ctx, bkt.CID, p.Prefix, p.Delimiter); err != nil {
return nil, err
}
allObjects = n.cache.Get(cacheKey)
if allObjects == nil {
allObjects, err = n.listSortedObjectsFromNeoFS(ctx, allObjectParams{
Bucket: bkt,
Prefix: p.Prefix,
Delimiter: p.Delimiter,
})
if err != nil {
return nil, err
}
// putting to cache a copy of allObjects because allObjects can be modified further
n.cache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...))
}
return allObjects, nil
} }

View file

@ -1,6 +1,7 @@
package layer package layer
import ( import (
"context"
"sync" "sync"
"time" "time"
@ -8,72 +9,84 @@ import (
) )
/* /*
This is an implementation of a cache for ListObjectsV2 which we return to users by ContinuationToken. This is an implementation of a cache for ListObjectsV2/V1 which we can return to users when we receive a ListObjects
request.
The cache is a map which has a key: (access_key from AccessBox) + container_id and a value: list of objects with The cache is a map which has a key: cacheOptions struct and a value: list of objects. After putting a record we
creation time. After putting a record we start a timer (via time.AfterFunc) that removes the record after start a timer (via time.AfterFunc) that removes the record after defaultCacheLifetime value.
defaultCacheLifetime value.
ContinuationToken in our gateway is an objectID in NeoFS. When we get a request from the user we just try to find the suitable and non-expired cache and then we return
the list of objects. Otherwise we send the request to NeoFS.
We don't keep ContinuationToken in this structure because we assume that users who received the token can reconnect
to other gateways and they should be able to get a list of objects.
When we receive the token from the user we just try to find the cache and then we return the list of objects which
starts from this token (i.e. objectID).
*/ */
// ObjectsListV2Cache provides interface for cache of ListObjectsV2 in a layer struct. // ObjectsListCache provides interface for cache of ListObjectsV2 in a layer struct.
type ( type (
ObjectsListV2Cache interface { ObjectsListCache interface {
Get(token string, key string) []*ObjectInfo Get(key cacheOptions) []*ObjectInfo
Put(key string, objects []*ObjectInfo) Put(key cacheOptions, objects []*ObjectInfo)
} }
) )
var ( const defaultCacheLifetime = time.Second * 60
defaultCacheLifetime = time.Second * 60
)
type ( type (
listObjectsCache struct { listObjectsCache struct {
caches map[string]cache cacheLifetime time.Duration
mtx sync.RWMutex caches map[cacheOptions]cache
mtx sync.RWMutex
} }
cache struct { cache struct {
list []*ObjectInfo list []*ObjectInfo
} }
cacheOptions struct {
key string
delimiter string
prefix string
}
) )
func newListObjectsCache() *listObjectsCache { func newListObjectsCache(lifetime time.Duration) *listObjectsCache {
return &listObjectsCache{ return &listObjectsCache{
caches: make(map[string]cache), caches: make(map[cacheOptions]cache),
cacheLifetime: lifetime,
} }
} }
func (l *listObjectsCache) Get(token, key string) []*ObjectInfo { func (l *listObjectsCache) Get(key cacheOptions) []*ObjectInfo {
l.mtx.RLock() l.mtx.RLock()
defer l.mtx.RUnlock() defer l.mtx.RUnlock()
if val, ok := l.caches[key]; ok { if val, ok := l.caches[key]; ok {
return trimAfterObjectID(token, val.list) return val.list
} }
return nil return nil
} }
func (l *listObjectsCache) Put(key string, objects []*ObjectInfo) { func (l *listObjectsCache) Put(key cacheOptions, objects []*ObjectInfo) {
if len(objects) == 0 {
return
}
var c cache var c cache
l.mtx.Lock() l.mtx.Lock()
defer l.mtx.Unlock() defer l.mtx.Unlock()
c.list = objects c.list = objects
l.caches[key] = c l.caches[key] = c
time.AfterFunc(defaultCacheLifetime, func() { time.AfterFunc(l.cacheLifetime, func() {
l.mtx.Lock() l.mtx.Lock()
delete(l.caches, key) delete(l.caches, key)
l.mtx.Unlock() l.mtx.Unlock()
}) })
} }
func createKey(accessKey string, cid *cid.ID) string { func createKey(ctx context.Context, cid *cid.ID, prefix, delimiter string) (cacheOptions, error) {
return accessKey + cid.String() box, err := GetBoxData(ctx)
if err != nil {
return cacheOptions{}, err
}
p := cacheOptions{
key: box.Gate.AccessKey + cid.String(),
delimiter: delimiter,
prefix: prefix,
}
return p, nil
} }