[#112] Add cache to ListObjects and layer

Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
Angira Kekteeva 2021-07-28 16:28:05 +03:00
parent c24fe5cc21
commit 0ceea95e11
3 changed files with 127 additions and 7 deletions

View file

@ -24,6 +24,7 @@ type (
layer struct {
pool pool.Pool
log *zap.Logger
cache ObjectsListV2Cache
}
// Params stores basic API parameters.
@ -130,6 +131,7 @@ func NewLayer(log *zap.Logger, conns pool.Pool) Client {
return &layer{
pool: conns,
log: log,
cache: newListObjectsCache(),
}
}

View file

@ -13,6 +13,7 @@ import (
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
"go.uber.org/zap"
)
@ -258,6 +259,8 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
result ListObjectsInfoV2
allObjects []*ObjectInfo
bkt *BucketInfo
cacheKey string
box *accessbox.Box
)
if p.MaxKeys == 0 {
@ -268,9 +271,18 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
return nil, err
}
if box, err = GetBoxData(ctx); err != nil {
return nil, err
}
cacheKey = createKey(box.Gate.AccessKey, bkt.CID)
if p.ContinuationToken != "" {
// find cache with continuation token
} else {
allObjects = n.cache.Get(p.ContinuationToken, cacheKey)
allObjects = trimStartAfter(p.StartAfter, allObjects)
}
if allObjects == nil {
allObjects, err = n.listSortedAllObjects(ctx, allObjectParams{
Bucket: bkt,
Prefix: p.Prefix,
@ -280,13 +292,20 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
if err != nil {
return nil, err
}
if p.ContinuationToken != "" {
allObjects = trimAfterObjectID(p.ContinuationToken, allObjects)
}
}
if len(allObjects) > p.MaxKeys {
result.IsTruncated = true
restObjects := allObjects[p.MaxKeys:]
n.cache.Put(cacheKey, restObjects)
result.NextContinuationToken = restObjects[0].id.String()
allObjects = allObjects[:p.MaxKeys]
// add creating of cache here
}
for _, ov := range allObjects {
@ -343,3 +362,23 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]
return objects, nil
}
func trimStartAfter(startAfter string, objects []*ObjectInfo) []*ObjectInfo {
if objects != nil && len(startAfter) != 0 && objects[0].Name <= startAfter {
for i := range objects {
if objects[i].Name > startAfter {
return objects[i:]
}
}
}
return objects
}
func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo {
for i, obj := range objects {
if obj.ID().String() == id {
return objects[i:]
}
}
return objects
}

79
api/layer/object_cache.go Normal file
View file

@ -0,0 +1,79 @@
package layer
import (
"sync"
"time"
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
)
/*
This is an implementation of a cache for ListObjectsV2 which we return to users by ContinuationToken.
The cache is a map which has a key: (access_key from AccessBox) + container_id and a value: list of objects with
creation time. After putting a record we start a timer (via time.AfterFunc) that removes the record after
defaultCacheLifetime value.
ContinuationToken in our gateway is an objectID in NeoFS.
We don't keep ContinuationToken in this structure because we assume that users who received the token can reconnect
to other gateways and they should be able to get a list of objects.
When we receive the token from the user we just try to find the cache and then we return the list of objects which
starts from this token (i.e. objectID).
*/
// ObjectsListV2Cache provides interface for cache of ListObjectsV2 in a layer struct.
type (
ObjectsListV2Cache interface {
Get(token string, key string) []*ObjectInfo
Put(key string, objects []*ObjectInfo)
}
)
var (
defaultCacheLifetime = time.Second * 60
)
type (
listObjectsCache struct {
caches map[string]cache
mtx sync.RWMutex
}
cache struct {
list []*ObjectInfo
}
)
func newListObjectsCache() *listObjectsCache {
return &listObjectsCache{
caches: make(map[string]cache),
}
}
func (l *listObjectsCache) Get(token, key string) []*ObjectInfo {
l.mtx.RLock()
defer l.mtx.RUnlock()
if val, ok := l.caches[key]; ok {
return trimAfterObjectID(token, val.list)
}
return nil
}
func (l *listObjectsCache) Put(key string, objects []*ObjectInfo) {
var c cache
l.mtx.Lock()
defer l.mtx.Unlock()
c.list = objects
l.caches[key] = c
time.AfterFunc(defaultCacheLifetime, func() {
l.mtx.Lock()
delete(l.caches, key)
l.mtx.Unlock()
})
}
func createKey(accessKey string, cid *cid.ID) string {
return accessKey + cid.String()
}